forked from nikic/ditaio
-
Notifications
You must be signed in to change notification settings - Fork 5
/
CoroutineInterface.php
473 lines (419 loc) · 13.3 KB
/
CoroutineInterface.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
<?php
namespace Async;
use Async\TaskInterface;
use Async\ParallelInterface;
use Async\FiberInterface;
use Async\Spawn\Channeled;
use Async\Spawn\FutureInterface;
use Async\RuntimeException;
use Async\Threads\Thread;
use Async\Threads\TWorker;
interface CoroutineInterface
{
/**
* Creates a new task (using the next free task id), wraps **Generator**, a `coroutine` into a `Task` and schedule its execution.
* Returns the `Task` object/id.
*
* @see https://docs.python.org/3.10/library/asyncio-task.html#creating-tasks
* @source https://github.com/python/cpython/blob/11909c12c75a7f377460561abc97707a4006fc07/Lib/asyncio/tasks.py#L331
*
* @param \Generator $coroutine
* @param bool $isAsync should task type be set to a `async` function
*
* @return int task ID
*/
public function createTask(\Generator $coroutine, bool $isAsync = false): int;
/**
* Add an new task into the running task queue.
*
* @param TaskInterface $task
*/
public function schedule(TaskInterface $task);
/**
* Add a fiber instance (using the next free task id).
* The fiber added has been wrapped into a coroutine for the `tasks map` list and schedules its execution.
*
* @param FiberInterface $fiber
* @return int fiber task id
*/
public function addFiber(FiberInterface $fiber);
/**
* Add an fiber instance into the running task queue.
*
* @param FiberInterface $fiber
*/
public function scheduleFiber(FiberInterface $fiber);
/**
* Check if `object` is a FiberInterface instance.
*
* @param mixed $fiber
* @return boolean
*/
public function isFiber($object);
/**
* Performs a clean application shutdown, killing tasks/processes, and resetting all data, except **created** `async` functions.
* - This function needs to be prefixed with `yield`
*
* Provide `$skipTask` incase called by an Signal Handler.
*
* @param int $skipTask - Defaults to the main parent task.
* - The calling `$skipTask` task id will not get cancelled, the script execution will return to.
* - Use `currentTask()` to retrieve caller's task id.
*/
public function shutdown(?int $skipTask = 1);
/**
* Reset all `Coroutine` data.
*/
public function close();
/**
* kill/remove an subprocess progress `realtime` ipc handler task.
*
* @param TaskInterface $task
*
* @return void
*/
public function cancelProgress(TaskInterface $task);
public function cancelledList(): ?array;
/**
* kill/remove an task using task id,
* optionally pass custom cancel state for third party code integration.
*
* @see https://docs.python.org/3.9/library/asyncio-task.html#asyncio.Task.cancel
* @source https://github.com/python/cpython/blob/bb0b5c12419b8fa657c96185d62212aea975f500/Lib/asyncio/tasks.py#L181
*
* @param int $tid
* @param mixed $customState
* @param string $errorMessage
* @return bool
*
* @throws \InvalidArgumentException
*/
public function cancelTask(int $tid, $customState = null, string $errorMessage = 'Invalid task ID!');
/**
* Start the main supervisor task.
* Walk the task `queue` and execute the tasks.
* If a task is finished it's dropped, otherwise rescheduled at the end of the `queue`.
* - The `task` that's finish with any `result`, is moved into an `completed` task list.
*
* @see https://docs.python.org/3.7/library/asyncio-task.html#running-an-asyncio-program
*/
public function run();
/**
* Set main supervisor task running state to `false`.
* This allows the supervisor task to be recreated.
*
* @return void
*/
public function ioStop();
/**
* Adds a read `event/socket/stream/file` descriptor to start
* monitoring for read availability and invoke callback
* once it's available for reading.
*
* @see https://docs.python.org/3.9/library/asyncio-eventloop.html#asyncio.loop.add_reader
* @source https://github.com/python/cpython/blob/aa056ed472e9d0a79ea21784f6f5171d12a13f85/Lib/asyncio/selector_events.py#L257
*
* @param resource $stream
* @param Fiber|Task|\Generator|Callable $task
*/
public function addReader($stream, $task, bool $addEvent = true): CoroutineInterface;
/**
* Adds a write `event/socket/stream/file` descriptor to start
* monitoring for write availability and invoke callback
* once it's available for writing.
*
* @see https://docs.python.org/3.9/library/asyncio-eventloop.html#asyncio.loop.add_writer
* @source https://github.com/python/cpython/blob/aa056ed472e9d0a79ea21784f6f5171d12a13f85/Lib/asyncio/selector_events.py#L294
*
* @param resource $stream
* @param Fiber|Task|\Generator|Callable $task
*/
public function addWriter($stream, $task, bool $addEvent = true): CoroutineInterface;
/**
* Stop monitoring the `event/socket/stream/file` descriptor for read availability.
*
* @see https://docs.python.org/3.9/library/asyncio-eventloop.html#asyncio.loop.remove_reader
* @source https://github.com/python/cpython/blob/aa056ed472e9d0a79ea21784f6f5171d12a13f85/Lib/asyncio/selector_events.py#L273
*
* @param resource $stream
*/
public function removeReader($stream, bool $removeEvent = true): CoroutineInterface;
/**
* Stop monitoring the `event/socket/stream/file` descriptor for write availability.
*
* @see https://docs.python.org/3.9/library/asyncio-eventloop.html#asyncio.loop.remove_writer
* @source https://github.com/python/cpython/blob/aa056ed472e9d0a79ea21784f6f5171d12a13f85/Lib/asyncio/selector_events.py#L310
*
* @param resource $stream
*/
public function removeWriter($stream, bool $removeEvent = true): CoroutineInterface;
/**
* Executes a function after x seconds.
*
* @param Task|\Generator|Callable $task
* @param float $timeout
* @param int $tid task ID
*/
public function addTimeout($task = null, float $timeout = 0.0, int $tid = null);
/**
* Stop the execution of a `Task`'s timeout.
*
* @param TaskInterface $task
* @return void
*/
public function clearTimeout(TaskInterface $task): void;
/**
* Creates an object instance of the value which will signal to `Coroutine::create` that it's a return value.
*
* - yield Coroutine::value("I'm a return value!");
*
* @internal
*
* @param mixed $value
* @return ReturnValue
*/
public static function value($value);
/**
* Creates an object instance of the value which will signal to `Coroutine::create` that it's a return value.
*
* @internal
*
* @param mixed $value
* @return PlainValue
*/
public static function plain($value);
/**
* Return the currently running/pending task list.
*
* @internal
*
* @return array<TaskInterface>|null
*/
public function currentList(): ?array;
/**
* Return list of completed tasks, which the **results** has not been retrieved using `gather()`.
*
* @internal
*
* @return array<TaskInterface>|null
*/
public function completedList(): ?array;
/**
* Check `Id` among **completed** `Task` list.
*
* @internal
*
* @param integer $tid
* @return boolean
*/
public function isCompleted(int $tid): bool;
/**
* Return an completed `task` by `Id`.
*
* @param integer $tid
* @return FiberInterface|TaskInterface
*/
public function getCompleted(int $tid);
/**
* Update _completed_ tasks list, and _current/running_ task, if cancelling the task.
*
* @internal
*
* @param integer $taskId a completed `task` Id.
* @param array $completeList already **modified** completed task list.
* @param callable|null $onClear optionally custom update function.
* @param boolean $cancel should the `task` be **killed/removed**.
* @param boolean $forceUpdate pull the completed list.
* @return void
*/
public function updateCompleted(
int $taskId,
array $completeList = [],
?callable $onClear = null,
bool $cancel = false,
bool $forceUpdate = false
): void;
/**
* Return the `Task` instance reference by `int` task id.
*
* @param int $taskId
*
* @internal
*
* @return null|TaskInterface|FiberInstance
*/
public function getTask(?int $taskId = 0);
public function isGroup(int $tid): bool;
public function getGroup(): ?array;
public function getGroupResult(int $tid);
public function setGroupResult(int $tid, $value): void;
/**
* Add callable for parallel processing, in an separate php process
*
* @see https://docs.python.org/3.8/library/asyncio-subprocess.html#creating-subprocesses
*
* @param callable $callable
* @param int|float|null $timeout The timeout in seconds or null to disable
* @param bool $display set to show child process output
* @param Channeled|resource|mixed|null $channel IPC communication to be pass to the underlying process standard input.
*
* @return FutureInterface
*/
public function addFuture($callable, int $timeout = 0, bool $display = false, $channel = null): FutureInterface;
/**
* This will cause a _new thread_ to be **created** and **spawned** for the associated `Thread` object,
* where its _internal_ task `queue` will begin to be processed.
* - Add callable for parallel processing, in an separate `thread`
*
* @see https://docs.python.org/3.10/library/threading.html#module-threading
*
* @param string|int $tid Thread ID
* @param callable $callable
* @param mixed ...$args
* @return TWorker
*/
public function addThread(int $tid, callable $callable, ...$args): TWorker;
/**
* There are no **UV** file system operations/events pending.
*
* @return bool
*/
public function isFsEmpty(): bool;
/**
* Add a UV file system operation to counter.
*
* @return void
*/
public function fsAdd(): void;
/**
* Remove a UV file system operation from counter.
*
* @return void
*/
public function fsRemove(): void;
/**
* There are no **UV** network operations pending.
*
* @return bool
*/
public function isIoEmpty(): bool;
/**
* Add a UV network operation to counter.
*
* @return void
*/
public function ioAdd(): void;
/**
* Remove a UV network operation from counter.
*
* @return void
*/
public function ioRemove(): void;
/**
* Return the `Coroutine` class `libuv` loop handle, otherwise throw exception, if enabled and no driver found.
*
* @return null|\UVLoop
* @throws RuntimeException
*/
public function getUV(): ?\UVLoop;
/**
* Is `libuv` features available.
*/
public function isUv(): bool;
/**
* Turn **On** _manual_ `main supervisor task` execution, pause *automatic*.
*
* @return void
*/
public function futureOn(): void;
/**
* Turn **Off** _manual_ `main supervisor task` execution, resume *automatic*.
*
* @return void
*/
public function futureOff(): void;
/**
* Setup to use `libuv` features, reset/recreate **UV** handle, enable/disable.
* - This will `stop` and `delete` any current **UV** event loop instance.
* - This will also reset `FileSystem::setup` and **symplely/spawn** `Spawn::setup`
* with the same config.
*
* @param bool $useUvLoop
*
* @return CoroutineInterface
*/
public function setup(bool $useUvLoop = true): CoroutineInterface;
/**
* The `Parallel` class pool future instance.
*
* @return ParallelInterface
*/
public function getParallel(): ParallelInterface;
/**
* The `Thread` class instance.
*
* @return Thread
*/
public function getThread(): Thread;
/**
* Check if `PCNTL` extension is available for asynchronous signaling.
*
* @return bool
*/
public function isPcntl(): bool;
/**
* Run all `tasks` in the queue.
*
* If there are none, no I/O, timers or etc... the script/application will exit immediately.
*
* @internal
*
* @param bool $isReturn - a conditional return or a flag for additional processing after one loop tick.
*/
public function execute($isReturn = false);
/**
* Execute/schedule the retrieved `$task`.
*
* @internal
*
* @param Task|\Generator|Callable $task
* @param mixed $parameters
*/
public function executeTask($task, $parameters = null);
/**
* Create and manage a stack of nested coroutine calls. This allows turning
* regular functions/methods into sub-coroutines just by yielding them.
*
* - $value = (yield functions/methods($foo, $bar));
*
* @internal
*
* @param \Generator $gen
*/
public static function create(\Generator $gen);
/**
* Register a listener to be notified when a signal has been caught by this process.
*
* This is useful to catch user interrupt signals or shutdown signals from the `OS`.
*
* The listener callback function MUST be able to accept a single parameter,
* the signal added by this method or you MAY use a function which
* has no parameters at all.
*
* **Note: A listener can only be added once to the same signal, any
* attempts to add it more than once will be ignored.**
*
* @param int $signal
* @param Task|\Generator|Callable $listener
*/
public function addSignal($signal, $listener);
/**
* Removes a previously added signal listener.
*
* Any attempts to remove listeners that aren't registered will be ignored.
*
* @param int $signal
* @param Task|\Generator|Callable $listener
*/
public function removeSignal($signal, $listener);
}