diff --git a/async.c b/async.c index a9afdaf..01121b4 100644 --- a/async.c +++ b/async.c @@ -141,6 +141,7 @@ PHP_FUNCTION(Async_suspend) THROW_IF_SCHEDULER_CONTEXT; ZEND_ASYNC_ENQUEUE_COROUTINE(ZEND_ASYNC_CURRENT_COROUTINE); ZEND_ASYNC_SUSPEND(); + zend_async_waker_clean(ZEND_ASYNC_CURRENT_COROUTINE); } PHP_FUNCTION(Async_protect) @@ -277,6 +278,7 @@ PHP_FUNCTION(Async_await) ZEND_ASYNC_SUSPEND(); if (UNEXPECTED(EG(exception) != NULL)) { + zend_async_waker_clean(coroutine); RETURN_THROWS(); } @@ -288,7 +290,7 @@ PHP_FUNCTION(Async_await) ZVAL_COPY(return_value, &coroutine->waker->result); } - zend_async_waker_destroy(coroutine); + zend_async_waker_clean(coroutine); } PHP_FUNCTION(Async_awaitAnyOrFail) @@ -609,7 +611,7 @@ PHP_FUNCTION(Async_delay) ZEND_ASYNC_SUSPEND(); - zend_async_waker_destroy(coroutine); + zend_async_waker_clean(coroutine); } PHP_FUNCTION(Async_timeout) diff --git a/async_API.c b/async_API.c index d3f1448..b32786a 100644 --- a/async_API.c +++ b/async_API.c @@ -824,6 +824,7 @@ static void async_cancel_awaited_futures(async_await_context_t *await_context, H } ZEND_ASYNC_SUSPEND(); + zend_async_waker_clean(ZEND_ASYNC_CURRENT_COROUTINE); } /** @@ -1044,6 +1045,7 @@ void async_await_futures(zval *iterable, if (coroutine->waker->events.nNumOfElements > 0) { ZEND_ASYNC_SUSPEND(); + zend_async_waker_clean(ZEND_ASYNC_CURRENT_COROUTINE); } // If the await on futures has completed and diff --git a/benchmarks/compare_results.php b/benchmarks/compare_results.php new file mode 100644 index 0000000..accc150 --- /dev/null +++ b/benchmarks/compare_results.php @@ -0,0 +1,144 @@ +&1", $output, $return_var); + return [ + 'output' => implode("\n", $output), + 'success' => $return_var === 0 + ]; +} + +// Function to parse benchmark results from output +function parseResults($output) { + $results = []; + + if (preg_match('/Time: ([0-9.]+) seconds/', $output, $matches)) { + $results['time'] = (float)$matches[1]; + } + + if (preg_match('/Switches per second: ([0-9,]+)/', $output, $matches)) { + $results['switches_per_sec'] = (int)str_replace(',', '', $matches[1]); + } + + if (preg_match('/Overhead per switch: ([0-9.]+) μs/', $output, $matches)) { + $results['overhead_us'] = (float)$matches[1]; + } + + if (preg_match('/Used for benchmark: ([0-9.]+) MB/', $output, $matches)) { + $results['memory_mb'] = (float)$matches[1]; + } + + return $results; +} + +echo "Running coroutines benchmark...\n"; +$coroutineResult = runBenchmark('coroutines_benchmark.php'); + +echo "Running fibers benchmark...\n"; +$fiberResult = runBenchmark('fibers_benchmark.php'); + +echo "\n" . str_repeat("=", 60) . "\n"; +echo "COMPARISON RESULTS\n"; +echo str_repeat("=", 60) . "\n\n"; + +if (!$coroutineResult['success']) { + echo "❌ Coroutines benchmark failed:\n"; + echo $coroutineResult['output'] . "\n\n"; +} else { + echo "✅ Coroutines benchmark completed successfully\n\n"; +} + +if (!$fiberResult['success']) { + echo "❌ Fibers benchmark failed:\n"; + echo $fiberResult['output'] . "\n\n"; +} else { + echo "✅ Fibers benchmark completed successfully\n\n"; +} + +// Parse and compare results if both succeeded +if ($coroutineResult['success'] && $fiberResult['success']) { + $coroutineStats = parseResults($coroutineResult['output']); + $fiberStats = parseResults($fiberResult['output']); + + echo "📊 PERFORMANCE COMPARISON:\n\n"; + + // Time comparison + if (isset($coroutineStats['time']) && isset($fiberStats['time'])) { + $timeRatio = $fiberStats['time'] / $coroutineStats['time']; + echo "⏱️ Execution Time:\n"; + echo " Coroutines: " . number_format($coroutineStats['time'], 4) . "s\n"; + echo " Fibers: " . number_format($fiberStats['time'], 4) . "s\n"; + if ($timeRatio > 1) { + echo " 🏆 Coroutines are " . number_format($timeRatio, 2) . "x faster\n\n"; + } else { + echo " 🏆 Fibers are " . number_format(1/$timeRatio, 2) . "x faster\n\n"; + } + } + + // Throughput comparison + if (isset($coroutineStats['switches_per_sec']) && isset($fiberStats['switches_per_sec'])) { + echo "🚀 Throughput (switches/sec):\n"; + echo " Coroutines: " . number_format($coroutineStats['switches_per_sec']) . "\n"; + echo " Fibers: " . number_format($fiberStats['switches_per_sec']) . "\n"; + $throughputRatio = $coroutineStats['switches_per_sec'] / $fiberStats['switches_per_sec']; + if ($throughputRatio > 1) { + echo " 🏆 Coroutines have " . number_format($throughputRatio, 2) . "x higher throughput\n\n"; + } else { + echo " 🏆 Fibers have " . number_format(1/$throughputRatio, 2) . "x higher throughput\n\n"; + } + } + + // Overhead comparison + if (isset($coroutineStats['overhead_us']) && isset($fiberStats['overhead_us'])) { + echo "⚡ Overhead per switch:\n"; + echo " Coroutines: " . number_format($coroutineStats['overhead_us'], 2) . " μs\n"; + echo " Fibers: " . number_format($fiberStats['overhead_us'], 2) . " μs\n"; + $overheadRatio = $fiberStats['overhead_us'] / $coroutineStats['overhead_us']; + if ($overheadRatio > 1) { + echo " 🏆 Coroutines have " . number_format($overheadRatio, 2) . "x lower overhead\n\n"; + } else { + echo " 🏆 Fibers have " . number_format(1/$overheadRatio, 2) . "x lower overhead\n\n"; + } + } + + // Memory comparison + if (isset($coroutineStats['memory_mb']) && isset($fiberStats['memory_mb'])) { + echo "💾 Memory Usage:\n"; + echo " Coroutines: " . number_format($coroutineStats['memory_mb'], 2) . " MB\n"; + echo " Fibers: " . number_format($fiberStats['memory_mb'], 2) . " MB\n"; + $memoryRatio = $fiberStats['memory_mb'] / $coroutineStats['memory_mb']; + if ($memoryRatio > 1) { + echo " 🏆 Coroutines use " . number_format($memoryRatio, 2) . "x less memory\n\n"; + } else { + echo " 🏆 Fibers use " . number_format(1/$memoryRatio, 2) . "x less memory\n\n"; + } + } +} else { + echo "⚠️ Cannot compare results - one or both benchmarks failed\n"; +} + +echo "💡 Note: Results may vary based on system load and configuration\n"; +echo "💡 Run multiple times and average results for production comparisons\n"; + +echo "\nComparison completed.\n"; \ No newline at end of file diff --git a/benchmarks/coroutines_benchmark.php b/benchmarks/coroutines_benchmark.php new file mode 100644 index 0000000..95fcb43 --- /dev/null +++ b/benchmarks/coroutines_benchmark.php @@ -0,0 +1,102 @@ + $end - $start, + 'memoryBeforeCreate' => $memoryBeforeCreate, + 'memoryAfterCreate' => $memoryAfterCreate, + 'creationOverhead' => $memoryAfterCreate - $memoryBeforeCreate + ]; +} + +// Memory usage tracking +function getCurrentMemoryUsage() { + return memory_get_usage(true); +} + +function getPeakMemoryUsage() { + return memory_get_peak_usage(true); +} + +// Run benchmark +echo "Configuration:\n"; +echo "- Iterations: $iterations\n"; +echo "- Switches per iteration: $switches\n"; +echo "- Total context switches: " . ($iterations * $switches) . "\n\n"; + +// Memory usage before benchmark +$memoryBefore = getCurrentMemoryUsage(); + +// Warmup +echo "Warming up...\n"; +benchmarkCoroutines(100, 10); + +echo "\nRunning coroutines benchmark...\n"; + +// Benchmark coroutines +$result = benchmarkCoroutines($iterations, $switches); +$coroutineTime = $result['time']; + +// Memory usage after benchmark +$memoryAfter = getCurrentMemoryUsage(); +$memoryPeak = getPeakMemoryUsage(); + +// Results +echo "\n=== Results ===\n"; +echo "Time: " . number_format($coroutineTime, 4) . " seconds\n"; +echo "Switches per second: " . number_format(($iterations * $switches) / $coroutineTime, 0) . "\n"; +echo "Overhead per switch: " . number_format(($coroutineTime / ($iterations * $switches)) * 1000000, 2) . " μs\n"; + +echo "\nMemory Usage:\n"; +echo "Before: " . number_format($memoryBefore / 1024 / 1024, 2) . " MB\n"; +echo "After creation: " . number_format($result['memoryAfterCreate'] / 1024 / 1024, 2) . " MB\n"; +echo "After completion: " . number_format($memoryAfter / 1024 / 1024, 2) . " MB\n"; +echo "Peak: " . number_format($memoryPeak / 1024 / 1024, 2) . " MB\n"; +echo "Creation overhead: " . number_format($result['creationOverhead'] / 1024 / 1024, 2) . " MB\n"; +echo "Used for benchmark: " . number_format(($memoryAfter - $memoryBefore) / 1024 / 1024, 2) . " MB\n"; + +// Additional metrics +$totalSwitches = $iterations * $switches; +echo "\nPerformance Metrics:\n"; +echo "Total coroutines created: $iterations\n"; +echo "Total context switches: $totalSwitches\n"; +echo "Average time per coroutine: " . number_format($coroutineTime / $iterations * 1000, 2) . " ms\n"; +echo "Memory per coroutine (creation): " . number_format($result['creationOverhead'] / $iterations, 0) . " bytes\n"; +echo "Memory per coroutine (total): " . number_format(($memoryAfter - $memoryBefore) / $iterations, 0) . " bytes\n"; + +echo "\nCoroutines benchmark completed.\n"; \ No newline at end of file diff --git a/benchmarks/fibers_benchmark.php b/benchmarks/fibers_benchmark.php new file mode 100644 index 0000000..df17b16 --- /dev/null +++ b/benchmarks/fibers_benchmark.php @@ -0,0 +1,127 @@ +start(); + } + + $memoryAfterStart = getCurrentMemoryUsage(); + + while (true) { + $alive = false; + foreach ($fibers as $fiber) { + if (!$fiber->isTerminated()) { + $alive = true; + $fiber->resume(); + } + } + if (!$alive) break; + } + + $end = microtime(true); + return [ + 'time' => $end - $start, + 'memoryBeforeCreate' => $memoryBeforeCreate, + 'memoryAfterCreate' => $memoryAfterCreate, + 'memoryAfterStart' => $memoryAfterStart, + 'creationOverhead' => $memoryAfterCreate - $memoryBeforeCreate, + 'startOverhead' => $memoryAfterStart - $memoryAfterCreate + ]; +} + +// Memory usage tracking +function getCurrentMemoryUsage() { + return memory_get_usage(true); +} + +function getPeakMemoryUsage() { + return memory_get_peak_usage(true); +} + +// Run benchmark +echo "Configuration:\n"; +echo "- PHP Version: " . PHP_VERSION . "\n"; +echo "- Iterations: $iterations\n"; +echo "- Switches per iteration: $switches\n"; +echo "- Total context switches: " . ($iterations * $switches) . "\n\n"; + +// Memory usage before benchmark +$memoryBefore = getCurrentMemoryUsage(); + +// Warmup +echo "Warming up...\n"; +benchmarkFibers(100, 10); + +echo "\nRunning fibers benchmark...\n"; + +// Benchmark fibers +$result = benchmarkFibers($iterations, $switches); +$fiberTime = $result['time']; + +// Memory usage after benchmark +$memoryAfter = getCurrentMemoryUsage(); +$memoryPeak = getPeakMemoryUsage(); + +// Results +echo "\n=== Results ===\n"; +echo "Time: " . number_format($fiberTime, 4) . " seconds\n"; +echo "Switches per second: " . number_format(($iterations * $switches) / $fiberTime, 0) . "\n"; +echo "Overhead per switch: " . number_format(($fiberTime / ($iterations * $switches)) * 1000000, 2) . " μs\n"; + +echo "\nMemory Usage:\n"; +echo "Before: " . number_format($memoryBefore / 1024 / 1024, 2) . " MB\n"; +echo "After creation: " . number_format($result['memoryAfterCreate'] / 1024 / 1024, 2) . " MB\n"; +echo "After start: " . number_format($result['memoryAfterStart'] / 1024 / 1024, 2) . " MB\n"; +echo "After completion: " . number_format($memoryAfter / 1024 / 1024, 2) . " MB\n"; +echo "Peak: " . number_format($memoryPeak / 1024 / 1024, 2) . " MB\n"; +echo "Creation overhead: " . number_format($result['creationOverhead'] / 1024 / 1024, 2) . " MB\n"; +echo "Start overhead: " . number_format($result['startOverhead'] / 1024 / 1024, 2) . " MB\n"; +echo "Used for benchmark: " . number_format(($memoryAfter - $memoryBefore) / 1024 / 1024, 2) . " MB\n"; + +// Additional metrics +$totalSwitches = $iterations * $switches; +echo "\nPerformance Metrics:\n"; +echo "Total fibers created: $iterations\n"; +echo "Total context switches: $totalSwitches\n"; +echo "Average time per fiber: " . number_format($fiberTime / $iterations * 1000, 2) . " ms\n"; +echo "Memory per fiber (creation): " . number_format($result['creationOverhead'] / $iterations, 0) . " bytes\n"; +echo "Memory per fiber (start): " . number_format($result['startOverhead'] / $iterations, 0) . " bytes\n"; +echo "Memory per fiber (total): " . number_format(($memoryAfter - $memoryBefore) / $iterations, 0) . " bytes\n"; + +echo "\nFibers benchmark completed.\n"; \ No newline at end of file diff --git a/benchmarks/http_server_coroutines.php b/benchmarks/http_server_coroutines.php new file mode 100644 index 0000000..29888ad --- /dev/null +++ b/benchmarks/http_server_coroutines.php @@ -0,0 +1,111 @@ + ['message' => 'Hello from Async HTTP Server!', 'server' => 'async-coroutines'], + '/health' => ['status' => 'healthy', 'uptime' => time()], + '/json' => ['data' => range(1, 100), 'timestamp' => microtime(true)], + '/small' => ['ok' => true], + default => ['error' => 'Not Found', 'uri' => $uri] + }; + + $status_code = ($uri === '/' || $uri === '/health' || $uri === '/json' || $uri === '/small') ? 200 : 404; + $response_body = json_encode($response_data, JSON_UNESCAPED_SLASHES); + + // Build HTTP response + $response = "HTTP/1.1 $status_code " . ($status_code === 200 ? 'OK' : 'Not Found') . "\r\n"; + $response .= "Content-Type: application/json\r\n"; + $response .= "Content-Length: " . strlen($response_body) . "\r\n"; + $response .= "Server: AsyncCoroutines/1.0\r\n"; + $response .= "Connection: close\r\n"; + $response .= "\r\n"; + $response .= $response_body; + + // Send response + fwrite($client, $response); + fclose($client); +} + +/** + * HTTP Server using coroutines + */ +function startHttpServer($host, $port) { + return spawn(function() use ($host, $port) { + // Create server socket + $server = stream_socket_server("tcp://$host:$port", $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN); + if (!$server) { + throw new Exception("Could not create server: $errstr ($errno)"); + } + + echo "Server listening on $host:$port\n"; + echo "Try: curl http://$host:$port/\n"; + echo "Benchmark: wrk -t12 -c400 -d30s http://$host:$port/\n\n"; + + $request_id = 0; + $active_handlers = []; + + while (true) { + // Accept new connections (this is async in async extension) + $client = stream_socket_accept($server, 0); + + if ($client) { + $request_id++; + + // Handle request in separate coroutine + spawn(handleHttpRequest(...), $client, $request_id); + } + } + + fclose($server); + }); +} + +// Start server +try { + $server_task = startHttpServer($host, $port); + + // Run until interrupted + awaitAll([$server_task]); + +} catch (Exception $e) { + echo "Server error: " . $e->getMessage() . "\n"; + exit(1); +} \ No newline at end of file diff --git a/coroutine.c b/coroutine.c index f657421..c3c38a8 100644 --- a/coroutine.c +++ b/coroutine.c @@ -13,6 +13,11 @@ | Author: Edmond | +----------------------------------------------------------------------+ */ + +/////////////////////////////////////////////////////////// +/// 1. Headers, Constants, and Declarations +/////////////////////////////////////////////////////////// + #include "coroutine.h" #include "context.h" @@ -29,502 +34,459 @@ #include "zend_ini.h" #define METHOD(name) PHP_METHOD(Async_Coroutine, name) +#define THIS_COROUTINE ((async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(Z_OBJ_P(ZEND_THIS))) zend_class_entry *async_ce_coroutine = NULL; -static zend_function coroutine_root_function = { ZEND_INTERNAL_FUNCTION }; - -/////////////////////////////////////////////////////////// -/// Coroutine methods -/////////////////////////////////////////////////////////// +static zend_object_handlers coroutine_handlers; -#define THIS_COROUTINE ((async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(Z_OBJ_P(ZEND_THIS))) +// Forward declarations for internal functions +static void coroutine_call_finally_handlers(async_coroutine_t *coroutine); +static void finally_context_dtor(finally_handlers_context_t *context); -METHOD(getId) -{ - ZEND_PARSE_PARAMETERS_NONE(); +// Forward declarations for event system +static void coroutine_event_start(zend_async_event_t *event); +static void coroutine_event_stop(zend_async_event_t *event); +static void coroutine_add_callback(zend_async_event_t *event, zend_async_event_callback_t *callback); +static void coroutine_del_callback(zend_async_event_t *event, zend_async_event_callback_t *callback); +static bool coroutine_replay(zend_async_event_t *event, zend_async_event_callback_t *callback, zval *result, zend_object **exception); +static zend_string *coroutine_info(zend_async_event_t *event); +static void coroutine_dispose(zend_async_event_t *event); - RETURN_LONG(Z_OBJ_P(ZEND_THIS)->handle); -} +/////////////////////////////////////////////////////////// +/// 2. Object Lifecycle Management +/////////////////////////////////////////////////////////// -METHOD(asHiPriority) +static zend_object *coroutine_object_create(zend_class_entry *class_entry) { - // TODO: Implement priority handling in scheduler - // For now, just return the same coroutine - RETURN_ZVAL(ZEND_THIS, 1, 0); -} + async_coroutine_t *coroutine = zend_object_alloc(sizeof(async_coroutine_t), class_entry); -METHOD(getContext) -{ - ZEND_PARSE_PARAMETERS_NONE(); + ZVAL_UNDEF(&coroutine->coroutine.result); - async_coroutine_t *coroutine = THIS_COROUTINE; + ZEND_ASYNC_EVENT_SET_ZEND_OBJ(&coroutine->coroutine.event); + ZEND_ASYNC_EVENT_SET_NO_FREE_MEMORY(&coroutine->coroutine.event); + ZEND_ASYNC_EVENT_SET_ZEND_OBJ_OFFSET(&coroutine->coroutine.event, XtOffsetOf(async_coroutine_t, std)); - if (coroutine->coroutine.context == NULL) { - async_context_t *context = async_context_new(); - if (UNEXPECTED(context == NULL)) { - RETURN_THROWS(); - } + /* Initialize embedded waker */ + coroutine->coroutine.waker = &coroutine->waker; - coroutine->coroutine.context = &context->base; - } + /* Initialize waker contents (memory is already zeroed by zend_object_alloc) */ + zend_async_waker_init(&coroutine->waker); - // Return the context object - RETURN_OBJ_COPY(&((async_context_t *) coroutine->coroutine.context)->std); -} + /* Initialize switch handlers */ + coroutine->coroutine.switch_handlers = NULL; -METHOD(getResult) -{ - ZEND_PARSE_PARAMETERS_NONE(); + zend_async_event_t *event = &coroutine->coroutine.event; - async_coroutine_t *coroutine = THIS_COROUTINE; + event->start = coroutine_event_start; + event->stop = coroutine_event_stop; + event->add_callback = coroutine_add_callback; + event->del_callback = coroutine_del_callback; + event->replay = coroutine_replay; + event->info = coroutine_info; + event->dispose = coroutine_dispose; - if (!ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) { - RETURN_NULL(); - } + coroutine->coroutine.extended_data = NULL; + coroutine->finally_handlers = NULL; - if (Z_TYPE(coroutine->coroutine.result) == IS_UNDEF) { - RETURN_NULL(); - } + zend_object_std_init(&coroutine->std, class_entry); + object_properties_init(&coroutine->std, class_entry); - RETURN_ZVAL(&coroutine->coroutine.result, 1, 0); + return &coroutine->std; } -METHOD(getException) +static void coroutine_object_destroy(zend_object *object) { - ZEND_PARSE_PARAMETERS_NONE(); - - async_coroutine_t *coroutine = THIS_COROUTINE; + async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); - if (false == ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) { - RETURN_NULL(); - } + ZEND_ASSERT(ZEND_ASYNC_WAKER_NOT_IN_QUEUE(&coroutine->waker) && + "Coroutine waker must be dequeued before destruction"); - if (coroutine->coroutine.exception == NULL) { - RETURN_NULL(); + if (coroutine->coroutine.scope != NULL) { + async_scope_notify_coroutine_finished(coroutine); + coroutine->coroutine.scope = NULL; } - RETURN_OBJ_COPY(coroutine->coroutine.exception); -} + if (coroutine->coroutine.fcall) { -METHOD(getTrace) -{ - // TODO: Implement debug trace collection - // This would require fiber stack trace functionality - array_init(return_value); -} + zend_fcall_t *fcall = coroutine->coroutine.fcall; + coroutine->coroutine.fcall = NULL; -METHOD(getSpawnFileAndLine) -{ - ZEND_PARSE_PARAMETERS_NONE(); + if (fcall->fci.param_count) { + for (uint32_t i = 0; i < fcall->fci.param_count; i++) { + zval_ptr_dtor(&fcall->fci.params[i]); + } - async_coroutine_t *coroutine = THIS_COROUTINE; + efree(fcall->fci.params); + } - array_init(return_value); + if (fcall->fci.named_params) { + GC_DELREF(fcall->fci.named_params); + fcall->fci.named_params = NULL; + } - if (coroutine->coroutine.filename) { - add_next_index_str(return_value, zend_string_copy(coroutine->coroutine.filename)); - } else { - add_next_index_null(return_value); + zval_ptr_dtor(&fcall->fci.function_name); + efree(fcall); } - add_next_index_long(return_value, coroutine->coroutine.lineno); -} - -METHOD(getSpawnLocation) -{ - ZEND_PARSE_PARAMETERS_NONE(); - - async_coroutine_t *coroutine = THIS_COROUTINE; + if (coroutine->coroutine.context != NULL) { + // If the coroutine has a context, we need to release it. + async_context_t *context = (async_context_t *) coroutine->coroutine.context; + coroutine->coroutine.context = NULL; + async_context_dispose(context); + } if (coroutine->coroutine.filename) { - RETURN_STR(zend_strpprintf(0, "%s:%d", ZSTR_VAL(coroutine->coroutine.filename), coroutine->coroutine.lineno)); - } else { - RETURN_STRING("unknown"); + zend_string_release_ex(coroutine->coroutine.filename, 0); + coroutine->coroutine.filename = NULL; } -} -METHOD(getSuspendFileAndLine) -{ - ZEND_PARSE_PARAMETERS_NONE(); + // Cleanup embedded waker contents + zend_async_waker_destroy(&coroutine->coroutine); + coroutine->coroutine.waker = NULL; - async_coroutine_t *coroutine = THIS_COROUTINE; + if (coroutine->coroutine.internal_context != NULL) { + zend_async_coroutine_internal_context_dispose(&coroutine->coroutine); + } - array_init(return_value); + zval_ptr_dtor(&coroutine->coroutine.result); + ZVAL_UNDEF(&coroutine->coroutine.result); - if (coroutine->coroutine.waker && coroutine->coroutine.waker->filename) { - add_next_index_str(return_value, zend_string_copy(coroutine->coroutine.waker->filename)); - add_next_index_long(return_value, coroutine->coroutine.waker->lineno); - } else { - add_next_index_null(return_value); - add_next_index_long(return_value, 0); - } -} + if (coroutine->coroutine.exception != NULL) { + // If the coroutine has an exception, we need to release it. -METHOD(getSuspendLocation) -{ - ZEND_PARSE_PARAMETERS_NONE(); + zend_object *exception = coroutine->coroutine.exception; + coroutine->coroutine.exception = NULL; + OBJ_RELEASE(exception); + } - async_coroutine_t *coroutine = THIS_COROUTINE; + if (coroutine->deferred_cancellation != NULL) { + zend_object *deferred_cancellation = coroutine->deferred_cancellation; + coroutine->deferred_cancellation = NULL; + OBJ_RELEASE(deferred_cancellation); + } - if (coroutine->coroutine.waker && coroutine->coroutine.waker->filename) { - RETURN_STR(zend_strpprintf( - 0, "%s:%d", ZSTR_VAL(coroutine->coroutine.waker->filename), coroutine->coroutine.waker->lineno)); - } else { - RETURN_STRING("unknown"); + if (coroutine->finally_handlers) { + zend_array_destroy(coroutine->finally_handlers); + coroutine->finally_handlers = NULL; } } -METHOD(isStarted) +static void coroutine_free(zend_object *object) { - ZEND_PARSE_PARAMETERS_NONE(); - RETURN_BOOL(ZEND_COROUTINE_IS_STARTED(&THIS_COROUTINE->coroutine)); + async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); + + zend_async_callbacks_free(&coroutine->coroutine.event); + zend_object_std_dtor(object); } -METHOD(isQueued) +static HashTable *async_coroutine_object_gc(zend_object *object, zval **table, int *num) { - ZEND_PARSE_PARAMETERS_NONE(); + async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); + zend_get_gc_buffer *buf = zend_get_gc_buffer_create(); - async_coroutine_t *coroutine = THIS_COROUTINE; + /* Always add basic ZVALs from coroutine structure */ + zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.result); - if (coroutine->coroutine.waker == NULL) { - RETURN_FALSE; + /* Add objects that may be present */ + if (coroutine->coroutine.exception) { + zend_get_gc_buffer_add_obj(buf, coroutine->coroutine.exception); } - RETURN_BOOL(coroutine->coroutine.waker->status == ZEND_ASYNC_WAKER_QUEUED); -} - -METHOD(isRunning) -{ - ZEND_PARSE_PARAMETERS_NONE(); + if (coroutine->deferred_cancellation) { + zend_get_gc_buffer_add_obj(buf, coroutine->deferred_cancellation); + } - async_coroutine_t *coroutine = THIS_COROUTINE; + /* Add finally handlers if present */ + if (coroutine->finally_handlers) { + zval *val; + ZEND_HASH_FOREACH_VAL(coroutine->finally_handlers, val) + { + zend_get_gc_buffer_add_zval(buf, val); + } + ZEND_HASH_FOREACH_END(); + } - // Coroutine is running if it's the current one and is started but not finished - RETURN_BOOL(ZEND_COROUTINE_IS_STARTED(&coroutine->coroutine) && - false == ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)); -} + /* Add internal context HashTable if present */ + if (coroutine->coroutine.internal_context) { + zval *val; + ZEND_HASH_FOREACH_VAL(coroutine->coroutine.internal_context, val) + { + zend_get_gc_buffer_add_zval(buf, val); + } + ZEND_HASH_FOREACH_END(); + } -METHOD(isSuspended) -{ - ZEND_PARSE_PARAMETERS_NONE(); + /* Add fcall function name and parameters if present */ + if (coroutine->coroutine.fcall) { + zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.fcall->fci.function_name); - RETURN_BOOL(ZEND_COROUTINE_SUSPENDED(&THIS_COROUTINE->coroutine)); -} + /* Add function parameters */ + if (coroutine->coroutine.fcall->fci.param_count > 0 && coroutine->coroutine.fcall->fci.params) { + for (uint32_t i = 0; i < coroutine->coroutine.fcall->fci.param_count; i++) { + zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.fcall->fci.params[i]); + } + } + } -METHOD(isCancelled) -{ - ZEND_PARSE_PARAMETERS_NONE(); + /* Add waker-related ZVALs if present */ + if (coroutine->coroutine.waker) { + zend_get_gc_buffer_add_zval(buf, &coroutine->waker.result); - RETURN_BOOL(ZEND_COROUTINE_IS_CANCELLED(&THIS_COROUTINE->coroutine) && - ZEND_COROUTINE_IS_FINISHED(&THIS_COROUTINE->coroutine)); -} + if (coroutine->waker.error) { + zend_get_gc_buffer_add_obj(buf, coroutine->waker.error); + } -METHOD(isCancellationRequested) -{ - ZEND_PARSE_PARAMETERS_NONE(); + /* Add events HashTable contents */ + zval *event_val; + zval zval_object; + ZEND_HASH_FOREACH_VAL(&coroutine->waker.events, event_val) + { - async_coroutine_t *coroutine = THIS_COROUTINE; + zend_async_event_t *event = (zend_async_event_t *) Z_PTR_P(event_val); - RETURN_BOOL((ZEND_COROUTINE_IS_CANCELLED(&coroutine->coroutine) && - !ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) || - coroutine->deferred_cancellation != NULL); -} - -METHOD(isFinished) -{ - ZEND_PARSE_PARAMETERS_NONE(); + if (ZEND_ASYNC_EVENT_IS_REFERENCE(event) || ZEND_ASYNC_EVENT_IS_ZEND_OBJ(event)) { + ZVAL_OBJ(&zval_object, ZEND_ASYNC_EVENT_TO_OBJECT(event)); + zend_get_gc_buffer_add_zval(buf, &zval_object); + } + } + ZEND_HASH_FOREACH_END(); - RETURN_BOOL(ZEND_COROUTINE_IS_FINISHED(&THIS_COROUTINE->coroutine)); -} + /* Add triggered events if present */ + if (coroutine->waker.triggered_events) { + ZEND_HASH_FOREACH_VAL(coroutine->waker.triggered_events, event_val) + { + zend_get_gc_buffer_add_zval(buf, event_val); + } + ZEND_HASH_FOREACH_END(); + } + } -METHOD(getAwaitingInfo) -{ - ZEND_PARSE_PARAMETERS_NONE(); + /* Add context ZVALs if present */ + if (coroutine->coroutine.context) { + /* Cast to actual context implementation to access HashTables */ + async_context_t *context = (async_context_t *) coroutine->coroutine.context; - zend_array *info = ZEND_ASYNC_GET_AWAITING_INFO(&THIS_COROUTINE->coroutine); + /* Add all values from context->values HashTable */ + zval *val; + ZEND_HASH_FOREACH_VAL(&context->values, val) + { + zend_get_gc_buffer_add_zval(buf, val); + } + ZEND_HASH_FOREACH_END(); - if (info == NULL) { - array_init(return_value); - } else { - RETURN_ARR(info); + /* Add all object keys from context->keys HashTable */ + ZEND_HASH_FOREACH_VAL(&context->keys, val) + { + zend_get_gc_buffer_add_zval(buf, val); + } + ZEND_HASH_FOREACH_END(); } -} -METHOD(cancel) -{ - zend_object *exception = NULL; + async_fiber_context_t *fiber_context = coroutine->fiber_context; - zend_class_entry *ce_cancellation_exception = ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION); + /* Check if we should traverse execution stack (similar to fibers) */ + if (fiber_context == NULL || (fiber_context->context.status != ZEND_FIBER_STATUS_SUSPENDED || !fiber_context->execute_data)) { + zend_get_gc_buffer_use(buf, table, num); + return NULL; + } - ZEND_PARSE_PARAMETERS_START(0, 1) - Z_PARAM_OPTIONAL; - Z_PARAM_OBJ_OF_CLASS_OR_NULL(exception, ce_cancellation_exception) - ZEND_PARSE_PARAMETERS_END(); + /* Traverse execution stack for suspended coroutines */ + HashTable *lastSymTable = NULL; + zend_execute_data *ex = fiber_context->execute_data; + for (; ex; ex = ex->prev_execute_data) { + HashTable *symTable; + if (ZEND_CALL_INFO(ex) & ZEND_CALL_GENERATOR) { + zend_generator *generator = (zend_generator *) ex->return_value; + if (!(generator->flags & ZEND_GENERATOR_CURRENTLY_RUNNING)) { + continue; + } + symTable = zend_generator_frame_gc(buf, generator); + } else { + symTable = zend_unfinished_execution_gc_ex( + ex, ex->func && ZEND_USER_CODE(ex->func->type) ? ex->call : NULL, buf, false); + } + if (symTable) { + if (lastSymTable) { + zval *val; + ZEND_HASH_FOREACH_VAL(lastSymTable, val) + { + if (EXPECTED(Z_TYPE_P(val) == IS_INDIRECT)) { + val = Z_INDIRECT_P(val); + } + zend_get_gc_buffer_add_zval(buf, val); + } + ZEND_HASH_FOREACH_END(); + } + lastSymTable = symTable; + } + } - ZEND_ASYNC_CANCEL(&THIS_COROUTINE->coroutine, exception, false); + zend_get_gc_buffer_use(buf, table, num); + return lastSymTable; } -METHOD(onFinally) +zend_coroutine_t *async_new_coroutine(zend_async_scope_t *scope) { - zval *callable; - - ZEND_PARSE_PARAMETERS_START(1, 1) - Z_PARAM_ZVAL(callable) - ZEND_PARSE_PARAMETERS_END(); + zend_object *object = coroutine_object_create(async_ce_coroutine); - if (UNEXPECTED(false == zend_is_callable(callable, 0, NULL))) { - zend_argument_type_error(1, "argument must be callable"); - RETURN_THROWS(); + if (UNEXPECTED(EG(exception))) { + return NULL; } - async_coroutine_t *coroutine = THIS_COROUTINE; - - // Check if coroutine is already finished - if (ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) { - - // Call the callable immediately - zval result, param; - ZVAL_UNDEF(&result); - ZVAL_OBJ(¶m, &coroutine->std); + async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); + coroutine->coroutine.scope = scope; - if (UNEXPECTED(call_user_function(NULL, NULL, callable, &result, 1, ¶m) == FAILURE)) { - zend_throw_error(NULL, "Failed to call finally handler in finished coroutine"); - zval_ptr_dtor(&result); - RETURN_THROWS(); - } + return &coroutine->coroutine; +} - return; - } +void async_register_coroutine_ce(void) +{ + async_ce_coroutine = register_class_Async_Coroutine(async_ce_awaitable); - // Lazy initialization of finally_handlers array - if (coroutine->finally_handlers == NULL) { - coroutine->finally_handlers = zend_new_array(0); - } + async_ce_coroutine->create_object = coroutine_object_create; - if (UNEXPECTED(zend_hash_next_index_insert(coroutine->finally_handlers, callable) == NULL)) { - async_throw_error("Failed to add finally handler to coroutine"); - RETURN_THROWS(); - } + async_ce_coroutine->default_object_handlers = &coroutine_handlers; - Z_TRY_ADDREF_P(callable); + coroutine_handlers = std_object_handlers; + coroutine_handlers.offset = XtOffsetOf(async_coroutine_t, std); + coroutine_handlers.clone_obj = NULL; + coroutine_handlers.dtor_obj = coroutine_object_destroy; + coroutine_handlers.free_obj = coroutine_free; + coroutine_handlers.get_gc = async_coroutine_object_gc; } /////////////////////////////////////////////////////////// -/// Coroutine methods end +/// 3. Core Coroutine State Management /////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////// -/// Finally handlers functions -/////////////////////////////////////////////////////////// -static zend_result finally_handlers_iterator_handler(async_iterator_t *iterator, zval *current, zval *key) +ZEND_STACK_ALIGNED void async_coroutine_execute(async_coroutine_t *coroutine) { - finally_handlers_context_t *context = (finally_handlers_context_t *) iterator->extended_data; - zval rv; - ZVAL_UNDEF(&rv); - call_user_function(NULL, NULL, current, &rv, context->params_count, context->params); - zval_ptr_dtor(&rv); + bool should_start_graceful_shutdown = false; + bool is_bailout = false; - // Check for exceptions after handler execution - if (EG(exception)) { - zend_exception_save(); - zend_exception_restore(); - zend_object *current_exception = EG(exception); - GC_ADDREF(current_exception); - zend_clear_exception(); + zend_async_waker_t *waker = coroutine->coroutine.waker; - // Check for graceful/unwind exit exceptions - if (zend_is_graceful_exit(current_exception) || zend_is_unwind_exit(current_exception)) { - // Release CompositeException if exists - if (context->composite_exception) { - OBJ_RELEASE(context->composite_exception); - context->composite_exception = NULL; + if (UNEXPECTED(waker == NULL || waker->status == ZEND_ASYNC_WAKER_IGNORED)) { + + if (ZEND_COROUTINE_IS_CANCELLED(&coroutine->coroutine)) { + zend_try + { + if (EXPECTED(waker != NULL)) { + waker->status = ZEND_ASYNC_WAKER_RESULT; + zend_object *error = waker->error; + + // Transfer error from the Waker to current context if it exists. + if (UNEXPECTED(error)) { + waker->error = NULL; + async_rethrow_exception(error); + } + } + + async_coroutine_finalize(coroutine); } - // Throw graceful/unwind exit and stop iteration - async_rethrow_exception(current_exception); - return SUCCESS; + zend_catch + { + should_start_graceful_shutdown = true; + } + zend_end_try(); } - // Handle regular exceptions - if (context->composite_exception == NULL) { - context->composite_exception = current_exception; - } else if (!instanceof_function(context->composite_exception->ce, async_ce_composite_exception)) { - // Create CompositeException and add first exception - zend_object *composite_exception = async_new_composite_exception(); - if (UNEXPECTED(composite_exception == NULL)) { - // If we can't create CompositeException, throw the current one - async_rethrow_exception(current_exception); - return SUCCESS; - } + coroutine->coroutine.event.dispose(&coroutine->coroutine.event); - async_composite_exception_add_exception(composite_exception, context->composite_exception, true); - async_composite_exception_add_exception(composite_exception, current_exception, true); - context->composite_exception = composite_exception; - } else { - // Add exception to existing CompositeException - async_composite_exception_add_exception(context->composite_exception, current_exception, true); + if(EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { + ZEND_ASYNC_CURRENT_COROUTINE = NULL; } + + return; } - return SUCCESS; -} + if (UNEXPECTED(waker->status == ZEND_ASYNC_WAKER_WAITING)) { + zend_error(E_ERROR, "Attempt to resume a coroutine that has not been resolved"); + coroutine->coroutine.event.dispose(&coroutine->coroutine.event); -static void finally_handlers_iterator_dtor(zend_async_iterator_t *zend_iterator) -{ - async_iterator_t *iterator = (async_iterator_t *) zend_iterator; + if(EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { + ZEND_ASYNC_CURRENT_COROUTINE = NULL; + } - if (UNEXPECTED(iterator->extended_data == NULL)) { return; } - finally_handlers_context_t *context = iterator->extended_data; - async_scope_t *scope = (async_scope_t *) context->scope; - context->scope = NULL; + waker->status = ZEND_ASYNC_WAKER_RESULT; + zend_object *error = waker->error; - // Throw CompositeException if any exceptions were collected - if (context->composite_exception != NULL) { - if (ZEND_ASYNC_SCOPE_CATCH(&scope->scope, - &context->coroutine->coroutine, - NULL, - context->composite_exception, - false, - ZEND_ASYNC_SCOPE_IS_DISPOSE_SAFELY(&scope->scope))) { - OBJ_RELEASE(context->composite_exception); - context->composite_exception = NULL; - } + // The Waker object can be destroyed immediately if the result is an error. + // It will be delivered to the coroutine as an exception. + if (UNEXPECTED(error)) { + waker->error = NULL; + async_rethrow_exception(error); } - zend_object *composite_exception = context->composite_exception; - context->composite_exception = NULL; + zend_async_waker_clean(&coroutine->coroutine); - if (context->dtor != NULL) { - context->dtor(context); - context->dtor = NULL; - } + ZEND_COROUTINE_SET_STARTED(&coroutine->coroutine); - // Free the context - efree(context); - iterator->extended_data = NULL; + zend_try + { + if (EXPECTED(coroutine->coroutine.internal_entry == NULL)) { + ZEND_ASSERT(coroutine->coroutine.fcall != NULL && "Coroutine function call is not set"); + coroutine->coroutine.fcall->fci.retval = &coroutine->coroutine.result; - if (ZEND_ASYNC_EVENT_REF(&scope->scope.event) > 0) { - ZEND_ASYNC_EVENT_DEL_REF(&scope->scope.event); + zend_call_function(&coroutine->coroutine.fcall->fci, &coroutine->coroutine.fcall->fci_cache); - if (ZEND_ASYNC_EVENT_REF(&scope->scope.event) <= 1) { - scope->scope.try_to_dispose(&scope->scope); + zval_ptr_dtor(&coroutine->coroutine.fcall->fci.function_name); + ZVAL_UNDEF(&coroutine->coroutine.fcall->fci.function_name); + coroutine->coroutine.fcall->fci.retval = NULL; + } else { + coroutine->coroutine.internal_entry(); } } - - if (composite_exception != NULL) { - async_rethrow_exception(composite_exception); - } - - // - // If everything is correct, - // the Scope will destroy itself as soon as the coroutine created within it completes execution. - // Therefore, there's no point in taking additional actions to clean up resources. - // -} - -bool async_call_finally_handlers(HashTable *finally_handlers, finally_handlers_context_t *context, int32_t priority) -{ - if (finally_handlers == NULL || zend_hash_num_elements(finally_handlers) == 0) { - return false; + zend_catch + { + should_start_graceful_shutdown = true; + is_bailout = true; } + zend_end_try(); - // Create a special child scope for finally handlers - zend_async_scope_t *child_scope = ZEND_ASYNC_NEW_SCOPE(context->scope); - if (UNEXPECTED(child_scope == NULL)) { - return false; + zend_try + { + async_coroutine_finalize(coroutine); + OBJ_RELEASE(&coroutine->std); } - - zval handlers; - ZVAL_ARR(&handlers, finally_handlers); - - async_iterator_t *iterator = - async_iterator_new(&handlers, NULL, NULL, finally_handlers_iterator_handler, child_scope, 0, priority, 0); - - zval_ptr_dtor(&handlers); - - if (UNEXPECTED(EG(exception))) { - return false; + zend_catch + { + should_start_graceful_shutdown = true; + is_bailout = true; } + zend_end_try(); - context->composite_exception = NULL; - iterator->extended_data = context; - iterator->extended_dtor = finally_handlers_iterator_dtor; - async_iterator_run_in_coroutine(iterator, priority); - - // - // We retain ownership of the Scope in order to be able to handle exceptions from the Finally handlers. - // example: finally_handlers_iterator_dtor - // If the onFinally handlers throw an exception, it will end up in the Scope, - // so it’s important that the Scope is not destroyed before that moment. - // - ZEND_ASYNC_EVENT_ADD_REF(&context->scope->event); - - if (UNEXPECTED(EG(exception))) { - return false; + if(EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { + ZEND_ASYNC_CURRENT_COROUTINE = NULL; } - return true; -} - -/////////////////////////////////////////////////////////// -/// internal functions -/////////////////////////////////////////////////////////// - -static zend_always_inline async_coroutine_t *coroutine_from_context(zend_fiber_context *context) -{ - ZEND_ASSERT(context->kind == async_ce_coroutine && "Fiber context does not belong to a Coroutine fiber"); - - return (async_coroutine_t *) (((char *) context) - XtOffsetOf(async_coroutine_t, context)); -} - -void async_coroutine_cleanup(zend_fiber_context *context) -{ - async_coroutine_t *coroutine = coroutine_from_context(context); - - zend_vm_stack current_stack = EG(vm_stack); - EG(vm_stack) = coroutine->vm_stack; - zend_vm_stack_destroy(); - EG(vm_stack) = current_stack; - coroutine->execute_data = NULL; - - OBJ_RELEASE(&coroutine->std); -} - -static void finally_context_dtor(finally_handlers_context_t *context) -{ - if (context->coroutine != NULL) { - // Release the coroutine reference - OBJ_RELEASE(&context->coroutine->std); - context->coroutine = NULL; + if (UNEXPECTED(should_start_graceful_shutdown)) { + zend_try + { + ZEND_ASYNC_SHUTDOWN(); + } + zend_catch + { + zend_error(E_CORE_WARNING, + "A critical error was detected during the initiation of the graceful shutdown mode."); + zend_bailout(); + } + zend_end_try(); } -} - -static zend_always_inline void coroutine_call_finally_handlers(async_coroutine_t *coroutine) -{ - HashTable *finally_handlers = coroutine->finally_handlers; - coroutine->finally_handlers = NULL; - finally_handlers_context_t *finally_context = ecalloc(1, sizeof(finally_handlers_context_t)); - finally_context->coroutine = coroutine; - finally_context->scope = coroutine->coroutine.scope; - finally_context->dtor = finally_context_dtor; - finally_context->params_count = 1; - ZVAL_OBJ(&finally_context->params[0], &coroutine->std); - if (async_call_finally_handlers(finally_handlers, finally_context, 1)) { - GC_ADDREF(&coroutine->std); // Keep reference to coroutine while handlers are running - } else { - efree(finally_context); - zend_array_destroy(finally_handlers); + if (is_bailout) { + zend_bailout(); } } -void async_coroutine_finalize(zend_fiber_transfer *transfer, async_coroutine_t *coroutine) +void async_coroutine_finalize(async_coroutine_t *coroutine) { // Before finalizing the coroutine - // we check that we’re properly finishing the coroutine’s execution. + // we check that we're properly finishing the coroutine's execution. // The coroutine must not be in the queue! if (UNEXPECTED(ZEND_ASYNC_WAKER_IN_QUEUE(coroutine->coroutine.waker))) { zend_error(E_CORE_WARNING, "Attempt to finalize a coroutine that is still in the queue"); @@ -649,15 +611,7 @@ void async_coroutine_finalize(zend_fiber_transfer *transfer, async_coroutine_t * } zend_end_try(); - if (UNEXPECTED(EG(exception))) { - if (!(coroutine->flags & ZEND_FIBER_FLAG_DESTROYED) || - !(zend_is_graceful_exit(EG(exception)) || zend_is_unwind_exit(EG(exception)))) { - coroutine->flags |= ZEND_FIBER_FLAG_THREW; - transfer->flags = ZEND_FIBER_TRANSFER_FLAG_ERROR; - - ZVAL_OBJ_COPY(&transfer->value, EG(exception)); - } - + if (UNEXPECTED(EG(exception) && (zend_is_graceful_exit(EG(exception)) || zend_is_unwind_exit(EG(exception))))) { zend_clear_exception(); } @@ -673,192 +627,202 @@ void async_coroutine_finalize(zend_fiber_transfer *transfer, async_coroutine_t * } } + coroutine->fiber_context = NULL; + if (UNEXPECTED(do_bailout)) { zend_bailout(); } } /** - * Finalizes the coroutine from the scheduler. - * - * This function is called when the coroutine is being finalized from the scheduler. - * It ensures that the coroutine's waker is properly handled and that any exceptions - * are propagated correctly. + * The function suspends the execution of the coroutine and triggers a switch to another one. + * After calling this function, you must properly clean up the coroutine waker object + * (example zend_async_waker_clean). * - * @param coroutine The coroutine to finalize. + * @param from_main For main coroutine */ -void async_coroutine_finalize_from_scheduler(async_coroutine_t *coroutine) +void async_coroutine_suspend(const bool from_main) { - zend_async_waker_t *waker = coroutine->coroutine.waker; - ZEND_ASSERT(waker != NULL && "Waker must not be NULL when finalizing coroutine from scheduler"); - - // Save EG(exception) state - zend_object *prev_exception = EG(prev_exception); - zend_object *exception = EG(exception); - - EG(exception) = waker->error; - EG(prev_exception) = NULL; + if (UNEXPECTED(from_main)) { + // If the Scheduler was never used, it means no coroutines were created, + // so execution can be finished without doing anything. + if (circular_buffer_is_empty(&ASYNC_G(microtasks)) && zend_hash_num_elements(&ASYNC_G(coroutines)) == 0) { + return; + } - waker->error = NULL; - waker->status = ZEND_ASYNC_WAKER_NO_STATUS; + async_scheduler_main_coroutine_suspend(); + return; + } - bool do_bailout = false; + async_scheduler_coroutine_suspend(); +} - zend_try - { - async_coroutine_finalize(NULL, coroutine); - } - zend_catch - { - do_bailout = true; +void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, const bool transfer_error) +{ + if (UNEXPECTED(coroutine->waker == NULL || coroutine->waker->status == ZEND_ASYNC_WAKER_NO_STATUS)) { + async_throw_error("Cannot resume a coroutine that has not been suspended"); + return; } - zend_end_try(); - // If an exception occurs during finalization, we need to restore the previous exception state - zend_object *new_exception = EG(exception); - zend_object *new_prev_exception = EG(prev_exception); + if (error != NULL) { + if (coroutine->waker->error != NULL) { + + if (false == instanceof_function(error->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION))) { + zend_exception_set_previous(error, coroutine->waker->error); + coroutine->waker->error = error; - EG(exception) = exception; - EG(prev_exception) = prev_exception; + if (false == transfer_error) { + GC_ADDREF(error); + } + } else { + if (transfer_error) { + OBJ_RELEASE(error); + } + } + } else { + coroutine->waker->error = error; - if (UNEXPECTED(new_prev_exception)) { - async_rethrow_exception(new_prev_exception); + if (false == transfer_error) { + GC_ADDREF(error); + } + } } - if (UNEXPECTED(new_exception)) { - async_rethrow_exception(new_exception); + if (UNEXPECTED(coroutine->waker->status == ZEND_ASYNC_WAKER_QUEUED)) { + return; } - if (UNEXPECTED(do_bailout)) { - zend_bailout(); + if (UNEXPECTED(circular_buffer_push(&ASYNC_G(coroutine_queue), &coroutine, true)) == FAILURE) { + async_throw_error("Failed to enqueue coroutine"); + return; } + + coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED; } -ZEND_STACK_ALIGNED void async_coroutine_execute(zend_fiber_transfer *transfer) +void async_coroutine_cancel(zend_coroutine_t *zend_coroutine, + zend_object *error, + bool transfer_error, + const bool is_safely) { - ZEND_ASSERT(Z_TYPE(transfer->value) == IS_NULL && "Initial transfer value to coroutine context must be NULL"); - ZEND_ASSERT(!transfer->flags && "No flags should be set on initial transfer"); - - async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_CURRENT_COROUTINE; - ZEND_COROUTINE_SET_STARTED(&coroutine->coroutine); - - /* Call switch handlers for coroutine entering */ - if (UNEXPECTED(coroutine->coroutine.switch_handlers != NULL)) { - ZEND_COROUTINE_ENTER(&coroutine->coroutine); - } + // If the coroutine finished, do nothing. + if (ZEND_COROUTINE_IS_FINISHED(zend_coroutine)) { + if (transfer_error && error != NULL) { + OBJ_RELEASE(error); + } - /* Determine the current error_reporting ini setting. */ - zend_long error_reporting = INI_INT("error_reporting"); - if (!error_reporting && !INI_STR("error_reporting")) { - error_reporting = E_ALL; + return; } - EG(vm_stack) = NULL; - bool should_start_graceful_shutdown = false; - - zend_first_try - { - zend_vm_stack stack = zend_vm_stack_new_page(ZEND_FIBER_VM_STACK_SIZE, NULL); - EG(vm_stack) = stack; - EG(vm_stack_top) = stack->top + ZEND_CALL_FRAME_SLOT; - EG(vm_stack_end) = stack->end; - EG(vm_stack_page_size) = ZEND_FIBER_VM_STACK_SIZE; + // An attempt to cancel a coroutine that is currently running. + // In this case, nothing actually happens immediately; + // however, the coroutine is marked as having been cancelled, + // and the cancellation exception is stored as its result. + if (UNEXPECTED(zend_coroutine == ZEND_ASYNC_CURRENT_COROUTINE)) { - coroutine->execute_data = (zend_execute_data *) stack->top; + ZEND_COROUTINE_SET_CANCELLED(zend_coroutine); - memset(coroutine->execute_data, 0, sizeof(zend_execute_data)); + if (zend_coroutine->exception == NULL) { + zend_coroutine->exception = error; - coroutine->execute_data->func = &coroutine_root_function; + if (false == transfer_error) { + GC_ADDREF(error); + } + } - EG(current_execute_data) = coroutine->execute_data; - EG(jit_trace_num) = 0; - EG(error_reporting) = (int) error_reporting; + if (zend_coroutine->exception == NULL) { + zend_coroutine->exception = async_new_exception(async_ce_cancellation_exception, "Coroutine cancelled"); + } -#ifdef ZEND_CHECK_STACK_LIMIT - EG(stack_base) = zend_fiber_stack_base(coroutine->context.stack); - EG(stack_limit) = zend_fiber_stack_limit(coroutine->context.stack); -#endif + return; + } - if (EXPECTED(coroutine->coroutine.internal_entry == NULL)) { - ZEND_ASSERT(coroutine->coroutine.fcall != NULL && "Coroutine function call is not set"); - coroutine->coroutine.fcall->fci.retval = &coroutine->coroutine.result; + zend_async_waker_t *waker = zend_async_waker_define(zend_coroutine); - zend_call_function(&coroutine->coroutine.fcall->fci, &coroutine->coroutine.fcall->fci_cache); + const bool is_error_null = (error == NULL); - zval_ptr_dtor(&coroutine->coroutine.fcall->fci.function_name); - ZVAL_UNDEF(&coroutine->coroutine.fcall->fci.function_name); - } else { - coroutine->coroutine.internal_entry(); + if (is_error_null) { + error = async_new_exception(async_ce_cancellation_exception, "Coroutine cancelled"); + transfer_error = true; + if (UNEXPECTED(EG(exception))) { + return; } } - zend_catch - { - coroutine->flags |= ZEND_FIBER_FLAG_BAILOUT; - transfer->flags = ZEND_FIBER_TRANSFER_FLAG_BAILOUT; - should_start_graceful_shutdown = true; - } - zend_end_try(); - zend_first_try - { - async_coroutine_finalize(transfer, coroutine); - } - zend_catch - { - coroutine->flags |= ZEND_FIBER_FLAG_BAILOUT; - transfer->flags = ZEND_FIBER_TRANSFER_FLAG_BAILOUT; - should_start_graceful_shutdown = true; - } - zend_end_try(); + // If the coroutine is currently protected from cancellation, defer the cancellation. + if (ZEND_COROUTINE_IS_PROTECTED(zend_coroutine)) { + async_coroutine_t *coroutine = (async_coroutine_t *) zend_coroutine; - coroutine->context.cleanup = &async_coroutine_cleanup; - coroutine->vm_stack = EG(vm_stack); + if (coroutine->deferred_cancellation == NULL) { + coroutine->deferred_cancellation = error; - if (UNEXPECTED(should_start_graceful_shutdown)) { - zend_first_try - { - ZEND_ASYNC_SHUTDOWN(); - } - zend_catch - { - zend_error(E_CORE_WARNING, - "A critical error was detected during the initiation of the graceful shutdown mode."); + if (false == transfer_error) { + GC_ADDREF(error); + } + } else if (transfer_error) { + OBJ_RELEASE(error); } - zend_end_try(); + + return; } - // - // The scheduler coroutine always terminates into the main execution flow. - // - if (UNEXPECTED(&coroutine->coroutine == ZEND_ASYNC_SCHEDULER)) { + bool was_cancelled = ZEND_COROUTINE_IS_CANCELLED(zend_coroutine); + ZEND_COROUTINE_SET_CANCELLED(zend_coroutine); - ZEND_ASYNC_SCHEDULER = NULL; + if (false == ZEND_COROUTINE_IS_STARTED(zend_coroutine)) { + + if (false == ZEND_ASYNC_WAKER_IN_QUEUE(waker)) { + // + // Situation: the coroutine is not in the queue, but a cancellation is requested. + // It might seem like we can simply remove the coroutine, + // but doing so would break the flow of the coroutine's handlers. + // Therefore, to normalize the flow, + // we place the coroutine in the queue with a status of ignored, + // so that the flow is executed correctly. + // + async_scheduler_coroutine_enqueue(zend_coroutine); + } - if (transfer != ASYNC_G(main_transfer)) { + waker->status = ZEND_ASYNC_WAKER_IGNORED; - if (UNEXPECTED(Z_TYPE(transfer->value) == IS_OBJECT)) { - zend_first_try - { - zval_ptr_dtor(&transfer->value); - } - zend_end_try(); - zend_error(E_CORE_WARNING, "The transfer value must be NULL when the main coroutine is resumed"); - } + // + // Exception override: + // If the coroutine already has an exception + // and it's a cancellation exception, then nothing needs to be done. + // In any other case, the cancellation exception overrides the existing exception. + // + ZEND_ASYNC_WAKER_APPLY_CANCELLATION(waker, error, transfer_error); + async_scheduler_coroutine_enqueue(zend_coroutine); + return; + } - transfer->context = ASYNC_G(main_transfer)->context; - transfer->flags = ASYNC_G(main_transfer)->flags; - ZVAL_COPY_VALUE(&transfer->value, &ASYNC_G(main_transfer)->value); - ZVAL_NULL(&ASYNC_G(main_transfer)->value); + // In safely mode, we don't forcibly terminate the coroutine, + // but we do mark it as a Zombie. + if (is_safely) { + async_scope_mark_coroutine_zombie((async_coroutine_t *) zend_coroutine); + ZEND_ASYNC_DECREASE_COROUTINE_COUNT + if (transfer_error && error != NULL) { + OBJ_RELEASE(error); } - return; } - transfer->context = NULL; + if (was_cancelled && waker->error != NULL && + instanceof_function(waker->error->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION))) { + if (transfer_error) { + OBJ_RELEASE(error); + } + } else { + ZEND_ASYNC_WAKER_APPLY_CANCELLATION(waker, error, transfer_error); + } - async_scheduler_coroutine_suspend(transfer); + async_scheduler_coroutine_enqueue(zend_coroutine); } +/////////////////////////////////////////////////////////// +/// 4. Event System Interface +/////////////////////////////////////////////////////////// + static void coroutine_event_start(zend_async_event_t *event) { } @@ -930,9 +894,9 @@ static zend_string *coroutine_info(zend_async_event_t *event) coroutine->std.handle, coroutine->coroutine.filename ? ZSTR_VAL(coroutine->coroutine.filename) : "", coroutine->coroutine.lineno, - coroutine->coroutine.waker->filename ? ZSTR_VAL(coroutine->coroutine.waker->filename) + coroutine->waker.filename ? ZSTR_VAL(coroutine->waker.filename) : "", - coroutine->coroutine.waker->lineno, + coroutine->waker.lineno, ZSTR_VAL(zend_coroutine_name)); } else { return zend_strpprintf(0, @@ -944,553 +908,531 @@ static zend_string *coroutine_info(zend_async_event_t *event) } } -void async_coroutine_suspend(const bool from_main) +static void coroutine_dispose(zend_async_event_t *event) { - if (UNEXPECTED(from_main)) { - // If the Scheduler was never used, it means no coroutines were created, - // so execution can be finished without doing anything. - if (circular_buffer_is_empty(&ASYNC_G(microtasks)) && zend_hash_num_elements(&ASYNC_G(coroutines)) == 0) { - return; - } - - async_scheduler_main_coroutine_suspend(); - return; - } - - async_scheduler_coroutine_suspend(NULL); + async_coroutine_t *coroutine = (async_coroutine_t *) event; + OBJ_RELEASE(&coroutine->std); } -void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, const bool transfer_error) +/////////////////////////////////////////////////////////// +/// 5. Context Management API +/////////////////////////////////////////////////////////// + +bool async_coroutine_context_set(zend_coroutine_t *z_coroutine, zval *key, zval *value) { - if (UNEXPECTED(coroutine->waker == NULL)) { - async_throw_error("Cannot resume a coroutine that has not been suspended"); - return; - } + async_coroutine_t *coroutine = + (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); - if (error != NULL) { - if (coroutine->waker->error != NULL) { + if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { + return false; + } - if (false == instanceof_function(error->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION))) { - zend_exception_set_previous(error, coroutine->waker->error); - coroutine->waker->error = error; + coroutine->coroutine.context->set(coroutine->coroutine.context, key, value); + return true; +} - if (false == transfer_error) { - GC_ADDREF(error); - } - } else { - if (transfer_error) { - OBJ_RELEASE(error); - } - } - } else { - coroutine->waker->error = error; +bool async_coroutine_context_get(zend_coroutine_t *z_coroutine, zval *key, zval *result) +{ + async_coroutine_t *coroutine = + (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); - if (false == transfer_error) { - GC_ADDREF(error); - } + if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { + if (result != NULL) { + ZVAL_NULL(result); } + return false; } - if (UNEXPECTED(coroutine->waker->status == ZEND_ASYNC_WAKER_QUEUED)) { - return; - } + return coroutine->coroutine.context->find(coroutine->coroutine.context, key, result, false); +} - if (UNEXPECTED(circular_buffer_push(&ASYNC_G(coroutine_queue), &coroutine, true)) == FAILURE) { - async_throw_error("Failed to enqueue coroutine"); - return; +bool async_coroutine_context_has(zend_coroutine_t *z_coroutine, zval *key) +{ + async_coroutine_t *coroutine = + (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); + + if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { + return false; } - coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED; + return coroutine->coroutine.context->find(coroutine->coroutine.context, key, NULL, false); } -void async_coroutine_cancel(zend_coroutine_t *zend_coroutine, - zend_object *error, - bool transfer_error, - const bool is_safely) +bool async_coroutine_context_delete(zend_coroutine_t *z_coroutine, zval *key) { - // If the coroutine finished, do nothing. - if (ZEND_COROUTINE_IS_FINISHED(zend_coroutine)) { - if (transfer_error && error != NULL) { - OBJ_RELEASE(error); - } + async_coroutine_t *coroutine = + (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); - return; + if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { + return false; } - // An attempt to cancel a coroutine that is currently running. - // In this case, nothing actually happens immediately; - // however, the coroutine is marked as having been cancelled, - // and the cancellation exception is stored as its result. - if (UNEXPECTED(zend_coroutine == ZEND_ASYNC_CURRENT_COROUTINE)) { + return coroutine->coroutine.context->unset(coroutine->coroutine.context, key); +} - ZEND_COROUTINE_SET_CANCELLED(zend_coroutine); +/////////////////////////////////////////////////////////// +/// 6. Finally Handler System +/////////////////////////////////////////////////////////// - if (zend_coroutine->exception == NULL) { - zend_coroutine->exception = error; +static zend_result finally_handlers_iterator_handler(async_iterator_t *iterator, zval *current, zval *key) +{ + finally_handlers_context_t *context = (finally_handlers_context_t *) iterator->extended_data; + zval rv; + ZVAL_UNDEF(&rv); + call_user_function(NULL, NULL, current, &rv, context->params_count, context->params); + zval_ptr_dtor(&rv); + ZVAL_UNDEF(&rv); - if (false == transfer_error) { - GC_ADDREF(error); - } - } + // Check for exceptions after handler execution + if (EG(exception)) { + zend_exception_save(); + zend_exception_restore(); + zend_object *current_exception = EG(exception); + GC_ADDREF(current_exception); + zend_clear_exception(); - if (zend_coroutine->exception == NULL) { - zend_coroutine->exception = async_new_exception(async_ce_cancellation_exception, "Coroutine cancelled"); + // Check for graceful/unwind exit exceptions + if (zend_is_graceful_exit(current_exception) || zend_is_unwind_exit(current_exception)) { + // Release CompositeException if exists + if (context->composite_exception) { + OBJ_RELEASE(context->composite_exception); + context->composite_exception = NULL; + } + // Throw graceful/unwind exit and stop iteration + async_rethrow_exception(current_exception); + return SUCCESS; } - return; - } + // Handle regular exceptions + if (context->composite_exception == NULL) { + context->composite_exception = current_exception; + } else if (!instanceof_function(context->composite_exception->ce, async_ce_composite_exception)) { + // Create CompositeException and add first exception + zend_object *composite_exception = async_new_composite_exception(); + if (UNEXPECTED(composite_exception == NULL)) { + // If we can't create CompositeException, throw the current one + async_rethrow_exception(current_exception); + return SUCCESS; + } - if (zend_coroutine->waker == NULL) { - zend_async_waker_new(zend_coroutine); + async_composite_exception_add_exception(composite_exception, context->composite_exception, true); + async_composite_exception_add_exception(composite_exception, current_exception, true); + context->composite_exception = composite_exception; + } else { + // Add exception to existing CompositeException + async_composite_exception_add_exception(context->composite_exception, current_exception, true); + } } - zend_async_waker_t *waker = zend_coroutine->waker; - - if (UNEXPECTED(waker == NULL)) { - async_throw_error("Waker is not initialized"); + return SUCCESS; +} - if (transfer_error) { - OBJ_RELEASE(error); - } +static void finally_handlers_iterator_dtor(zend_async_iterator_t *zend_iterator) +{ + async_iterator_t *iterator = (async_iterator_t *) zend_iterator; + if (UNEXPECTED(iterator->extended_data == NULL)) { return; } - const bool is_error_null = (error == NULL); + finally_handlers_context_t *context = iterator->extended_data; + async_scope_t *scope = (async_scope_t *) context->scope; + context->scope = NULL; - if (is_error_null) { - error = async_new_exception(async_ce_cancellation_exception, "Coroutine cancelled"); - transfer_error = true; - if (UNEXPECTED(EG(exception))) { - return; + // Throw CompositeException if any exceptions were collected + if (context->composite_exception != NULL) { + if (ZEND_ASYNC_SCOPE_CATCH(&scope->scope, + &context->coroutine->coroutine, + NULL, + context->composite_exception, + false, + ZEND_ASYNC_SCOPE_IS_DISPOSE_SAFELY(&scope->scope))) { + OBJ_RELEASE(context->composite_exception); + context->composite_exception = NULL; } } - // If the coroutine is currently protected from cancellation, defer the cancellation. - if (ZEND_COROUTINE_IS_PROTECTED(zend_coroutine)) { - async_coroutine_t *coroutine = (async_coroutine_t *) zend_coroutine; + zend_object *composite_exception = context->composite_exception; + context->composite_exception = NULL; - if (coroutine->deferred_cancellation == NULL) { - coroutine->deferred_cancellation = error; + if (context->dtor != NULL) { + context->dtor(context); + context->dtor = NULL; + } - if (false == transfer_error) { - GC_ADDREF(error); - } - } else if (transfer_error) { - OBJ_RELEASE(error); + // Free the context + efree(context); + iterator->extended_data = NULL; + + if (ZEND_ASYNC_EVENT_REF(&scope->scope.event) > 0) { + ZEND_ASYNC_EVENT_DEL_REF(&scope->scope.event); + + if (ZEND_ASYNC_EVENT_REF(&scope->scope.event) <= 1) { + scope->scope.try_to_dispose(&scope->scope); } + } - return; + if (composite_exception != NULL) { + async_rethrow_exception(composite_exception); + } + + // + // If everything is correct, + // the Scope will destroy itself as soon as the coroutine created within it completes execution. + // Therefore, there's no point in taking additional actions to clean up resources. + // +} + +bool async_call_finally_handlers(HashTable *finally_handlers, finally_handlers_context_t *context, int32_t priority) +{ + if (finally_handlers == NULL || zend_hash_num_elements(finally_handlers) == 0) { + return false; } - bool was_cancelled = ZEND_COROUTINE_IS_CANCELLED(zend_coroutine); - ZEND_COROUTINE_SET_CANCELLED(zend_coroutine); + // Create a special child scope for finally handlers + zend_async_scope_t *child_scope = ZEND_ASYNC_NEW_SCOPE(context->scope); + if (UNEXPECTED(child_scope == NULL)) { + return false; + } - if (false == ZEND_COROUTINE_IS_STARTED(zend_coroutine)) { + zval handlers; + ZVAL_ARR(&handlers, finally_handlers); - if (false == ZEND_ASYNC_WAKER_IN_QUEUE(waker)) { - // - // Situation: the coroutine is not in the queue, but a cancellation is requested. - // It might seem like we can simply remove the coroutine, - // but doing so would break the flow of the coroutine's handlers. - // Therefore, to normalize the flow, - // we place the coroutine in the queue with a status of ignored, - // so that the flow is executed correctly. - // - async_scheduler_coroutine_enqueue(zend_coroutine); - } + async_iterator_t *iterator = + async_iterator_new(&handlers, NULL, NULL, finally_handlers_iterator_handler, child_scope, 0, priority, 0); - waker->status = ZEND_ASYNC_WAKER_IGNORED; + zval_ptr_dtor(&handlers); + ZVAL_UNDEF(&handlers); - // - // Exception override: - // If the coroutine already has an exception - // and it's a cancellation exception, then nothing needs to be done. - // In any other case, the cancellation exception overrides the existing exception. - // - ZEND_ASYNC_WAKER_APPLY_CANCELLATION(waker, error, transfer_error); - async_scheduler_coroutine_enqueue(zend_coroutine); - return; + if (UNEXPECTED(EG(exception))) { + return false; } - // In safely mode, we don't forcibly terminate the coroutine, - // but we do mark it as a Zombie. - if (is_safely) { - async_scope_mark_coroutine_zombie((async_coroutine_t *) zend_coroutine); - ZEND_ASYNC_DECREASE_COROUTINE_COUNT - if (transfer_error && error != NULL) { - OBJ_RELEASE(error); - } - return; - } + context->composite_exception = NULL; + iterator->extended_data = context; + iterator->extended_dtor = finally_handlers_iterator_dtor; + async_iterator_run_in_coroutine(iterator, priority); - if (was_cancelled && waker->error != NULL && - instanceof_function(waker->error->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION))) { - if (transfer_error) { - OBJ_RELEASE(error); - } - } else { - ZEND_ASYNC_WAKER_APPLY_CANCELLATION(waker, error, transfer_error); + // + // We retain ownership of the Scope in order to be able to handle exceptions from the Finally handlers. + // example: finally_handlers_iterator_dtor + // If the onFinally handlers throw an exception, it will end up in the Scope, + // so it's important that the Scope is not destroyed before that moment. + // + ZEND_ASYNC_EVENT_ADD_REF(&context->scope->event); + + if (UNEXPECTED(EG(exception))) { + return false; } - async_scheduler_coroutine_enqueue(zend_coroutine); + return true; } -static void coroutine_dispose(zend_async_event_t *event) +static void finally_context_dtor(finally_handlers_context_t *context) { - async_coroutine_t *coroutine = (async_coroutine_t *) event; - OBJ_RELEASE(&coroutine->std); + if (context->coroutine != NULL) { + // Release the coroutine reference + OBJ_RELEASE(&context->coroutine->std); + context->coroutine = NULL; + } } -static void coroutine_object_destroy(zend_object *object) +static zend_always_inline void coroutine_call_finally_handlers(async_coroutine_t *coroutine) { - async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); - - ZEND_ASSERT((coroutine->coroutine.waker == NULL || - (coroutine->coroutine.waker->status == ZEND_ASYNC_WAKER_QUEUED || - coroutine->coroutine.waker->status == ZEND_ASYNC_WAKER_IGNORED)) && - "Coroutine waker must be dequeued before destruction"); + HashTable *finally_handlers = coroutine->finally_handlers; + coroutine->finally_handlers = NULL; + finally_handlers_context_t *finally_context = ecalloc(1, sizeof(finally_handlers_context_t)); + finally_context->coroutine = coroutine; + finally_context->scope = coroutine->coroutine.scope; + finally_context->dtor = finally_context_dtor; + finally_context->params_count = 1; + ZVAL_OBJ(&finally_context->params[0], &coroutine->std); - if (coroutine->coroutine.scope != NULL) { - async_scope_notify_coroutine_finished(coroutine); - coroutine->coroutine.scope = NULL; + if (async_call_finally_handlers(finally_handlers, finally_context, 1)) { + GC_ADDREF(&coroutine->std); // Keep reference to coroutine while handlers are running + } else { + efree(finally_context); + zend_array_destroy(finally_handlers); } +} - if (coroutine->coroutine.fcall) { - - zend_fcall_t *fcall = coroutine->coroutine.fcall; - coroutine->coroutine.fcall = NULL; +/////////////////////////////////////////////////////////// +/// 7. PHP Methods +/////////////////////////////////////////////////////////// - if (fcall->fci.param_count) { - for (uint32_t i = 0; i < fcall->fci.param_count; i++) { - zval_ptr_dtor(&fcall->fci.params[i]); - } +// Context Management Method +METHOD(getContext) +{ + ZEND_PARSE_PARAMETERS_NONE(); - efree(fcall->fci.params); - } + async_coroutine_t *coroutine = THIS_COROUTINE; - if (fcall->fci.named_params) { - GC_DELREF(fcall->fci.named_params); - fcall->fci.named_params = NULL; + if (coroutine->coroutine.context == NULL) { + async_context_t *context = async_context_new(); + if (UNEXPECTED(context == NULL)) { + RETURN_THROWS(); } - zval_ptr_dtor(&fcall->fci.function_name); - efree(fcall); + coroutine->coroutine.context = &context->base; } - if (coroutine->coroutine.context != NULL) { - // If the coroutine has a context, we need to release it. - async_context_t *context = (async_context_t *) coroutine->coroutine.context; - coroutine->coroutine.context = NULL; - async_context_dispose(context); - } + // Return the context object + RETURN_OBJ_COPY(&((async_context_t *) coroutine->coroutine.context)->std); +} - if (coroutine->coroutine.filename) { - zend_string_release_ex(coroutine->coroutine.filename, 0); - coroutine->coroutine.filename = NULL; - } +// Finally Handler Method +METHOD(onFinally) +{ + zval *callable; - if (coroutine->coroutine.waker) { - zend_async_waker_destroy(&coroutine->coroutine); - coroutine->coroutine.waker = NULL; - } + ZEND_PARSE_PARAMETERS_START(1, 1) + Z_PARAM_ZVAL(callable) + ZEND_PARSE_PARAMETERS_END(); - if (coroutine->coroutine.internal_context != NULL) { - zend_async_coroutine_internal_context_dispose(&coroutine->coroutine); + if (UNEXPECTED(false == zend_is_callable(callable, 0, NULL))) { + zend_argument_type_error(1, "argument must be callable"); + RETURN_THROWS(); } - zval_ptr_dtor(&coroutine->coroutine.result); + async_coroutine_t *coroutine = THIS_COROUTINE; - if (coroutine->coroutine.exception != NULL) { - // If the coroutine has an exception, we need to release it. + // Check if coroutine is already finished + if (ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) { - zend_object *exception = coroutine->coroutine.exception; - coroutine->coroutine.exception = NULL; - OBJ_RELEASE(exception); + // Call the callable immediately + zval result, param; + ZVAL_UNDEF(&result); + ZVAL_OBJ(¶m, &coroutine->std); + + if (UNEXPECTED(call_user_function(NULL, NULL, callable, &result, 1, ¶m) == FAILURE)) { + zend_throw_error(NULL, "Failed to call finally handler in finished coroutine"); + zval_ptr_dtor(&result); + RETURN_THROWS(); + } + + return; } - if (coroutine->deferred_cancellation != NULL) { - zend_object *deferred_cancellation = coroutine->deferred_cancellation; - coroutine->deferred_cancellation = NULL; - OBJ_RELEASE(deferred_cancellation); + // Lazy initialization of finally_handlers array + if (coroutine->finally_handlers == NULL) { + coroutine->finally_handlers = zend_new_array(0); } - if (coroutine->finally_handlers) { - zend_array_destroy(coroutine->finally_handlers); - coroutine->finally_handlers = NULL; + if (UNEXPECTED(zend_hash_next_index_insert(coroutine->finally_handlers, callable) == NULL)) { + async_throw_error("Failed to add finally handler to coroutine"); + RETURN_THROWS(); } + + Z_TRY_ADDREF_P(callable); } -static void coroutine_free(zend_object *object) +// Identity Methods +METHOD(getId) { - async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); + ZEND_PARSE_PARAMETERS_NONE(); - zend_async_callbacks_free(&coroutine->coroutine.event); - zend_object_std_dtor(object); + RETURN_LONG(Z_OBJ_P(ZEND_THIS)->handle); } -static zend_object *coroutine_object_create(zend_class_entry *class_entry) +METHOD(asHiPriority) { - async_coroutine_t *coroutine = zend_object_alloc(sizeof(async_coroutine_t), class_entry); - - ZVAL_UNDEF(&coroutine->coroutine.result); - - ZEND_ASYNC_EVENT_SET_ZEND_OBJ(&coroutine->coroutine.event); - ZEND_ASYNC_EVENT_SET_NO_FREE_MEMORY(&coroutine->coroutine.event); - ZEND_ASYNC_EVENT_SET_ZEND_OBJ_OFFSET(&coroutine->coroutine.event, XtOffsetOf(async_coroutine_t, std)); - - /* Initialize switch handlers */ - coroutine->coroutine.switch_handlers = NULL; + // TODO: Implement priority handling in scheduler + // For now, just return the same coroutine + RETURN_ZVAL(ZEND_THIS, 1, 0); +} - zend_async_event_t *event = &coroutine->coroutine.event; +// Result Methods +METHOD(getResult) +{ + ZEND_PARSE_PARAMETERS_NONE(); - event->start = coroutine_event_start; - event->stop = coroutine_event_stop; - event->add_callback = coroutine_add_callback; - event->del_callback = coroutine_del_callback; - event->replay = coroutine_replay; - event->info = coroutine_info; - event->dispose = coroutine_dispose; + async_coroutine_t *coroutine = THIS_COROUTINE; - coroutine->flags = ZEND_FIBER_STATUS_INIT; - coroutine->coroutine.extended_data = NULL; - coroutine->finally_handlers = NULL; + if (!ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) { + RETURN_NULL(); + } - zend_object_std_init(&coroutine->std, class_entry); - object_properties_init(&coroutine->std, class_entry); + if (Z_TYPE(coroutine->coroutine.result) == IS_UNDEF) { + RETURN_NULL(); + } - return &coroutine->std; + RETURN_ZVAL(&coroutine->coroutine.result, 1, 0); } -zend_coroutine_t *async_new_coroutine(zend_async_scope_t *scope) +METHOD(getException) { - zend_object *object = coroutine_object_create(async_ce_coroutine); + ZEND_PARSE_PARAMETERS_NONE(); - if (UNEXPECTED(EG(exception))) { - return NULL; + async_coroutine_t *coroutine = THIS_COROUTINE; + + if (false == ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) { + RETURN_NULL(); } - async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); - coroutine->coroutine.scope = scope; + if (coroutine->coroutine.exception == NULL) { + RETURN_NULL(); + } + + RETURN_OBJ_COPY(coroutine->coroutine.exception); +} - return &coroutine->coroutine; +METHOD(getTrace) +{ + // TODO: Implement debug trace collection + // This would require fiber stack trace functionality + array_init(return_value); } -static HashTable *async_coroutine_object_gc(zend_object *object, zval **table, int *num) +// Location Methods +METHOD(getSpawnFileAndLine) { - async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_OBJECT_TO_EVENT(object); - zend_get_gc_buffer *buf = zend_get_gc_buffer_create(); + ZEND_PARSE_PARAMETERS_NONE(); - /* Always add basic ZVALs from coroutine structure */ - zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.result); + async_coroutine_t *coroutine = THIS_COROUTINE; - /* Add objects that may be present */ - if (coroutine->coroutine.exception) { - zend_get_gc_buffer_add_obj(buf, coroutine->coroutine.exception); - } + array_init(return_value); - if (coroutine->deferred_cancellation) { - zend_get_gc_buffer_add_obj(buf, coroutine->deferred_cancellation); + if (coroutine->coroutine.filename) { + add_next_index_str(return_value, zend_string_copy(coroutine->coroutine.filename)); + } else { + add_next_index_null(return_value); } - /* Add finally handlers if present */ - if (coroutine->finally_handlers) { - zval *val; - ZEND_HASH_FOREACH_VAL(coroutine->finally_handlers, val) - { - zend_get_gc_buffer_add_zval(buf, val); - } - ZEND_HASH_FOREACH_END(); - } + add_next_index_long(return_value, coroutine->coroutine.lineno); +} - /* Add internal context HashTable if present */ - if (coroutine->coroutine.internal_context) { - zval *val; - ZEND_HASH_FOREACH_VAL(coroutine->coroutine.internal_context, val) - { - zend_get_gc_buffer_add_zval(buf, val); - } - ZEND_HASH_FOREACH_END(); - } +METHOD(getSpawnLocation) +{ + ZEND_PARSE_PARAMETERS_NONE(); - /* Add fcall function name and parameters if present */ - if (coroutine->coroutine.fcall) { - zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.fcall->fci.function_name); + async_coroutine_t *coroutine = THIS_COROUTINE; - /* Add function parameters */ - if (coroutine->coroutine.fcall->fci.param_count > 0 && coroutine->coroutine.fcall->fci.params) { - for (uint32_t i = 0; i < coroutine->coroutine.fcall->fci.param_count; i++) { - zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.fcall->fci.params[i]); - } - } + if (coroutine->coroutine.filename) { + RETURN_STR(zend_strpprintf(0, "%s:%d", ZSTR_VAL(coroutine->coroutine.filename), coroutine->coroutine.lineno)); + } else { + RETURN_STRING("unknown"); } +} - /* Add waker-related ZVALs if present */ - if (coroutine->coroutine.waker) { - zend_get_gc_buffer_add_zval(buf, &coroutine->coroutine.waker->result); - - if (coroutine->coroutine.waker->error) { - zend_get_gc_buffer_add_obj(buf, coroutine->coroutine.waker->error); - } - - /* Add events HashTable contents */ - zval *event_val; - zval zval_object; - ZEND_HASH_FOREACH_VAL(&coroutine->coroutine.waker->events, event_val) - { +METHOD(getSuspendFileAndLine) +{ + ZEND_PARSE_PARAMETERS_NONE(); - zend_async_event_t *event = (zend_async_event_t *) Z_PTR_P(event_val); + async_coroutine_t *coroutine = THIS_COROUTINE; - if (ZEND_ASYNC_EVENT_IS_REFERENCE(event) || ZEND_ASYNC_EVENT_IS_ZEND_OBJ(event)) { - ZVAL_OBJ(&zval_object, ZEND_ASYNC_EVENT_TO_OBJECT(event)); - zend_get_gc_buffer_add_zval(buf, &zval_object); - } - } - ZEND_HASH_FOREACH_END(); + array_init(return_value); - /* Add triggered events if present */ - if (coroutine->coroutine.waker->triggered_events) { - ZEND_HASH_FOREACH_VAL(coroutine->coroutine.waker->triggered_events, event_val) - { - zend_get_gc_buffer_add_zval(buf, event_val); - } - ZEND_HASH_FOREACH_END(); - } + if (coroutine->waker.filename) { + add_next_index_str(return_value, zend_string_copy(coroutine->waker.filename)); + add_next_index_long(return_value, coroutine->waker.lineno); + } else { + add_next_index_null(return_value); + add_next_index_long(return_value, 0); } +} - /* Add context ZVALs if present */ - if (coroutine->coroutine.context) { - /* Cast to actual context implementation to access HashTables */ - async_context_t *context = (async_context_t *) coroutine->coroutine.context; +METHOD(getSuspendLocation) +{ + ZEND_PARSE_PARAMETERS_NONE(); - /* Add all values from context->values HashTable */ - zval *val; - ZEND_HASH_FOREACH_VAL(&context->values, val) - { - zend_get_gc_buffer_add_zval(buf, val); - } - ZEND_HASH_FOREACH_END(); + async_coroutine_t *coroutine = THIS_COROUTINE; - /* Add all object keys from context->keys HashTable */ - ZEND_HASH_FOREACH_VAL(&context->keys, val) - { - zend_get_gc_buffer_add_zval(buf, val); - } - ZEND_HASH_FOREACH_END(); + if (coroutine->waker.filename) { + RETURN_STR(zend_strpprintf( + 0, "%s:%d", ZSTR_VAL(coroutine->waker.filename), coroutine->waker.lineno)); + } else { + RETURN_STRING("unknown"); } +} - /* Check if we should traverse execution stack (similar to fibers) */ - if (coroutine->context.status != ZEND_FIBER_STATUS_SUSPENDED || !coroutine->execute_data) { - zend_get_gc_buffer_use(buf, table, num); - return NULL; - } +// Status Methods +METHOD(isStarted) +{ + ZEND_PARSE_PARAMETERS_NONE(); + RETURN_BOOL(ZEND_COROUTINE_IS_STARTED(&THIS_COROUTINE->coroutine)); +} - /* Traverse execution stack for suspended coroutines */ - HashTable *lastSymTable = NULL; - zend_execute_data *ex = coroutine->execute_data; - for (; ex; ex = ex->prev_execute_data) { - HashTable *symTable; - if (ZEND_CALL_INFO(ex) & ZEND_CALL_GENERATOR) { - zend_generator *generator = (zend_generator *) ex->return_value; - if (!(generator->flags & ZEND_GENERATOR_CURRENTLY_RUNNING)) { - continue; - } - symTable = zend_generator_frame_gc(buf, generator); - } else { - symTable = zend_unfinished_execution_gc_ex( - ex, ex->func && ZEND_USER_CODE(ex->func->type) ? ex->call : NULL, buf, false); - } - if (symTable) { - if (lastSymTable) { - zval *val; - ZEND_HASH_FOREACH_VAL(lastSymTable, val) - { - if (EXPECTED(Z_TYPE_P(val) == IS_INDIRECT)) { - val = Z_INDIRECT_P(val); - } - zend_get_gc_buffer_add_zval(buf, val); - } - ZEND_HASH_FOREACH_END(); - } - lastSymTable = symTable; - } +METHOD(isQueued) +{ + ZEND_PARSE_PARAMETERS_NONE(); + + async_coroutine_t *coroutine = THIS_COROUTINE; + + if (coroutine->coroutine.waker == NULL) { + RETURN_FALSE; } - zend_get_gc_buffer_use(buf, table, num); - return lastSymTable; + RETURN_BOOL(coroutine->waker.status == ZEND_ASYNC_WAKER_QUEUED); } -static zend_object_handlers coroutine_handlers; - -void async_register_coroutine_ce(void) +METHOD(isRunning) { - async_ce_coroutine = register_class_Async_Coroutine(async_ce_awaitable); + ZEND_PARSE_PARAMETERS_NONE(); - async_ce_coroutine->create_object = coroutine_object_create; + async_coroutine_t *coroutine = THIS_COROUTINE; - async_ce_coroutine->default_object_handlers = &coroutine_handlers; + // Coroutine is running if it's the current one and is started but not finished + RETURN_BOOL(ZEND_COROUTINE_IS_STARTED(&coroutine->coroutine) && + false == ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)); +} - coroutine_handlers = std_object_handlers; - coroutine_handlers.offset = XtOffsetOf(async_coroutine_t, std); - coroutine_handlers.clone_obj = NULL; - coroutine_handlers.dtor_obj = coroutine_object_destroy; - coroutine_handlers.free_obj = coroutine_free; - coroutine_handlers.get_gc = async_coroutine_object_gc; +METHOD(isSuspended) +{ + ZEND_PARSE_PARAMETERS_NONE(); + + RETURN_BOOL(ZEND_COROUTINE_SUSPENDED(&THIS_COROUTINE->coroutine)); } -////////////////////////////////////////////////////////////////////// -/// Coroutine Context API -////////////////////////////////////////////////////////////////////// +METHOD(isCancelled) +{ + ZEND_PARSE_PARAMETERS_NONE(); + RETURN_BOOL(ZEND_COROUTINE_IS_CANCELLED(&THIS_COROUTINE->coroutine) && + ZEND_COROUTINE_IS_FINISHED(&THIS_COROUTINE->coroutine)); +} -bool async_coroutine_context_set(zend_coroutine_t *z_coroutine, zval *key, zval *value) +METHOD(isCancellationRequested) { - async_coroutine_t *coroutine = - (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); + ZEND_PARSE_PARAMETERS_NONE(); - if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { - return false; - } + async_coroutine_t *coroutine = THIS_COROUTINE; - coroutine->coroutine.context->set(coroutine->coroutine.context, key, value); - return true; + RETURN_BOOL((ZEND_COROUTINE_IS_CANCELLED(&coroutine->coroutine) && + !ZEND_COROUTINE_IS_FINISHED(&coroutine->coroutine)) || + coroutine->deferred_cancellation != NULL); } -bool async_coroutine_context_get(zend_coroutine_t *z_coroutine, zval *key, zval *result) +METHOD(isFinished) { - async_coroutine_t *coroutine = - (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); - - if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { - if (result != NULL) { - ZVAL_NULL(result); - } - return false; - } + ZEND_PARSE_PARAMETERS_NONE(); - return coroutine->coroutine.context->find(coroutine->coroutine.context, key, result, false); + RETURN_BOOL(ZEND_COROUTINE_IS_FINISHED(&THIS_COROUTINE->coroutine)); } -bool async_coroutine_context_has(zend_coroutine_t *z_coroutine, zval *key) +// Advanced Methods +METHOD(getAwaitingInfo) { - async_coroutine_t *coroutine = - (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); + ZEND_PARSE_PARAMETERS_NONE(); - if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { - return false; - } + zend_array *info = ZEND_ASYNC_GET_AWAITING_INFO(&THIS_COROUTINE->coroutine); - return coroutine->coroutine.context->find(coroutine->coroutine.context, key, NULL, false); + if (info == NULL) { + array_init(return_value); + } else { + RETURN_ARR(info); + } } -bool async_coroutine_context_delete(zend_coroutine_t *z_coroutine, zval *key) +METHOD(cancel) { - async_coroutine_t *coroutine = - (async_coroutine_t *) (z_coroutine != NULL ? z_coroutine : ZEND_ASYNC_CURRENT_COROUTINE); + zend_object *exception = NULL; - if (UNEXPECTED(coroutine == NULL || coroutine->coroutine.context == NULL)) { - return false; - } + zend_class_entry *ce_cancellation_exception = ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION); - return coroutine->coroutine.context->unset(coroutine->coroutine.context, key); -} \ No newline at end of file + ZEND_PARSE_PARAMETERS_START(0, 1) + Z_PARAM_OPTIONAL; + Z_PARAM_OBJ_OF_CLASS_OR_NULL(exception, ce_cancellation_exception) + ZEND_PARSE_PARAMETERS_END(); + + ZEND_ASYNC_CANCEL(&THIS_COROUTINE->coroutine, exception, false); +} diff --git a/coroutine.h b/coroutine.h index ca2ca2b..9f2f659 100644 --- a/coroutine.h +++ b/coroutine.h @@ -19,32 +19,44 @@ #include "php_async_api.h" #include -ZEND_STACK_ALIGNED void async_coroutine_execute(zend_fiber_transfer *transfer); -PHP_ASYNC_API extern zend_class_entry *async_ce_coroutine; +/* Fiber context structure for pooling */ +typedef struct _async_fiber_context_s async_fiber_context_t; + +struct _async_fiber_context_s +{ + /* Native C fiber context (stack + registers) */ + zend_fiber_context context; + + /* Active fiber VM stack */ + zend_vm_stack vm_stack; + + /* Current Zend VM execute data */ + zend_execute_data *execute_data; + + /* Flags from enum zend_fiber_flag */ + uint8_t flags; +}; typedef struct _async_coroutine_s async_coroutine_t; +ZEND_STACK_ALIGNED void async_coroutine_execute(async_coroutine_t *coroutine); +PHP_ASYNC_API extern zend_class_entry *async_ce_coroutine; + struct _async_coroutine_s { /* Basic structure for coroutine. */ zend_coroutine_t coroutine; + + /* Embedded waker (always allocated, no malloc needed) */ + zend_async_waker_t waker; - /* Flags are defined in enum zend_fiber_flag. */ - uint8_t flags; - - /* Native C fiber context. */ - zend_fiber_context context; - - /* Current Zend VM execute data being run by the coroutine. */ - zend_execute_data *execute_data; + /* Reference to fiber context */ + async_fiber_context_t *fiber_context; /* deferred cancellation object. */ zend_object *deferred_cancellation; - /* Active fiber vm stack. */ - zend_vm_stack vm_stack; - /* Finally handlers array (zval callables) - lazy initialization */ HashTable *finally_handlers; @@ -74,7 +86,7 @@ struct _finally_handlers_context_s void async_register_coroutine_ce(void); zend_coroutine_t *async_new_coroutine(zend_async_scope_t *scope); void async_coroutine_cleanup(zend_fiber_context *context); -void async_coroutine_finalize(zend_fiber_transfer *transfer, async_coroutine_t *coroutine); +void async_coroutine_finalize(async_coroutine_t *coroutine); void async_coroutine_finalize_from_scheduler(async_coroutine_t *coroutine); void async_coroutine_suspend(const bool from_main); void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, const bool transfer_error); diff --git a/docs/context-switching-flow.puml b/docs/context-switching-flow.puml new file mode 100644 index 0000000..1fe007f --- /dev/null +++ b/docs/context-switching-flow.puml @@ -0,0 +1,45 @@ +@startuml Fiber Workflow +!theme materia +title Fiber Workflow + +participant "This Fiber" as ThisFiber +participant "Other Fiber" as WorkerF1 +participant "Fiber Pool" as Pool + +loop Scheduler Main Loop + alt Has coroutines in queue + note over ThisFiber : Get next coroutine from queue + + alt Coroutine already has fiber + ThisFiber -> WorkerF1 : fiber_switch_context() + activate WorkerF1 + note right : Resume existing fiber + + WorkerF1 -> ThisFiber : return + deactivate WorkerF1 + note right : Continue fiber + + else Coroutine has no fiber + alt Aren’t we the Scheduler? + note over ThisFiber : Execute coroutine directly + + else We are the Scheduler, and we need a new Fiber for the coroutine. + ThisFiber -> Pool : Get fiber from pool + Pool -> ThisFiber : fiber_context (or create new) + + ThisFiber -> WorkerF1 : fiber_switch_context() + activate WorkerF1 + note right : Start new coroutine in new fiber + + WorkerF1 -> ThisFiber : return when done + deactivate WorkerF1 + end + end + + else No coroutines + note over ThisFiber : Wait for events/timers + end +end + + +@enduml \ No newline at end of file diff --git a/internal/circular_buffer.c b/internal/circular_buffer.c index 93d29de..cdbdb98 100644 --- a/internal/circular_buffer.c +++ b/internal/circular_buffer.c @@ -502,10 +502,7 @@ bool circular_buffer_is_empty(const circular_buffer_t *buffer) return buffer->head == buffer->tail; } -bool circular_buffer_is_not_empty(const circular_buffer_t *buffer) -{ - return buffer->head != buffer->tail; -} +/* circular_buffer_is_not_empty is now inline in header */ /** * Check if the circular buffer is full. diff --git a/internal/circular_buffer.h b/internal/circular_buffer.h index 23d3ede..9526af4 100644 --- a/internal/circular_buffer.h +++ b/internal/circular_buffer.h @@ -69,7 +69,6 @@ void circular_buffer_destroy(circular_buffer_t *buffer); bool circular_buffer_is_full(const circular_buffer_t *buffer); bool circular_buffer_is_empty(const circular_buffer_t *buffer); -bool circular_buffer_is_not_empty(const circular_buffer_t *buffer); zend_result circular_buffer_push(circular_buffer_t *buffer, const void *value, bool should_resize); zend_result circular_buffer_push_front(circular_buffer_t *buffer, const void *value, bool should_resize); zend_result circular_buffer_pop(circular_buffer_t *buffer, void *value); @@ -81,4 +80,45 @@ circular_buffer_t *zval_circular_buffer_new(const size_t count, const allocator_ zend_result zval_circular_buffer_push(circular_buffer_t *buffer, zval *value, bool should_resize); zend_result zval_circular_buffer_pop(circular_buffer_t *buffer, zval *value); +/* Inline optimized functions - placed after all declarations */ + +/* Inline version for hot path performance */ +static zend_always_inline bool circular_buffer_is_not_empty(const circular_buffer_t *buffer) { + return buffer->head != buffer->tail; +} + +/* Fast specialized version for pointer push (8 bytes) */ +static zend_always_inline zend_result circular_buffer_push_ptr(circular_buffer_t *buffer, void *ptr) { + // Check if buffer is full using bitwise AND (capacity is power of 2) + if (EXPECTED(((buffer->head + 1) & (buffer->capacity - 1)) != buffer->tail)) { + // Direct pointer assignment - no memcpy overhead + *(void**)((char*)buffer->data + buffer->head * sizeof(void*)) = ptr; + buffer->head = (buffer->head + 1) & (buffer->capacity - 1); + return SUCCESS; + } + return FAILURE; +} + +/* Fast specialized version for pointer pop (8 bytes) */ +static zend_always_inline zend_result circular_buffer_pop_ptr(circular_buffer_t *buffer, void **ptr) { + // Check if buffer is empty + if (EXPECTED(buffer->head != buffer->tail)) { + // Direct pointer read - no memcpy overhead + *ptr = *(void**)((char*)buffer->data + buffer->tail * sizeof(void*)); + buffer->tail = (buffer->tail + 1) & (buffer->capacity - 1); + return SUCCESS; + } + return FAILURE; +} + +/* Smart wrapper for pointer push with resize fallback */ +static zend_always_inline zend_result circular_buffer_push_ptr_with_resize(circular_buffer_t *buffer, void *ptr) { + // Try fast path first (no resize) + if (EXPECTED(circular_buffer_push_ptr(buffer, ptr) == SUCCESS)) { + return SUCCESS; + } + // Fallback to slow path with resize - need address of ptr for memcpy + return circular_buffer_push(buffer, &ptr, true); +} + #endif // ASYNC_CIRCULAR_BUFFER_V2_H \ No newline at end of file diff --git a/libuv_reactor.c b/libuv_reactor.c index 642356a..e1643ec 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -163,6 +163,11 @@ void libuv_reactor_shutdown(void) /* {{{ libuv_reactor_execute */ bool libuv_reactor_execute(bool no_wait) { + // OPTIMIZATION: Skip uv_run() if no libuv handles to avoid unnecessary clock_gettime() calls + if (!uv_loop_alive(UVLOOP)) { + return ZEND_ASYNC_ACTIVE_EVENT_COUNT > 0; + } + const bool has_handles = uv_run(UVLOOP, no_wait ? UV_RUN_NOWAIT : UV_RUN_ONCE); return ZEND_ASYNC_ACTIVE_EVENT_COUNT > 0 || has_handles; @@ -2080,7 +2085,7 @@ static int libuv_exec(zend_async_exec_mode exec_mode, } ZEND_ASYNC_SUSPEND(); - zend_async_waker_destroy(coroutine); + zend_async_waker_clean(coroutine); if (UNEXPECTED(EG(exception))) { return -1; diff --git a/php_async.h b/php_async.h index 8f44472..ad4c221 100644 --- a/php_async.h +++ b/php_async.h @@ -82,6 +82,9 @@ zend_async_context_t *root_context; /* The default concurrency */ int default_concurrency; +/* Fiber context pool for performance optimization */ +circular_buffer_t fiber_context_pool; + /* The reactor */ uv_loop_t uvloop; bool reactor_started; diff --git a/scheduler.c b/scheduler.c index 7ddc128..f840d14 100644 --- a/scheduler.c +++ b/scheduler.c @@ -24,6 +24,56 @@ #include "zend_common.h" #include "zend_observer.h" +/////////////////////////////////////////////////////////// +/// STATIC DECLARATIONS AND CONSTANTS +/////////////////////////////////////////////////////////// + +#define FIBER_DEBUG_LOG_ON false +#define FIBER_DEBUG_SWITCH false +#if FIBER_DEBUG_LOG_ON +# define FIBER_DEBUG(...) fprintf(stdout, __VA_ARGS__) +#else +# define FIBER_DEBUG(...) ((void)0) +#endif + +static zend_function root_function = { ZEND_INTERNAL_FUNCTION }; + +typedef enum +{ + COROUTINE_NOT_EXISTS, + COROUTINE_SWITCHED, + COROUTINE_IGNORED, + COROUTINE_FINISHED, + SHOULD_BE_EXIT +} switch_status; + +static ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer); +static void fiber_context_cleanup(zend_fiber_context *context); + +#define TRY_HANDLE_EXCEPTION() \ + if (UNEXPECTED(EG(exception) != NULL)) { \ + if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { \ + finally_shutdown(); \ + break; \ + } \ + start_graceful_shutdown(); \ + } + +#define TRY_HANDLE_SUSPEND_EXCEPTION() \ + if (UNEXPECTED(EG(exception) != NULL)) { \ + if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { \ + finally_shutdown(); \ + switch_to_scheduler(transfer); \ + zend_exception_restore(); \ + return; \ + } \ + start_graceful_shutdown(); \ + } + +/////////////////////////////////////////////////////////// +/// MODULE INIT/SHUTDOWN +/////////////////////////////////////////////////////////// + void async_scheduler_startup(void) { } @@ -32,6 +82,75 @@ void async_scheduler_shutdown(void) { } +/////////////////////////////////////////////////////////// +/// FIBER CONTEXT MANAGEMENT +/////////////////////////////////////////////////////////// + +static void fiber_context_cleanup(zend_fiber_context *context) +{ + async_fiber_context_t *fiber_context = (async_fiber_context_t *) context; + + // There's no need to destroy execute_data + // because it's also located in the fiber's stack. + efree(fiber_context); +} + +async_fiber_context_t* async_fiber_context_create(void) +{ + async_fiber_context_t *context = ecalloc(1, sizeof(async_fiber_context_t)); + + if (zend_fiber_init_context(&context->context, async_ce_coroutine, fiber_entry, EG(fiber_stack_size)) == FAILURE) { + efree(context); + return NULL; + } + + context->flags = ZEND_FIBER_STATUS_INIT; + context->context.cleanup = fiber_context_cleanup; + + return context; +} + +static zend_always_inline void fiber_pool_init(void) +{ + circular_buffer_ctor(&ASYNC_G(fiber_context_pool), ASYNC_FIBER_POOL_SIZE, sizeof(async_fiber_context_t*), NULL); +} + +static void fiber_pool_cleanup(void) +{ + async_fiber_context_t *fiber_context = NULL; + + zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE; + ZEND_ASYNC_CURRENT_COROUTINE = NULL; + + while (circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&fiber_context) == SUCCESS) { + if (fiber_context != NULL) { + zend_fiber_transfer transfer = { + .context = &fiber_context->context, + .flags = 0 + }; + + // If the current coroutine is NULL, this state explicitly tells the fiber to stop execution. + // We set this value each time, since the switch_to_scheduler function may change it. + ZEND_ASYNC_CURRENT_COROUTINE = NULL; + zend_fiber_switch_context(&transfer); + + // Transfer the exception to the current coroutine. + if (UNEXPECTED(transfer.flags & ZEND_FIBER_TRANSFER_FLAG_ERROR)) { + async_rethrow_exception(Z_OBJ(transfer.value)); + ZVAL_NULL(&transfer.value); + } + } + } + + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + + circular_buffer_dtor(&ASYNC_G(fiber_context_pool)); +} + +/////////////////////////////////////////////////////////// +/// MICROTASK EXECUTION +/////////////////////////////////////////////////////////// + zend_always_inline static void execute_microtasks(void) { circular_buffer_t *buffer = &ASYNC_G(microtasks); @@ -59,81 +178,74 @@ zend_always_inline static void execute_microtasks(void) } } +/////////////////////////////////////////////////////////// +/// FIBER CONTEXT SWITCHING UTILITIES +/////////////////////////////////////////////////////////// + +static zend_always_inline void fiber_context_update_before_suspend(void) +{ + async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_CURRENT_COROUTINE; + + if (coroutine != NULL && coroutine->fiber_context != NULL) { + coroutine->fiber_context->execute_data = EG(current_execute_data); + } +} + /** - * Defines a transfer object for the coroutine context. - * - * This function is used to return control from a coroutine FOR THE LAST TIME. - * If you only need to suspend coroutines, you should use the `switch_context()` function. + * Transfers the current exception (EG(exception)) to the transfer object. * - * This function initializes the coroutine context if it is not already initialized, - * and sets the transfer value to the provided exception or NULL. - * - * @param coroutine The coroutine to define the transfer for. - * @param exception The exception to pass, or NULL if no exception is to be passed. - * @param transfer The transfer object to define. + * @param transfer Transfer object that will hold the transfer information. */ -static zend_always_inline void -define_transfer(async_coroutine_t *coroutine, zend_object *exception, zend_fiber_transfer *transfer) -{ - if (UNEXPECTED(coroutine->context.status == ZEND_FIBER_STATUS_INIT && - zend_fiber_init_context( - &coroutine->context, async_ce_coroutine, async_coroutine_execute, EG(fiber_stack_size)) == - FAILURE)) { - zend_throw_error(NULL, "Failed to initialize coroutine context"); +static zend_always_inline void transfer_current_exception(zend_fiber_transfer *transfer) +{ + if (EXPECTED(EG(exception) == NULL)) { + transfer->flags &= ~ZEND_FIBER_TRANSFER_FLAG_ERROR; return; } - transfer->context = &coroutine->context; - transfer->flags = exception != NULL ? ZEND_FIBER_TRANSFER_FLAG_ERROR : 0; - - if (exception != NULL) { - ZVAL_OBJ(&transfer->value, exception); - } else { - ZVAL_NULL(&transfer->value); + if (EG(prev_exception)) { + zend_exception_save(); + zend_exception_restore(); } - ZEND_ASYNC_CURRENT_COROUTINE = &coroutine->coroutine; + zend_object *exception = EG(exception); + GC_ADDREF(exception); + zend_clear_exception(); + + transfer->flags |= ZEND_FIBER_TRANSFER_FLAG_ERROR; + ZVAL_OBJ(&transfer->value, exception); } /** - * Switches the context to the given coroutine, optionally passing an exception. - * - * If the coroutine context is not initialized, it will be initialized first. - * If an exception is provided, it will be set in the transfer value. - * - * IMPORTANT! This function must be called ONLY if the coroutine being switched from is not finishing its execution. - * If the coroutine is yielding control for the **last time**, then you must use define_transfer(). + * Switches control from the current fiber to the coroutine's fiber. + * The coroutine's fiber must be defined! * * @param coroutine The coroutine to switch to. - * @param exception The exception to pass, or NULL if no exception is to be passed. */ -static zend_always_inline void switch_context(async_coroutine_t *coroutine, zend_object *exception) +static zend_always_inline void fiber_switch_context(async_coroutine_t *coroutine) { + async_fiber_context_t *fiber_context = coroutine->fiber_context; + + ZEND_ASSERT(fiber_context != NULL && "Fiber context is NULL in fiber_switch_context"); + zend_fiber_transfer transfer = { - .context = &coroutine->context, - .flags = exception != NULL ? ZEND_FIBER_TRANSFER_FLAG_ERROR : 0, + .context = &fiber_context->context, + .flags = 0 }; - if (coroutine->context.status == ZEND_FIBER_STATUS_INIT && - zend_fiber_init_context( - &coroutine->context, async_ce_coroutine, async_coroutine_execute, EG(fiber_stack_size)) == FAILURE) { - zend_throw_error(NULL, "Failed to initialize coroutine context"); - return; - } +#if FIBER_DEBUG_SWITCH + zend_fiber_context *from = EG(current_fiber_context); + zend_fiber_context *to = &coroutine->fiber_context->context; - if (exception != NULL) { - ZVAL_OBJ(&transfer.value, exception); + if (ZEND_ASYNC_SCHEDULER == &coroutine->coroutine) { + FIBER_DEBUG("Switch fiber: %p => %p for scheduler: %p\n", from, to, coroutine); } else { - ZVAL_NULL(&transfer.value); + FIBER_DEBUG("Switch fiber: %p => %p for coroutine: %p\n", from, to, coroutine); } - - zend_coroutine_t *previous_coroutine = ZEND_ASYNC_CURRENT_COROUTINE; - ZEND_ASYNC_CURRENT_COROUTINE = &coroutine->coroutine; +#endif zend_fiber_switch_context(&transfer); - ZEND_ASYNC_CURRENT_COROUTINE = previous_coroutine; - /* Forward bailout into current coroutine. */ if (UNEXPECTED(transfer.flags & ZEND_FIBER_TRANSFER_FLAG_BAILOUT)) { ZEND_ASYNC_CURRENT_COROUTINE = NULL; @@ -147,11 +259,54 @@ static zend_always_inline void switch_context(async_coroutine_t *coroutine, zend } } +/** + * Switches to the scheduler coroutine. + * + * This method is used to transfer control to the special internal Scheduler coroutine. + * The transfer parameter can be NULL for temporary suspension. + * However, if the current coroutine is losing control PERMANENTLY, you must provide transfer. + * + * If the transfer object is provided, it will define the transfer for the scheduler. + * If no transfer is provided, it will switch to the scheduler context without defining a transfer. + * + * @param transfer The transfer object to define for the scheduler, or NULL if not needed. + */ +static zend_always_inline void switch_to_scheduler(zend_fiber_transfer *transfer) +{ + async_coroutine_t *async_coroutine = (async_coroutine_t *) ZEND_ASYNC_SCHEDULER; + + ZEND_ASSERT(async_coroutine != NULL && "Scheduler coroutine is not initialized"); + + if (transfer != NULL) { + // In case the control is transferred to the Scheduler, + // the bailout flag must be cleared so that the Scheduler continues to operate normally. + // In other words, critical exceptions should not cause the Scheduler to terminate fatally. + transfer->flags &= ~ZEND_FIBER_TRANSFER_FLAG_BAILOUT; + transfer->context = &async_coroutine->fiber_context->context; + transfer_current_exception(transfer); + ZEND_ASYNC_CURRENT_COROUTINE = &async_coroutine->coroutine; + } else { + fiber_context_update_before_suspend(); + ZEND_ASYNC_CURRENT_COROUTINE = &async_coroutine->coroutine; + fiber_switch_context(async_coroutine); + } +} + +static zend_always_inline void return_to_main(zend_fiber_transfer *transfer) +{ + transfer->context = ASYNC_G(main_transfer)->context; + transfer_current_exception(transfer); +} + +/////////////////////////////////////////////////////////// +/// COROUTINE QUEUE MANAGEMENT +/////////////////////////////////////////////////////////// + static zend_always_inline async_coroutine_t *next_coroutine(void) { async_coroutine_t *coroutine; - if (UNEXPECTED(circular_buffer_pop(&ASYNC_G(coroutine_queue), &coroutine) == FAILURE)) { + if (UNEXPECTED(circular_buffer_pop_ptr(&ASYNC_G(coroutine_queue), (void**)&coroutine) == FAILURE)) { ZEND_ASSERT("Failed to pop the coroutine from the pending queue."); return NULL; } @@ -159,108 +314,207 @@ static zend_always_inline async_coroutine_t *next_coroutine(void) return coroutine; } -typedef enum +static zend_always_inline async_fiber_context_t *fiber_context_allocate(void) { - COROUTINE_NOT_EXISTS, - COROUTINE_SWITCHED, - COROUTINE_IGNORED -} switch_status; + async_fiber_context_t *fiber_context = NULL; + + circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&fiber_context); + + if (fiber_context == NULL) { + fiber_context = async_fiber_context_create(); + } + + return fiber_context; +} + +static zend_always_inline bool return_fiber_to_pool(async_fiber_context_t *fiber_context) +{ + circular_buffer_t *buffer = &ASYNC_G(fiber_context_pool); + + if (buffer->capacity > 0 && false == circular_buffer_is_full(buffer)) { + if (EXPECTED(circular_buffer_push_ptr(buffer, fiber_context) != FAILURE)) { + return true; + } + + async_throw_error("Failed to push fiber context to the pool"); + return false; + } + + return false; +} /** * Executes the next coroutine in the queue. * - * If the coroutine is not ready to be executed, it will return false. - * If the coroutine is finished, it will clean up and return true. + * The function is used when one Fiber switches to another. + * The execute_next_coroutine_from_fiber function is used + * when a Fiber is ready to execute another coroutine within itself. * - * @param transfer The transfer object to define the context for the coroutine. * @return switch_status - status of the coroutine switching. */ -static switch_status execute_next_coroutine(zend_fiber_transfer *transfer) +static zend_always_inline switch_status execute_next_coroutine(void) { async_coroutine_t *async_coroutine = next_coroutine(); - zend_coroutine_t *coroutine = &async_coroutine->coroutine; - if (UNEXPECTED(coroutine == NULL)) { + if (UNEXPECTED(async_coroutine == NULL)) { return COROUTINE_NOT_EXISTS; } - zend_async_waker_t *waker = coroutine->waker; + zend_coroutine_t *coroutine = &async_coroutine->coroutine; - if (UNEXPECTED(waker == NULL || waker->status == ZEND_ASYNC_WAKER_IGNORED)) { + // If the current coroutine is the same as the one we are trying to execute, + if (UNEXPECTED(coroutine == ZEND_ASYNC_CURRENT_COROUTINE)) { + return COROUTINE_SWITCHED; + } - // - // This state triggers if the fiber has never been started; - // in this case, it is deallocated differently than usual. - // Finalizing handlers are called. Memory is freed in the correct order! - // - if (ZEND_COROUTINE_IS_CANCELLED(coroutine)) { - async_coroutine_finalize_from_scheduler(async_coroutine); + if (async_coroutine->waker.status == ZEND_ASYNC_WAKER_IGNORED) { + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + FIBER_DEBUG("Execute coroutine %p in Fiber %p\n", coroutine, EG(current_fiber_context)); + async_coroutine_execute(async_coroutine); + ZEND_ASYNC_CURRENT_COROUTINE = NULL; + return COROUTINE_IGNORED; + } else if (async_coroutine->fiber_context != NULL) { + fiber_context_update_before_suspend(); + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + fiber_switch_context(async_coroutine); + return COROUTINE_SWITCHED; + } else { + + // The coroutine doesn't have its own Fiber, + // so we first need to allocate a Fiber context for it and then start it. + circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&async_coroutine->fiber_context); + + if (async_coroutine->fiber_context == NULL) { + async_coroutine->fiber_context = async_fiber_context_create(); } - coroutine->event.dispose(&coroutine->event); - return COROUTINE_IGNORED; + fiber_context_update_before_suspend(); + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + fiber_switch_context(async_coroutine); + return COROUTINE_SWITCHED; } +} - if (UNEXPECTED(waker->status == ZEND_ASYNC_WAKER_WAITING)) { - zend_error(E_ERROR, "Attempt to resume a fiber that has not been resolved"); - coroutine->event.dispose(&coroutine->event); - return COROUTINE_IGNORED; +/** + * Executes the next coroutine in the queue. + * + * This function is used in two different cases: + * Inside a Fiber that is free to run a coroutine, in which case transfer != NULL. + * During a suspend operation, when the Fiber is occupied by the current + * coroutine but needs to switch to another Fiber with a new one. + * + * @param transfer A transfer object that is not NULL if the current Fiber has no owning coroutine. + * @param fiber_context The current Fiber context if available. + * @return switch_status - status of the coroutine switching. + */ + +#define AVAILABLE_FOR_COROUTINE (transfer != NULL) + +static zend_always_inline switch_status execute_next_coroutine_from_fiber(zend_fiber_transfer *transfer, async_fiber_context_t *fiber_context) +{ + async_coroutine_t *async_coroutine = next_coroutine(); + + if (UNEXPECTED(async_coroutine == NULL)) { + return COROUTINE_NOT_EXISTS; } - waker->status = ZEND_ASYNC_WAKER_RESULT; - zend_object *error = waker->error; + zend_coroutine_t *coroutine = &async_coroutine->coroutine; - // The Waker object can be destroyed immediately if the result is an error. - // It will be delivered to the coroutine as an exception. - if (error != NULL) { - waker->error = NULL; - zend_async_waker_destroy(coroutine); +next_coroutine: + + if (async_coroutine->waker.status == ZEND_ASYNC_WAKER_IGNORED) { + // Case: the coroutine in queue was cancelled. + // In this case, only the coroutine finalization process needs to be executed, + // and it can be done immediately. + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + FIBER_DEBUG("Execute coroutine %p in Fiber %p\n", coroutine, EG(current_fiber_context)); + async_coroutine_execute(async_coroutine); + ZEND_ASYNC_CURRENT_COROUTINE = NULL; + return COROUTINE_IGNORED; } - if (transfer != NULL) { - define_transfer(async_coroutine, error, transfer); - return COROUTINE_SWITCHED; - } else if (ZEND_ASYNC_CURRENT_COROUTINE == coroutine) { - if (error != NULL) { - async_rethrow_exception(error); + if (AVAILABLE_FOR_COROUTINE && async_coroutine->fiber_context == fiber_context) { + + // Case: The current coroutine is assigned to this fiber. + // Just execute it without switching. + FIBER_DEBUG("Execute coroutine %p in Fiber %p\n", coroutine, EG(current_fiber_context)); + async_coroutine_execute(async_coroutine); + return COROUTINE_FINISHED; + + } else if (AVAILABLE_FOR_COROUTINE && async_coroutine->fiber_context != NULL) { + + // Case: the current fiber has no coroutine to execute, + // but the next coroutine in the queue is already in use. + if (return_fiber_to_pool(fiber_context)) { + fiber_context_update_before_suspend(); + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + fiber_switch_context(async_coroutine); + + // When control returns to us, we try to execute the coroutine that is currently active. + coroutine = ZEND_ASYNC_CURRENT_COROUTINE; + + if (UNEXPECTED(coroutine == NULL)) { + // There are no more coroutines to execute; we need to exit. + return SHOULD_BE_EXIT; + } + + async_coroutine = (async_coroutine_t *) coroutine; + goto next_coroutine; + } else { + // The pool is already full, so the Fiber should be destroyed after the switch occurs. + transfer->context = &async_coroutine->fiber_context->context; + transfer_current_exception(transfer); + return SHOULD_BE_EXIT; } + + } else if (async_coroutine->fiber_context != NULL) { + fiber_context_update_before_suspend(); + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + fiber_switch_context(async_coroutine); return COROUTINE_SWITCHED; + } else if (AVAILABLE_FOR_COROUTINE && async_coroutine->fiber_context == NULL) { + // Note that async_coroutine_execute is also called in cases + // where the coroutine was never executed and was canceled. + // In this case, no context switch occurs, so this code executes regardless of which fiber it's running in. + async_coroutine->fiber_context = fiber_context; + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + FIBER_DEBUG("Execute coroutine %p in Fiber %p\n", coroutine, EG(current_fiber_context)); + async_coroutine_execute(async_coroutine); + return COROUTINE_FINISHED; } else { - switch_context(async_coroutine, error); - } - // - // At this point, the async_coroutine must already be destroyed - // + // (AVAILABLE_FOR_COROUTINE == false) + // The coroutine doesn't have its own Fiber, + // so we first need to allocate a Fiber context for it and then start it. + circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&async_coroutine->fiber_context); - return COROUTINE_SWITCHED; + if (async_coroutine->fiber_context == NULL) { + async_coroutine->fiber_context = async_fiber_context_create(); + } + + fiber_context_update_before_suspend(); + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; + fiber_switch_context(async_coroutine); + return COROUTINE_SWITCHED; + } } -/** - * Switches to the scheduler coroutine. - * - * This method is used to transfer control to the special internal Scheduler coroutine. - * The transfer parameter can be NULL for temporary suspension. - * However, if the current coroutine is losing control PERMANENTLY, you must provide transfer. - * - * If the transfer object is provided, it will define the transfer for the scheduler. - * If no transfer is provided, it will switch to the scheduler context without defining a transfer. - * - * @param transfer The transfer object to define for the scheduler, or NULL if not needed. - */ -static zend_always_inline void switch_to_scheduler(zend_fiber_transfer *transfer) +zend_always_inline static void execute_queued_coroutines(void) { - async_coroutine_t *async_coroutine = (async_coroutine_t *) ZEND_ASYNC_SCHEDULER; - - ZEND_ASSERT(async_coroutine != NULL && "Scheduler coroutine is not initialized"); + // @todo: need to refactoring + while (false == circular_buffer_is_empty(&ASYNC_G(coroutine_queue))) { + execute_next_coroutine(); - if (transfer != NULL) { - define_transfer(async_coroutine, NULL, transfer); - } else { - switch_context(async_coroutine, NULL); + if (UNEXPECTED(EG(exception))) { + zend_exception_save(); + } } } +/////////////////////////////////////////////////////////// +/// DEADLOCK RESOLUTION AND ERROR HANDLING +/////////////////////////////////////////////////////////// + static bool resolve_deadlocks(void) { zval *value; @@ -310,80 +564,13 @@ static bool resolve_deadlocks(void) } ZEND_HASH_FOREACH_END(); - - return false; -} - -zend_always_inline static void execute_queued_coroutines(void) -{ - while (false == circular_buffer_is_empty(&ASYNC_G(coroutine_queue))) { - execute_next_coroutine(NULL); - - if (UNEXPECTED(EG(exception))) { - zend_exception_save(); - } - } -} - -static void async_scheduler_dtor(void) -{ - ZEND_ASYNC_SCHEDULER_CONTEXT = true; - - execute_microtasks(); - - ZEND_ASYNC_SCHEDULER_CONTEXT = false; - - if (UNEXPECTED(false == circular_buffer_is_empty(&ASYNC_G(microtasks)))) { - async_warning("%u microtasks were not executed", circular_buffer_count(&ASYNC_G(microtasks))); - } - - if (UNEXPECTED(false == circular_buffer_is_empty(&ASYNC_G(coroutine_queue)))) { - async_warning("%u deferred coroutines were not executed", circular_buffer_count(&ASYNC_G(coroutine_queue))); - } - - zval_c_buffer_cleanup(&ASYNC_G(coroutine_queue)); - zval_c_buffer_cleanup(&ASYNC_G(microtasks)); - - zval *current; - // foreach by fibers_state and release all fibers - ZEND_HASH_FOREACH_VAL(&ASYNC_G(coroutines), current) - { - async_coroutine_t *coroutine = Z_PTR_P(current); - OBJ_RELEASE(&coroutine->std); - } - ZEND_HASH_FOREACH_END(); - - zend_hash_clean(&ASYNC_G(coroutines)); - zend_hash_destroy(&ASYNC_G(coroutines)); - zend_hash_init(&ASYNC_G(coroutines), 0, NULL, NULL, 0); - - ZEND_ASYNC_GRACEFUL_SHUTDOWN = false; - ZEND_ASYNC_SCHEDULER_CONTEXT = false; - - zend_exception_restore(); -} - -static void dispose_coroutines(void) -{ - zval *current; - - ZEND_HASH_FOREACH_VAL(&ASYNC_G(coroutines), current) - { - zend_coroutine_t *coroutine = Z_PTR_P(current); - - if (coroutine->waker != NULL) { - coroutine->waker->status = ZEND_ASYNC_WAKER_IGNORED; - } - - coroutine->event.dispose(&coroutine->event); - - if (EG(exception)) { - zend_exception_save(); - } - } - ZEND_HASH_FOREACH_END(); + + return false; } +/////////////////////////////////////////////////////////// +/// SHUTDOWN AND CLEANUP +/////////////////////////////////////////////////////////// static void cancel_queued_coroutines(void) { zend_exception_save(); @@ -397,7 +584,7 @@ static void cancel_queued_coroutines(void) { zend_coroutine_t *coroutine = Z_PTR_P(current); - if (((async_coroutine_t *) coroutine)->context.status == ZEND_FIBER_STATUS_INIT) { + if (false == ZEND_COROUTINE_IS_STARTED(coroutine)) { // No need to cancel the fiber if it has not been started. coroutine->waker->status = ZEND_ASYNC_WAKER_IGNORED; ZEND_COROUTINE_SET_CANCELLED(coroutine); @@ -423,32 +610,6 @@ static void cancel_queued_coroutines(void) zend_exception_restore(); } -void async_scheduler_start_waker_events(zend_async_waker_t *waker) -{ - ZEND_ASSERT(waker != NULL && "Waker is NULL in async_scheduler_start_waker_events"); - - zval *current; - ZEND_HASH_FOREACH_VAL(&waker->events, current) - { - const zend_async_waker_trigger_t *trigger = Z_PTR_P(current); - trigger->event->start(trigger->event); - } - ZEND_HASH_FOREACH_END(); -} - -void async_scheduler_stop_waker_events(zend_async_waker_t *waker) -{ - ZEND_ASSERT(waker != NULL && "Waker is NULL in async_scheduler_stop_waker_events"); - - zval *current; - ZEND_HASH_FOREACH_VAL(&waker->events, current) - { - const zend_async_waker_trigger_t *trigger = Z_PTR_P(current); - trigger->event->stop(trigger->event); - } - ZEND_HASH_FOREACH_END(); -} - void start_graceful_shutdown(void) { if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { @@ -503,16 +664,83 @@ static void finally_shutdown(void) } } -void async_scheduler_main_loop(void); +static void async_scheduler_dtor(void) +{ + ZEND_ASYNC_SCHEDULER_CONTEXT = true; -#define TRY_HANDLE_EXCEPTION() \ - if (UNEXPECTED(EG(exception) != NULL)) { \ - if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { \ - finally_shutdown(); \ - break; \ - } \ - start_graceful_shutdown(); \ + execute_microtasks(); + + ZEND_ASYNC_SCHEDULER_CONTEXT = false; + + if (UNEXPECTED(false == circular_buffer_is_empty(&ASYNC_G(microtasks)))) { + async_warning("%u microtasks were not executed", circular_buffer_count(&ASYNC_G(microtasks))); + } + + if (UNEXPECTED(false == circular_buffer_is_empty(&ASYNC_G(coroutine_queue)))) { + async_warning("%u deferred coroutines were not executed", circular_buffer_count(&ASYNC_G(coroutine_queue))); + } + + // Destroy the scheduler coroutine at the end. + async_coroutine_t *async_coroutine = (async_coroutine_t *) ZEND_ASYNC_SCHEDULER; + ZEND_ASYNC_SCHEDULER = NULL; + async_coroutine->fiber_context = NULL; + OBJ_RELEASE(&async_coroutine->std); + + zval_c_buffer_cleanup(&ASYNC_G(coroutine_queue)); + zval_c_buffer_cleanup(&ASYNC_G(microtasks)); + + zval *current; + // foreach by fibers_state and release all fibers + ZEND_HASH_FOREACH_VAL(&ASYNC_G(coroutines), current) + { + async_coroutine_t *coroutine = Z_PTR_P(current); + OBJ_RELEASE(&coroutine->std); + } + ZEND_HASH_FOREACH_END(); + + zend_hash_clean(&ASYNC_G(coroutines)); + zend_hash_destroy(&ASYNC_G(coroutines)); + zend_hash_init(&ASYNC_G(coroutines), 0, NULL, NULL, 0); + + ZEND_ASYNC_GRACEFUL_SHUTDOWN = false; + ZEND_ASYNC_SCHEDULER_CONTEXT = false; + + zend_exception_restore(); +} + +/////////////////////////////////////////////////////////// +/// WAKER EVENT MANAGEMENT +/////////////////////////////////////////////////////////// + +static zend_always_inline void start_waker_events(zend_async_waker_t *waker) +{ + ZEND_ASSERT(waker != NULL && "Waker is NULL in async_scheduler_start_waker_events"); + + zval *current; + ZEND_HASH_FOREACH_VAL(&waker->events, current) + { + const zend_async_waker_trigger_t *trigger = Z_PTR_P(current); + trigger->event->start(trigger->event); + } + ZEND_HASH_FOREACH_END(); +} + +static zend_always_inline void stop_waker_events(zend_async_waker_t *waker) +{ + ZEND_ASSERT(waker != NULL && "Waker is NULL in async_scheduler_stop_waker_events"); + + zval *current; + ZEND_HASH_FOREACH_VAL(&waker->events, current) + { + const zend_async_waker_trigger_t *trigger = Z_PTR_P(current); + trigger->event->stop(trigger->event); } + ZEND_HASH_FOREACH_END(); +} + +/////////////////////////////////////////////////////////// +/// SCHEDULER CORE +/////////////////////////////////////////////////////////// /** * The main loop of the scheduler. @@ -540,6 +768,12 @@ void async_scheduler_launch(void) return; } + fiber_pool_init(); + + if (UNEXPECTED(EG(exception) != NULL)) { + return; + } + // // We convert the current main execution flow into the main coroutine. // The main coroutine differs from others in that it is already started, and its handle is NULL. @@ -585,10 +819,18 @@ void async_scheduler_launch(void) return; } + // Create a new Fiber context for the main coroutine. + async_fiber_context_t *fiber_context = ecalloc(1, sizeof(async_fiber_context_t)); // Copy the main coroutine context - main_coroutine->context = *EG(main_fiber_context); + fiber_context->context = *EG(main_fiber_context); + fiber_context->execute_data = EG(current_execute_data); + // Set the current fiber context to the main coroutine context - EG(current_fiber_context) = &main_coroutine->context; + EG(current_fiber_context) = &fiber_context->context; + zend_fiber_context *zend_fiber_context = &fiber_context->context; + + // The main coroutine will always own the fiber, unlike other coroutines. + main_coroutine->fiber_context = fiber_context; zend_fiber_switch_blocked(); @@ -620,9 +862,9 @@ void async_scheduler_launch(void) // It's essentially a switch from the zero context to the coroutine context, even though, // logically, both contexts belong to the main execution thread. // - main_coroutine->context.status = ZEND_FIBER_STATUS_INIT; - zend_observer_fiber_switch_notify(main_transfer->context, &main_coroutine->context); - main_coroutine->context.status = ZEND_FIBER_STATUS_RUNNING; + zend_fiber_context->status = ZEND_FIBER_STATUS_INIT; + zend_observer_fiber_switch_notify(main_transfer->context, zend_fiber_context); + zend_fiber_context->status = ZEND_FIBER_STATUS_RUNNING; ASYNC_G(main_transfer) = main_transfer; ASYNC_G(main_vm_stack) = EG(vm_stack); @@ -637,7 +879,26 @@ void async_scheduler_launch(void) return; } - scheduler_coroutine->internal_entry = async_scheduler_main_loop; + scope = ZEND_ASYNC_NEW_SCOPE(NULL); + if (UNEXPECTED(EG(exception))) { + return; + } + + ZVAL_UNDEF(&options); + scope->before_coroutine_enqueue(scheduler_coroutine, scope, &options); + zval_dtor(&options); + + if (UNEXPECTED(EG(exception) != NULL)) { + return; + } + + scope->after_coroutine_enqueue(scheduler_coroutine, scope); + if (UNEXPECTED(EG(exception) != NULL)) { + return; + } + + scheduler_coroutine->internal_entry = NULL; + ((async_coroutine_t *) scheduler_coroutine)->fiber_context = async_fiber_context_create(); ZEND_ASYNC_SCHEDULER = scheduler_coroutine; ZEND_ASYNC_ACTIVATE; @@ -650,6 +911,9 @@ void async_scheduler_launch(void) /** * A special function that is called when the main coroutine permanently loses the execution flow. * Exiting this function means that the entire PHP script has finished. + * + * This function is needed because the main coroutine runs differently from the others + * — its logic cycle is broken. */ void async_scheduler_main_coroutine_suspend(void) { @@ -665,13 +929,16 @@ void async_scheduler_main_coroutine_suspend(void) async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_CURRENT_COROUTINE; zend_fiber_transfer *transfer = ASYNC_G(main_transfer); + zend_fiber_context *fiber_context = &coroutine->fiber_context->context; + async_fiber_context_t *async_fiber_context = coroutine->fiber_context; + coroutine->fiber_context = NULL; + ZEND_ASYNC_CURRENT_COROUTINE = NULL; zend_try { // We reach this point when the main coroutine has completed its execution. - async_coroutine_finalize(transfer, coroutine); - - coroutine->context.cleanup = NULL; + async_coroutine_finalize(coroutine); + fiber_context->cleanup = NULL; OBJ_RELEASE(&coroutine->std); @@ -686,6 +953,9 @@ void async_scheduler_main_coroutine_suspend(void) // so that on the next switch we return to this exact point. EG(current_fiber_context) = transfer->context; + // Destroy main Fiber context. + efree(async_fiber_context); + switch_to_scheduler(NULL); } zend_catch @@ -733,17 +1003,6 @@ void async_scheduler_main_coroutine_suspend(void) } } -#define TRY_HANDLE_SUSPEND_EXCEPTION() \ - if (UNEXPECTED(EG(exception) != NULL)) { \ - if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { \ - finally_shutdown(); \ - switch_to_scheduler(transfer); \ - zend_exception_restore(); \ - return; \ - } \ - start_graceful_shutdown(); \ - } - void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine) { /** @@ -762,7 +1021,7 @@ void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine) } // If the transfer is NULL, it means that the coroutine is being resumed - // That’s why we’re adding it to the queue. + // That's why we're adding it to the queue. // coroutine->waker->status != ZEND_ASYNC_WAKER_QUEUED means not need to add to queue twice if (coroutine != NULL && (coroutine->waker == NULL || false == ZEND_ASYNC_WAKER_IN_QUEUE(coroutine->waker))) { if (coroutine->waker == NULL) { @@ -775,20 +1034,60 @@ void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine) coroutine->waker = waker; } - coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED; - - if (UNEXPECTED(circular_buffer_push(&ASYNC_G(coroutine_queue), &coroutine, true)) == FAILURE) { + if (UNEXPECTED(circular_buffer_push_ptr_with_resize(&ASYNC_G(coroutine_queue), coroutine) == FAILURE)) { async_throw_error("Failed to enqueue coroutine"); + } else { + coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED; } // // We stop all events as soon as the coroutine is ready to run. // - async_scheduler_stop_waker_events(coroutine->waker); + stop_waker_events(coroutine->waker); + } +} + +/** + * Implements a single tick of the Scheduler + * and is called from the suspend operation while the context switch has not yet occurred. + */ +static zend_always_inline void scheduler_next_tick(void) +{ + zend_fiber_transfer *transfer = NULL; + ZEND_ASYNC_SCHEDULER_CONTEXT = true; + + execute_microtasks(); + TRY_HANDLE_SUSPEND_EXCEPTION(); + + const bool has_handles = ZEND_ASYNC_REACTOR_EXECUTE(circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue))); + TRY_HANDLE_SUSPEND_EXCEPTION(); + + ZEND_ASYNC_SCHEDULER_CONTEXT = false; + + TRY_HANDLE_SUSPEND_EXCEPTION(); + + const bool is_next_coroutine = circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue)); + + if (UNEXPECTED(false == has_handles && false == is_next_coroutine && + zend_hash_num_elements(&ASYNC_G(coroutines)) > 0 && circular_buffer_is_empty(&ASYNC_G(microtasks)) && + resolve_deadlocks())) { + switch_to_scheduler(transfer); + } + + if (EXPECTED(is_next_coroutine)) { + // + // The execute_next_coroutine() may fail to transfer control to another coroutine for various reasons. + // In that case, it returns false, and we are then required to yield control to the scheduler. + // + if (COROUTINE_SWITCHED != execute_next_coroutine() && EG(exception) == NULL) { + switch_to_scheduler(transfer); + } + } else { + switch_to_scheduler(transfer); } } -void async_scheduler_coroutine_suspend(zend_fiber_transfer *transfer) +void async_scheduler_coroutine_suspend(void) { // // Before suspending the coroutine, we save the current exception state. @@ -817,19 +1116,21 @@ void async_scheduler_coroutine_suspend(zend_fiber_transfer *transfer) // we start all its Waker-events. // This causes timers to start, POLL objects to begin waiting for events, and so on. // - if (transfer == NULL && coroutine != NULL && coroutine->waker != NULL) { + if (coroutine != NULL && coroutine->waker != NULL) { + + const bool not_in_queue = ZEND_ASYNC_WAKER_NOT_IN_QUEUE(coroutine->waker); - // Let’s check that the coroutine has something to wait for; - // If a coroutine isn’t waiting for anything, it must be in the execution queue. - // otherwise, it’s a potential deadlock. - if (coroutine->waker->events.nNumOfElements == 0 && false == ZEND_ASYNC_WAKER_IN_QUEUE(coroutine->waker)) { + // Let's check that the coroutine has something to wait for; + // If a coroutine isn't waiting for anything, it must be in the execution queue. + // otherwise, it's a potential deadlock. + if (coroutine->waker->events.nNumOfElements == 0 && not_in_queue) { async_throw_error("The coroutine has no events to wait for"); - zend_async_waker_destroy(coroutine); + zend_async_waker_clean(coroutine); zend_exception_restore(); return; } - async_scheduler_start_waker_events(coroutine->waker); + start_waker_events(coroutine->waker); // If an exception occurs during the startup of the Waker object, // that exception belongs to the current coroutine, @@ -837,12 +1138,15 @@ void async_scheduler_coroutine_suspend(zend_fiber_transfer *transfer) if (UNEXPECTED(EG(exception))) { // Before returning, We are required to properly destroy the Waker object. zend_exception_save(); - async_scheduler_stop_waker_events(coroutine->waker); - zend_async_waker_destroy(coroutine); - zend_exception_restore(); + stop_waker_events(coroutine->waker); + zend_async_waker_clean(coroutine); zend_exception_restore(); return; } + + if (not_in_queue) { + coroutine->waker->status = ZEND_ASYNC_WAKER_WAITING; + } } if (UNEXPECTED(coroutine->switch_handlers)) { @@ -851,94 +1155,120 @@ void async_scheduler_coroutine_suspend(zend_fiber_transfer *transfer) } // Define current filename and line number for the coroutine suspend. - if (coroutine->waker != NULL) { + if (EXPECTED(coroutine->waker)) { zend_apply_current_filename_and_line(&coroutine->waker->filename, &coroutine->waker->lineno); } - // - // The async_scheduler_coroutine_suspend function is called - // with the transfer parameter not null when the current coroutine finishes execution. - // This means that the transfer structure may contain an exception object - // if the coroutine ended with an error. - // We are required to handle this situation. - // - if (UNEXPECTED(transfer != NULL && transfer->flags & ZEND_FIBER_TRANSFER_FLAG_ERROR)) { - - zend_object *exception = Z_OBJ(transfer->value); - ZEND_ASSERT(Z_TYPE(transfer->value) == IS_OBJECT && "The transfer value must be an exception object"); + scheduler_next_tick(); - transfer->flags = 0; // Reset the flags to avoid reprocessing the exception - ZVAL_NULL(&transfer->value); // Reset the transfer value to avoid memory leaks + ZEND_ASYNC_CURRENT_COROUTINE = coroutine; - if (ZEND_ASYNC_EXIT_EXCEPTION != NULL) { - zend_exception_set_previous(exception, ZEND_ASYNC_EXIT_EXCEPTION); - ZEND_ASYNC_EXIT_EXCEPTION = exception; - } else { - ZEND_ASYNC_EXIT_EXCEPTION = exception; - } - - if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { - finally_shutdown(); - } else { - start_graceful_shutdown(); - } + if (EXPECTED(coroutine->waker->status == ZEND_ASYNC_WAKER_QUEUED)) { + coroutine->waker->status = ZEND_ASYNC_WAKER_RESULT; + } - switch_to_scheduler(transfer); - zend_exception_restore(); - return; + if (UNEXPECTED(coroutine->switch_handlers)) { + ZEND_COROUTINE_ENTER(coroutine); } - ZEND_ASYNC_SCHEDULER_CONTEXT = true; + // Rethrow exception if waker has it + if (coroutine->waker->error != NULL) { + zend_object *exception = coroutine->waker->error; + coroutine->waker->error = NULL; + async_rethrow_exception(exception); + } - execute_microtasks(); - TRY_HANDLE_SUSPEND_EXCEPTION(); + zend_exception_restore(); +} - const bool has_handles = ZEND_ASYNC_REACTOR_EXECUTE(circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue))); - TRY_HANDLE_SUSPEND_EXCEPTION(); +/////////////////////////////////////////////////////////// +/// FIBER ENTRY POINT +/////////////////////////////////////////////////////////// - execute_microtasks(); +/** + * The main entry point for the Fiber. + * + * Fibers are containers for coroutine execution. A single fiber can run multiple coroutines. + * There are three types of fibers: + * Main, Scheduler, and Regular. + * The Main fiber is the primary execution thread that exists at startup. + * The Scheduler fiber is responsible solely for managing the event loop. + * The Regular fiber runs both scheduler tasks and coroutines. + * + * @param transfer Control transfer context + */ +ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer) +{ + ZEND_ASSERT(!transfer->flags && "No flags should be set on initial transfer"); - ZEND_ASYNC_SCHEDULER_CONTEXT = false; + transfer->context = NULL; - TRY_HANDLE_SUSPEND_EXCEPTION(); + /* Determine the current error_reporting ini setting. */ + zend_long error_reporting = INI_INT("error_reporting"); + if (!error_reporting && !INI_STR("error_reporting")) { + error_reporting = E_ALL; + } - const bool is_next_coroutine = circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue)); + EG(vm_stack) = NULL; - if (UNEXPECTED(false == has_handles && false == is_next_coroutine && - zend_hash_num_elements(&ASYNC_G(coroutines)) > 0 && circular_buffer_is_empty(&ASYNC_G(microtasks)) && - resolve_deadlocks())) { - switch_to_scheduler(transfer); - } + async_coroutine_t *coroutine = (async_coroutine_t *) ZEND_ASYNC_CURRENT_COROUTINE; + ZEND_ASSERT(coroutine != NULL && "The current coroutine must be initialized"); - if (EXPECTED(is_next_coroutine)) { - // - // The execute_next_coroutine() may fail to transfer control to another coroutine for various reasons. - // In that case, it returns false, and we are then required to yield control to the scheduler. - // - if (COROUTINE_SWITCHED != execute_next_coroutine(transfer) && EG(exception) == NULL) { - switch_to_scheduler(transfer); - } - } else { - switch_to_scheduler(transfer); - } + async_fiber_context_t *fiber_context = coroutine->fiber_context; + ZEND_ASSERT(fiber_context != NULL && "The fiber context must be initialized"); + zend_fiber_context *internal_fiber_context = &fiber_context->context; - if (UNEXPECTED(coroutine->switch_handlers && transfer == NULL)) { - ZEND_COROUTINE_ENTER(coroutine); - } + const bool is_scheduler = &coroutine->coroutine == ZEND_ASYNC_SCHEDULER; - zend_exception_restore(); -} + // Allocate VM stack on C stack instead of heap + char vm_stack_memory[ZEND_FIBER_VM_STACK_SIZE]; -void async_scheduler_main_loop(void) -{ - zend_try + zend_first_try { + zend_vm_stack stack = (zend_vm_stack)vm_stack_memory; + + // Initialize VM stack structure manually + // see zend_vm_stack_init() + stack->top = ZEND_VM_STACK_ELEMENTS(stack); + stack->end = (zval*)((char*)vm_stack_memory + ZEND_FIBER_VM_STACK_SIZE); + stack->prev = NULL; + + // we allocate space for the first call frame, thereby normalizing the stack + EG(vm_stack) = stack; + EG(vm_stack_top) = stack->top + ZEND_CALL_FRAME_SLOT; + EG(vm_stack_end) = stack->end; + EG(vm_stack_page_size) = ZEND_FIBER_VM_STACK_SIZE; + + zend_execute_data *execute_data = (zend_execute_data *) stack->top; + memset(execute_data, 0, sizeof(zend_execute_data)); + + execute_data->func = &root_function; + // We store a reference to the first call frame for subsequent VM state switching. + fiber_context->execute_data = execute_data; + + EG(current_execute_data) = execute_data; + EG(jit_trace_num) = 0; + EG(error_reporting) = (int) error_reporting; + +#ifdef ZEND_CHECK_STACK_LIMIT + EG(stack_base) = zend_fiber_stack_base(internal_fiber_context->stack); + EG(stack_limit) = zend_fiber_stack_limit(internal_fiber_context->stack); +#endif + + if (EXPECTED(false == is_scheduler)) { + FIBER_DEBUG("Execute primary coroutine %p in Fiber %p\n", coroutine, EG(current_fiber_context)); + async_coroutine_execute(coroutine); + } + bool has_handles = true; bool has_next_coroutine = true; bool was_executed = false; + switch_status status = COROUTINE_NOT_EXISTS; do { + TRY_HANDLE_EXCEPTION(); + ZEND_ASYNC_SCHEDULER_HEARTBEAT; ZEND_ASYNC_SCHEDULER_CONTEXT = true; @@ -950,16 +1280,39 @@ void async_scheduler_main_loop(void) has_handles = ZEND_ASYNC_REACTOR_EXECUTE(has_next_coroutine); TRY_HANDLE_EXCEPTION(); - execute_microtasks(); - TRY_HANDLE_EXCEPTION(); - ZEND_ASYNC_SCHEDULER_CONTEXT = false; if (EXPECTED(has_next_coroutine)) { - const switch_status status = execute_next_coroutine(NULL); - was_executed = status == COROUTINE_SWITCHED || status == COROUTINE_IGNORED; - } else { + status = execute_next_coroutine_from_fiber(is_scheduler ? NULL : transfer, fiber_context); + was_executed = status != COROUTINE_NOT_EXISTS; + + if (UNEXPECTED(status == SHOULD_BE_EXIT)) { + break; + } + + } else if (is_scheduler) { + // The scheduler continues running even if there are no coroutines in the queue to execute. was_executed = false; + } else { + // There are no more coroutines in the execution queue; + // perhaps we should terminate this Fiber. + + // If the Fiber context pool is not empty, we can return the Fiber context to the pool. + // and then switch to the scheduler. + if (return_fiber_to_pool(fiber_context)) { + switch_to_scheduler(NULL); + async_coroutine_t *next_coroutine = (async_coroutine_t *) ZEND_ASYNC_CURRENT_COROUTINE; + + if (UNEXPECTED(next_coroutine == NULL)) { + break; + } + + next_coroutine->fiber_context = fiber_context; + FIBER_DEBUG("Execute coroutine %p in Fiber %p\n", next_coroutine, EG(current_fiber_context)); + async_coroutine_execute(next_coroutine); + } else { + break; + } } TRY_HANDLE_EXCEPTION(); @@ -969,21 +1322,39 @@ void async_scheduler_main_loop(void) circular_buffer_is_empty(&ASYNC_G(coroutine_queue)) && circular_buffer_is_empty(&ASYNC_G(microtasks)) && resolve_deadlocks())) { break; - } + } } while (zend_hash_num_elements(&ASYNC_G(coroutines)) > 0 || circular_buffer_is_not_empty(&ASYNC_G(microtasks)) || ZEND_ASYNC_REACTOR_LOOP_ALIVE()); + } zend_catch { - dispose_coroutines(); - async_scheduler_dtor(); - zend_bailout(); + fiber_context->flags |= ZEND_FIBER_FLAG_BAILOUT; + transfer->flags = ZEND_FIBER_TRANSFER_FLAG_BAILOUT; } zend_end_try(); + // At this point, the fiber is finishing and should properly transfer control back. + + // If the fiber's return target is already defined, do nothing. + if (transfer->context != NULL) { + return; + } + + // If the fiber is not the scheduler, we must switch to the scheduler. + if (false == is_scheduler) { + // If the fiber is not the scheduler, we must switch to the scheduler. + // The transfer value must be NULL, as we are not transferring any value. + switch_to_scheduler(transfer); + return; + } + + // It's the scheduler fiber, so we must finalize it. ZEND_ASSERT(ZEND_ASYNC_REACTOR_LOOP_ALIVE() == false && "The event loop must be stopped"); + fiber_pool_cleanup(); + zend_object *exit_exception = ZEND_ASYNC_EXIT_EXCEPTION; async_scheduler_dtor(); @@ -996,4 +1367,18 @@ void async_scheduler_main_loop(void) } // Here we are guaranteed to exit the coroutine without exceptions. -} \ No newline at end of file + return_to_main(transfer); + + // Free VM stack + zend_vm_stack stack = EG(vm_stack); + + // Destroy the VM stack associated with the fiber context. + // Except for the first segment, which is located directly in the fiber's stack. + while (stack != NULL && stack->prev != NULL) { + zend_vm_stack prev = stack->prev; + efree(stack); + stack = prev; + } + + EG(vm_stack) = NULL; +} diff --git a/scheduler.h b/scheduler.h index e634a5d..b97b31d 100644 --- a/scheduler.h +++ b/scheduler.h @@ -18,6 +18,9 @@ #include +/* Fiber context pool configuration */ +#define ASYNC_FIBER_POOL_SIZE 4 + BEGIN_EXTERN_C() void async_scheduler_startup(void); @@ -30,13 +33,14 @@ void async_scheduler_launch(void); * A function that is called when control needs to be transferred from a coroutine to the Scheduler. * In reality, no context switch occurs. * The Scheduler's logic runs directly within the coroutine that called suspend. - * - * @param transfer (optional) The transfer object that contains the context of the coroutine. */ -void async_scheduler_coroutine_suspend(zend_fiber_transfer *transfer); +void async_scheduler_coroutine_suspend(void); void async_scheduler_main_coroutine_suspend(void); void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine); +/* Fiber context creation */ +async_fiber_context_t* async_fiber_context_create(void); + END_EXTERN_C() #endif // PHP_SCHEDULER_H diff --git a/scope.c b/scope.c index 388ce92..9245d99 100644 --- a/scope.c +++ b/scope.c @@ -314,7 +314,7 @@ METHOD(awaitCompletion) zend_async_resume_when( current_coroutine, &scope_object->scope->scope.event, false, zend_async_waker_callback_resolve, NULL); if (UNEXPECTED(EG(exception))) { - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); RETURN_THROWS(); } @@ -324,12 +324,12 @@ METHOD(awaitCompletion) zend_async_waker_callback_cancel, NULL); if (UNEXPECTED(EG(exception))) { - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); RETURN_THROWS(); } ZEND_ASYNC_SUSPEND(); - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); } METHOD(awaitAfterCancellation) @@ -382,7 +382,7 @@ METHOD(awaitAfterCancellation) scope_coroutine_callback_t *scope_callback = (scope_coroutine_callback_t *) zend_async_coroutine_callback_new( current_coroutine, callback_resolve_when_zombie_completed, sizeof(scope_coroutine_callback_t)); if (UNEXPECTED(scope_callback == NULL)) { - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); RETURN_THROWS(); } @@ -396,7 +396,7 @@ METHOD(awaitAfterCancellation) zend_async_resume_when(current_coroutine, &scope_object->scope->scope.event, true, NULL, &scope_callback->callback); if (UNEXPECTED(EG(exception))) { - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); RETURN_THROWS(); } @@ -407,13 +407,13 @@ METHOD(awaitAfterCancellation) zend_async_waker_callback_cancel, NULL); if (UNEXPECTED(EG(exception))) { - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); RETURN_THROWS(); } } ZEND_ASYNC_SUSPEND(); - zend_async_waker_destroy(current_coroutine); + zend_async_waker_clean(current_coroutine); } METHOD(isFinished) diff --git a/tests/bailout/003-stack-overflow-simple.phpt b/tests/bailout/003-stack-overflow-simple.phpt index 0b04fab..0f22636 100644 --- a/tests/bailout/003-stack-overflow-simple.phpt +++ b/tests/bailout/003-stack-overflow-simple.phpt @@ -7,6 +7,8 @@ if ($zend_mm_enabled === "0") { die("skip Zend MM disabled"); } ?> +--INI-- +opcache.jit_hot_func=0 --FILE--