diff --git a/.github/workflows/build-alpine.yml b/.github/workflows/build-alpine.yml index 5483286..d03cbbf 100644 --- a/.github/workflows/build-alpine.yml +++ b/.github/workflows/build-alpine.yml @@ -224,7 +224,6 @@ jobs: run: | /usr/local/bin/php ../../run-tests.php \ ${{ matrix.asan && '--asan' || '' }} \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=tracing \ @@ -241,7 +240,6 @@ jobs: run: | /usr/local/bin/php ../../run-tests.php \ ${{ matrix.asan && '--asan' || '' }} \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -P -q -x -j$(nproc) \ -g FAIL,BORK,LEAK,XLEAK \ diff --git a/.github/workflows/build-freebsd.yml b/.github/workflows/build-freebsd.yml index 47f1042..66a97ac 100644 --- a/.github/workflows/build-freebsd.yml +++ b/.github/workflows/build-freebsd.yml @@ -167,7 +167,6 @@ jobs: --show-diff \ --show-slow 4000 \ --set-timeout 120 \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ ext/async/tests @@ -180,7 +179,6 @@ jobs: --show-diff \ --show-slow 4000 \ --set-timeout 120 \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=tracing \ @@ -195,7 +193,6 @@ jobs: --show-diff \ --show-slow 4000 \ --set-timeout 120 \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=function \ diff --git a/.github/workflows/build-linux.yml b/.github/workflows/build-linux.yml index e078670..c68f8ab 100644 --- a/.github/workflows/build-linux.yml +++ b/.github/workflows/build-linux.yml @@ -98,7 +98,7 @@ jobs: libxslt1-dev \ libicu-dev - # Build LibUV from source (need >= 1.44.0) + # Build LibUV from source (need >= 1.45.0) sudo apt-get install -y cmake ninja-build # Download and build LibUV 1.48.0 @@ -270,7 +270,6 @@ jobs: /usr/local/bin/php ../../run-tests.php \ $TEST_PARAMS \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -P -q -j$PARALLEL_JOBS \ -g FAIL,BORK,LEAK,XLEAK \ @@ -287,7 +286,6 @@ jobs: working-directory: php-src/ext/async run: | /usr/local/bin/php ../../run-tests.php \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=tracing \ @@ -306,7 +304,6 @@ jobs: working-directory: php-src/ext/async run: | /usr/local/bin/php ../../run-tests.php \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=function \ diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 6f4ce89..7c2b2f1 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -159,7 +159,6 @@ jobs: working-directory: php-src/ext/async run: | /usr/local/bin/php ../../run-tests.php \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=tracing \ @@ -175,7 +174,6 @@ jobs: working-directory: php-src/ext/async run: | /usr/local/bin/php ../../run-tests.php \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -P -q -x -j$(sysctl -n hw.logicalcpu) \ -g FAIL,BORK,LEAK,XLEAK \ @@ -191,7 +189,6 @@ jobs: working-directory: php-src/ext/async run: | /usr/local/bin/php ../../run-tests.php \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=function \ diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index c5c2113..18e85c7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -70,8 +70,8 @@ jobs: firebird-dev \ valgrind cmake - # Cache LibUV 1.44.0 installation to avoid rebuilding - - name: Cache LibUV 1.44.0 + # Cache LibUV 1.45.0 installation to avoid rebuilding + - name: Cache LibUV 1.45.0 id: cache-libuv uses: actions/cache@v4 with: @@ -79,31 +79,31 @@ jobs: /usr/local/lib/libuv* /usr/local/include/uv* /usr/local/lib/pkgconfig/libuv.pc - key: ${{ runner.os }}-libuv-1.44.0-release + key: ${{ runner.os }}-libuv-1.45.0-release - - name: Install LibUV >= 1.44.0 + - name: Install LibUV >= 1.45.0 run: | - # Check if we have cached LibUV 1.44.0 + # Check if we have cached LibUV 1.45.0 if [ "${{ steps.cache-libuv.outputs.cache-hit }}" == "true" ]; then - echo "Using cached LibUV 1.44.0 installation" + echo "Using cached LibUV 1.45.0 installation" sudo ldconfig echo "LibUV version: $(pkg-config --modversion libuv)" # Check if system libuv meets requirements - elif pkg-config --exists libuv && pkg-config --atleast-version=1.44.0 libuv; then + elif pkg-config --exists libuv && pkg-config --atleast-version=1.45.0 libuv; then echo "System libuv version: $(pkg-config --modversion libuv)" sudo apt-get install -y libuv1-dev else - echo "Installing LibUV 1.44.0 from source" - wget https://github.com/libuv/libuv/archive/v1.44.0.tar.gz - tar -xzf v1.44.0.tar.gz - cd libuv-1.44.0 + echo "Installing LibUV 1.45.0 from source" + wget https://github.com/libuv/libuv/archive/v1.45.0.tar.gz + tar -xzf v1.45.0.tar.gz + cd libuv-1.45.0 mkdir build && cd build cmake .. -DCMAKE_BUILD_TYPE=Release make -j$(nproc) sudo make install sudo ldconfig cd ../.. - echo "LibUV 1.44.0 compiled and installed" + echo "LibUV 1.45.0 compiled and installed" fi - name: Configure PHP @@ -113,7 +113,6 @@ jobs: ./configure \ --enable-zts \ --enable-fpm \ - --enable-opcache \ --with-pdo-mysql=mysqlnd \ --with-mysqli=mysqlnd \ --with-pgsql \ @@ -178,7 +177,6 @@ jobs: run: | /usr/local/bin/php -v /usr/local/bin/php ../../run-tests.php \ - -d zend_extension=opcache.so \ -d opcache.enable_cli=1 \ -d opcache.jit_buffer_size=64M \ -d opcache.jit=tracing \ diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f96286..99a8981 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,36 @@ All notable changes to the Async extension for PHP will be documented in this fi The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.4.0] - 2025-09-31 + +### Added +- **UDP socket stream support for TrueAsync** +- **SSL support for socket stream** +- **Poll Proxy**: New `zend_async_poll_proxy_t` structure for optimized file descriptor management + - Efficient caching of event handlers to reduce EventLoop creation overhead + - Poll proxy event aggregation and improved lifecycle management + +### Fixed +- **Fixing `ref_count` logic for the `zend_async_event_callback_t` structure**: + - The add/dispose methods correctly increment the counter + - Memory leaks fixed +- Fixed await iterator logic for `awaitXXX` functions + +### Changed +- **Memory Optimization**: Enhanced memory allocation for async structures + - Optimized waker trigger structures with improved memory layout + - Enhanced memory management for poll proxy events + - Better resource cleanup and lifecycle management +- **Event Loop Performance**: Major scheduler optimizations + - **Automatic Event Cleanup**: Added automatic waker event cleanup when coroutines resume (see `ZEND_ASYNC_WAKER_CLEAN_EVENTS`) + - Separate queue implementation for resumed coroutines to improve stability + - Reduced unnecessary LibUV calls in scheduler tick processing +- **Socket Performance**: + - Event handler caching for sockets to avoid constant EventLoop recreation + - Optimized `network_async_accept_incoming` to try `accept()` before waiting + - Enhanced stream_select functionality with event-driven architecture + - Improved blocking operation handling with boolean return values +- Upgrade `LibUV` to version `1.45` due to a timer bug that causes the application to hang ## [0.3.0] - 2025-07-16 diff --git a/README.md b/README.md index 8fce2b1..5a199d1 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,9 @@ docker run --rm true-async-php php -m | grep true_async ### Requirements - **PHP 8.5.0+** -- **LibUV ≥ 1.44.0** (required) - Fixes critical `UV_RUN_ONCE` busy loop issue that could cause high CPU usage +- **LibUV ≥ 1.45.0** (required) - Fixes critical `UV_RUN_ONCE` busy loop issue that could cause high CPU usage -### Why LibUV 1.44.0+ is Required +### Why LibUV 1.45.0+ is Required Prior to libuv 1.44, there was a critical issue in `uv__io_poll()`/`uv__run_pending` logic that could cause the event loop to "stick" after the first callback when running in `UV_RUN_ONCE` mode, especially when new ready events appeared within callbacks. This resulted in: @@ -95,17 +95,17 @@ The fix in libuv 1.44 ensures that `UV_RUN_ONCE` properly returns after processi 4. **Install LibUV:**: -**IMPORTANT:** LibUV version 1.44.0 or later is required. +**IMPORTANT:** LibUV version 1.45.0 or later is required. For Debian/Ubuntu: ```bash -# Check if system libuv meets requirements (≥1.44.0) +# Check if system libuv meets requirements (≥1.45.0) pkg-config --modversion libuv # If version is too old, install from source: -wget https://github.com/libuv/libuv/archive/v1.44.0.tar.gz -tar -xzf v1.44.0.tar.gz -cd libuv-1.44.0 +wget https://github.com/libuv/libuv/archive/v1.45.0.tar.gz +tar -xzf v1.45.0.tar.gz +cd libuv-1.45.0 mkdir build && cd build cmake .. -DCMAKE_BUILD_TYPE=Release make -j$(nproc) diff --git a/async.c b/async.c index 01121b4..4947785 100644 --- a/async.c +++ b/async.c @@ -896,6 +896,9 @@ static PHP_GINIT_FUNCTION(async) /* Maximum number of coroutines in the concurrent iterator */ async_globals->default_concurrency = 32; + /* Initialize reactor execution optimization */ + async_globals->last_reactor_tick = 0; + #ifdef PHP_WIN32 async_globals->watcherThread = NULL; async_globals->ioCompletionPort = NULL; @@ -907,7 +910,8 @@ static PHP_GINIT_FUNCTION(async) } /* {{{ PHP_GSHUTDOWN_FUNCTION */ -static PHP_GSHUTDOWN_FUNCTION(async){ +static PHP_GSHUTDOWN_FUNCTION(async) +{ #ifdef PHP_WIN32 #endif } /* }}} */ @@ -959,6 +963,7 @@ PHP_RINIT_FUNCTION(async) /* {{{ */ ZEND_ASYNC_INITIALIZE; circular_buffer_ctor(&ASYNC_G(microtasks), 64, sizeof(zend_async_microtask_t *), &zend_std_allocator); circular_buffer_ctor(&ASYNC_G(coroutine_queue), 128, sizeof(zend_coroutine_t *), &zend_std_allocator); + circular_buffer_ctor(&ASYNC_G(resumed_coroutines), 64, sizeof(zend_coroutine_t *), &zend_std_allocator); zend_hash_init(&ASYNC_G(coroutines), 128, NULL, NULL, 0); ASYNC_G(reactor_started) = false; diff --git a/async_API.c b/async_API.c index b32786a..6dbf418 100644 --- a/async_API.c +++ b/async_API.c @@ -208,6 +208,7 @@ static void engine_shutdown(void) circular_buffer_dtor(&ASYNC_G(microtasks)); circular_buffer_dtor(&ASYNC_G(coroutine_queue)); + circular_buffer_dtor(&ASYNC_G(resumed_coroutines)); zend_hash_destroy(&ASYNC_G(coroutines)); if (ASYNC_G(root_context) != NULL) { @@ -609,7 +610,8 @@ static void await_iterator_dispose(async_await_iterator_t *iterator, async_itera iterator->zend_iterator = NULL; // When the iterator has finished, it’s now possible to specify the exact number of elements since it’s known. - iterator->await_context->total = iterator->await_context->futures_count; + iterator->await_context->total = + iterator->await_context->futures_count + iterator->await_context->resolved_count; // Scenario: the iterator has already finished, and there’s nothing left to await. // In that case, the coroutine needs to be terminated. diff --git a/benchmarks/http_server_keepalive.php b/benchmarks/http_server_keepalive.php index becac33..8c3b173 100644 --- a/benchmarks/http_server_keepalive.php +++ b/benchmarks/http_server_keepalive.php @@ -2,10 +2,10 @@ /** * HTTP Server with Keep-Alive Support * High-performance HTTP server implementation with connection pooling - * + * * Usage: * php http_server_keepalive.php [host] [port] - * + * * Test with wrk: * wrk -t12 -c400 -d30s --http1.1 http://127.0.0.1:8080/ */ @@ -14,21 +14,25 @@ ini_set('memory_limit', '512M'); use function Async\spawn; -use function Async\awaitAll; +use function Async\await; +use function Async\delay; // Configuration $host = $argv[1] ?? '127.0.0.1'; $port = (int)($argv[2] ?? 8080); $keepaliveTimeout = 30; // seconds +$socketCoroutines = 0; +$socketCoroutinesRun = 0; +$socketCoroutinesFinished = 0; +$requestCount = 0; +$requestHandled = 0; + echo "=== Async HTTP Server with Keep-Alive ===\n"; echo "Starting server on http://$host:$port\n"; echo "Keep-Alive timeout: {$keepaliveTimeout}s\n"; echo "Press Ctrl+C to stop\n\n"; -// Global connection pool -$activeConnections = []; -$connectionId = 0; // Cached JSON responses for performance $cachedResponses = [ @@ -41,19 +45,20 @@ /** * Fast HTTP request parsing for benchmarks - only extract URI */ -function parseHttpRequest($request) { +function parseHttpRequest($request) +{ // Fast path: find first space and second space to extract URI $firstSpace = strpos($request, ' '); if ($firstSpace === false) return '/'; - + $secondSpace = strpos($request, ' ', $firstSpace + 1); if ($secondSpace === false) return '/'; - + $uri = substr($request, $firstSpace + 1, $secondSpace - $firstSpace - 1); - + // Check for Connection: close header (simple search) $connectionClose = stripos($request, 'connection: close') !== false; - + return [ 'uri' => $uri, 'connection_close' => $connectionClose @@ -61,108 +66,120 @@ function parseHttpRequest($request) { } /** - * Fast request processing with cached responses + * Process HTTP request and send response */ -function processHttpRequest($uri) { +function processHttpRequest($client, $rawRequest) +{ global $cachedResponses; - + + $parsedRequest = parseHttpRequest($rawRequest); + $uri = $parsedRequest['uri']; + $shouldKeepAlive = !$parsedRequest['connection_close']; + // Use cached responses for static content if (isset($cachedResponses[$uri])) { - return [$cachedResponses[$uri], 200]; - } - - // Dynamic endpoints - if ($uri === '/benchmark') { - $responseBody = json_encode(['id' => uniqid(), 'time' => microtime(true)], JSON_UNESCAPED_SLASHES); - return [$responseBody, 200]; + $responseBody = $cachedResponses[$uri]; + $statusCode = 200; + } else { + // 404 response + $responseBody = json_encode(['error' => 'Not Found', 'uri' => $uri], JSON_UNESCAPED_SLASHES); + $statusCode = 404; } - - // 404 response - $responseBody = json_encode(['error' => 'Not Found', 'uri' => $uri], JSON_UNESCAPED_SLASHES); - return [$responseBody, 404]; -} -/** - * Fast HTTP response building - */ -function buildHttpResponse($responseBody, $statusCode, $keepAlive = true) { - $statusText = $statusCode === 200 ? 'OK' : 'Not Found'; + // Build and send response directly $contentLength = strlen($responseBody); - - // Build response using array for better performance - $headers = [ - "HTTP/1.1 $statusCode $statusText", - "Content-Type: application/json", - "Content-Length: $contentLength", - "Server: AsyncKeepAlive/1.0" - ]; - - if ($keepAlive) { - $headers[] = "Connection: keep-alive"; - $headers[] = "Keep-Alive: timeout=30, max=1000"; + $statusText = $statusCode === 200 ? 'OK' : 'Not Found'; + + if ($shouldKeepAlive) { + $response = 'HTTP/1.1 ' . $statusCode . ' ' . $statusText . "\r\n" . + 'Content-Type: application/json' . "\r\n" . + 'Content-Length: ' . $contentLength . "\r\n" . + 'Server: AsyncKeepAlive/1.0' . "\r\n" . + 'Connection: keep-alive' . "\r\n" . + 'Keep-Alive: timeout=30, max=1000' . "\r\n\r\n" . $responseBody; } else { - $headers[] = "Connection: close"; + $response = 'HTTP/1.1 ' . $statusCode . ' ' . $statusText . "\r\n" . + 'Content-Type: application/json' . "\r\n" . + 'Content-Length: ' . $contentLength . "\r\n" . + 'Server: AsyncKeepAlive/1.0' . "\r\n" . + 'Connection: close' . "\r\n\r\n" . $responseBody; + } + + $written = fwrite($client, $response); + + if ($written === false) { + return false; // Write failed } - - return implode("\r\n", $headers) . "\r\n\r\n" . $responseBody; + + return $shouldKeepAlive; } /** - * Handle persistent connection with Keep-Alive support + * Handle socket connection with keep-alive support + * Each socket gets its own coroutine that lives for the entire connection */ -function handleConnection($client, $connId) { - global $activeConnections; - - $activeConnections[$connId] = $client; - - $requestCount = 0; - $startTime = time(); - +function handleSocket($client) +{ + global $socketCoroutinesRun, $socketCoroutinesFinished; + global $requestCount, $requestHandled; + + $socketCoroutinesRun++; + try { while (true) { - // Set read timeout for Keep-Alive - stream_set_timeout($client, 30); - - // Read HTTP request - simple 8KB block read for performance - $request = fread($client, 8192); - if ($request === false || empty(trim($request))) { - // Connection closed by client or timeout - break; - } - - $requestCount++; - $parsedRequest = parseHttpRequest($request); - - // Check if client wants to close connection - $shouldKeepAlive = !$parsedRequest['connection_close']; - - // Process request (now returns pre-encoded JSON body) - [$responseBody, $statusCode] = processHttpRequest($parsedRequest['uri']); - - // Build and send response - $response = buildHttpResponse($responseBody, $statusCode, $shouldKeepAlive); - - $bytesSent = fwrite($client, $response); - if ($bytesSent === false) { - throw new Exception("Failed to send response"); + $request = ''; + $totalBytes = 0; + + // Read HTTP request with byte counting + while (true) { + $chunk = fread($client, 1024); + + if ($chunk === false || $chunk === '') { + // Connection closed by client or read error + return; + } + + $request .= $chunk; + $totalBytes += strlen($chunk); + + // Check for request size limit + if ($totalBytes > 8192) { + // Request too large, close connection immediately + fclose($client); + $requestCount++; + $requestHandled++; + return; + } + + // Check if we have complete HTTP request (ends with \r\n\r\n) + if (strpos($request, "\r\n\r\n") !== false) { + break; + } } - - // Close connection if requested by client - if (!$shouldKeepAlive) { - break; + + if (empty(trim($request))) { + // Empty request, skip to next iteration + continue; } - - // Limit max requests per connection to prevent resource exhaustion - if ($requestCount >= 1000) { - break; + + $requestCount++; + + // Process request and send response + $shouldKeepAlive = processHttpRequest($client, $request); + + $requestHandled++; + + if ($shouldKeepAlive === false) { + // Write failed or connection should be closed + return; } + + // Continue to next request in keep-alive connection } - - } catch (Exception $e) { - echo "Connection error: " . $e->getMessage() . "\n"; + } finally { - // Clean up connection - unset($activeConnections[$connId]); + $socketCoroutinesFinished++; + // Always clean up the socket if (is_resource($client)) { fclose($client); } @@ -171,48 +188,63 @@ function handleConnection($client, $connId) { /** * HTTP Server with Keep-Alive support + * Simple coroutine-based implementation without stream_select */ function startHttpServer($host, $port) { - global $connectionId; - return spawn(function() use ($host, $port) { - global $connectionId; - + + global $socketCoroutines; + // Create server socket $server = stream_socket_server("tcp://$host:$port", $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN); if (!$server) { throw new Exception("Could not create server: $errstr ($errno)"); } - + + stream_context_set_option($server, 'socket', 'tcp_nodelay', true); + echo "Server listening on $host:$port\n"; echo "Try: curl http://$host:$port/\n"; - echo "Benchmark: wrk -t12 -c400 -d30s --http1.1 http://$host:$port/benchmark\n\n"; - + echo "Benchmark: wrk -t12 -c400 -d30s http://$host:$port/\n\n"; + + // Simple accept loop - much cleaner! while (true) { - // Accept new connections (this is async in async extension) + // Accept new connections $client = stream_socket_accept($server, 0); - if ($client) { - $connectionId++; - - // Handle connection in separate coroutine with Keep-Alive - spawn(handleConnection(...), $client, $connectionId); + $socketCoroutines++; + // Spawn a coroutine to handle this client's entire lifecycle + spawn(handleSocket(...), $client); } } - + fclose($server); }); } +spawn(function() { + + global $socketCoroutines, $socketCoroutinesRun, $socketCoroutinesFinished, $requestCount, $requestHandled; + + while(true) { + delay(2000); + echo "Sockets: $socketCoroutines\n"; + echo "Coroutines: $socketCoroutinesRun\n"; + echo "Finished: $socketCoroutinesFinished\n"; + echo "Request: $requestCount\n"; + echo "Handled: $requestHandled\n\n"; + } +}); // Start server try { $serverTask = startHttpServer($host, $port); - - // Run until interrupted - awaitAll([$serverTask]); - + await($serverTask); + } catch (Exception $e) { echo "Server error: " . $e->getMessage() . "\n"; - exit(1); -} \ No newline at end of file +} finally { + echo "Sockets: $socketCoroutines\n"; + echo "Coroutines: $socketCoroutinesRun\n"; + echo "Finished: $socketCoroutinesFinished\n"; +} diff --git a/config.m4 b/config.m4 index 95c0c0c..8def675 100644 --- a/config.m4 +++ b/config.m4 @@ -26,14 +26,14 @@ if test "$PHP_ASYNC" = "yes"; then AC_MSG_CHECKING(for libuv) if test -x "$PKG_CONFIG" && $PKG_CONFIG --exists libuv; then - dnl Require libuv >= 1.44.0 for UV_RUN_ONCE busy loop fix - if $PKG_CONFIG libuv --atleast-version 1.44.0; then + dnl Require libuv >= 1.45.0 for UV_RUN_ONCE busy loop fix + if $PKG_CONFIG libuv --atleast-version 1.45.0; then LIBUV_INCLINE=`$PKG_CONFIG libuv --cflags` LIBUV_LIBLINE=`$PKG_CONFIG libuv --libs` LIBUV_VERSION=`$PKG_CONFIG libuv --modversion` AC_MSG_RESULT(from pkgconfig: found version $LIBUV_VERSION) else - AC_MSG_ERROR(system libuv must be upgraded to version >= 1.44.0 (fixes UV_RUN_ONCE busy loop issue)) + AC_MSG_ERROR(system libuv must be upgraded to version >= 1.45.0 (fixes UV_RUN_ONCE busy loop issue)) fi PHP_EVAL_LIBLINE($LIBUV_LIBLINE, UV_SHARED_LIBADD) PHP_EVAL_INCLINE($LIBUV_INCLINE) diff --git a/config.w32 b/config.w32 index a076316..a2862da 100644 --- a/config.w32 +++ b/config.w32 @@ -23,7 +23,7 @@ if (PHP_ASYNC == "yes") { if (CHECK_HEADER_ADD_INCLUDE("libuv/uv.h", "CFLAGS_UV", PHP_PHP_BUILD + "\\include") && CHECK_LIB("libuv.lib", "libuv")) { - // Note: libuv >= 1.44.0 is required for UV_RUN_ONCE busy loop fix + // Note: libuv >= 1.45.0 is required for UV_RUN_ONCE busy loop fix // For Windows builds, manually verify libuv version meets requirements ADD_FLAG("LIBS", "libuv.lib Dbghelp.lib Userenv.lib"); @@ -32,6 +32,6 @@ if (PHP_ASYNC == "yes") { "'.\nTo compile PHP TRUE ASYNC with LibUV:\n" + "1. Copy files from 'libuv\\include' to '" + PHP_PHP_BUILD + "\\include\\libuv\\'\n" + "2. Build libuv.lib and copy it to '" + PHP_PHP_BUILD + "\\lib\\'\n" + - "3. IMPORTANT: Use libuv >= 1.44.0 (fixes UV_RUN_ONCE busy loop issue)"); + "3. IMPORTANT: Use libuv >= 1.45.0 (fixes UV_RUN_ONCE busy loop issue)"); } } diff --git a/coroutine.c b/coroutine.c index c3c38a8..50ae036 100644 --- a/coroutine.c +++ b/coroutine.c @@ -49,7 +49,10 @@ static void coroutine_event_start(zend_async_event_t *event); static void coroutine_event_stop(zend_async_event_t *event); static void coroutine_add_callback(zend_async_event_t *event, zend_async_event_callback_t *callback); static void coroutine_del_callback(zend_async_event_t *event, zend_async_event_callback_t *callback); -static bool coroutine_replay(zend_async_event_t *event, zend_async_event_callback_t *callback, zval *result, zend_object **exception); +static bool coroutine_replay(zend_async_event_t *event, + zend_async_event_callback_t *callback, + zval *result, + zend_object **exception); static zend_string *coroutine_info(zend_async_event_t *event); static void coroutine_dispose(zend_async_event_t *event); @@ -286,7 +289,8 @@ static HashTable *async_coroutine_object_gc(zend_object *object, zval **table, i async_fiber_context_t *fiber_context = coroutine->fiber_context; /* Check if we should traverse execution stack (similar to fibers) */ - if (fiber_context == NULL || (fiber_context->context.status != ZEND_FIBER_STATUS_SUSPENDED || !fiber_context->execute_data)) { + if (fiber_context == NULL || + (fiber_context->context.status != ZEND_FIBER_STATUS_SUSPENDED || !fiber_context->execute_data)) { zend_get_gc_buffer_use(buf, table, num); return NULL; } @@ -394,7 +398,7 @@ ZEND_STACK_ALIGNED void async_coroutine_execute(async_coroutine_t *coroutine) coroutine->coroutine.event.dispose(&coroutine->coroutine.event); - if(EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { + if (EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { ZEND_ASYNC_CURRENT_COROUTINE = NULL; } @@ -405,7 +409,7 @@ ZEND_STACK_ALIGNED void async_coroutine_execute(async_coroutine_t *coroutine) zend_error(E_ERROR, "Attempt to resume a coroutine that has not been resolved"); coroutine->coroutine.event.dispose(&coroutine->coroutine.event); - if(EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { + if (EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { ZEND_ASYNC_CURRENT_COROUTINE = NULL; } @@ -460,7 +464,7 @@ ZEND_STACK_ALIGNED void async_coroutine_execute(async_coroutine_t *coroutine) } zend_end_try(); - if(EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { + if (EXPECTED(ZEND_ASYNC_CURRENT_COROUTINE == &coroutine->coroutine)) { ZEND_ASYNC_CURRENT_COROUTINE = NULL; } @@ -500,6 +504,8 @@ void async_coroutine_finalize(async_coroutine_t *coroutine) } bool do_bailout = false; + zend_object **exception_ptr = &EG(exception); + zend_object **prev_exception_ptr = &EG(prev_exception); zend_try { @@ -510,13 +516,13 @@ void async_coroutine_finalize(async_coroutine_t *coroutine) // call coroutines handlers zend_object *exception = NULL; - if (EG(exception)) { - if (EG(prev_exception)) { - zend_exception_set_previous(EG(exception), EG(prev_exception)); - EG(prev_exception) = NULL; + if (UNEXPECTED(*exception_ptr)) { + if (*prev_exception_ptr) { + zend_exception_set_previous(*exception_ptr, *prev_exception_ptr); + *prev_exception_ptr = NULL; } - exception = EG(exception); + exception = *exception_ptr; GC_ADDREF(exception); zend_clear_exception(); @@ -545,7 +551,7 @@ void async_coroutine_finalize(async_coroutine_t *coroutine) GC_ADDREF(exception); } - zend_exception_save(); + zend_exception_save_fast(exception_ptr, prev_exception_ptr); // Mark second parameter of zend_async_callbacks_notify as ZVAL ZEND_ASYNC_EVENT_SET_ZVAL_RESULT(&coroutine->coroutine.event); ZEND_COROUTINE_CLR_EXCEPTION_HANDLED(&coroutine->coroutine); @@ -569,7 +575,7 @@ void async_coroutine_finalize(async_coroutine_t *coroutine) dispose(&coroutine->coroutine); } - zend_exception_restore(); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); // If the exception was handled by any handler, we do not propagate it further. // Cancellation-type exceptions are considered handled in all cases and are not propagated further. @@ -611,7 +617,7 @@ void async_coroutine_finalize(async_coroutine_t *coroutine) } zend_end_try(); - if (UNEXPECTED(EG(exception) && (zend_is_graceful_exit(EG(exception)) || zend_is_unwind_exit(EG(exception))))) { + if (UNEXPECTED(*exception_ptr && (zend_is_graceful_exit(*exception_ptr) || zend_is_unwind_exit(*exception_ptr)))) { zend_clear_exception(); } @@ -692,12 +698,17 @@ void async_coroutine_resume(zend_coroutine_t *coroutine, zend_object *error, con return; } - if (UNEXPECTED(circular_buffer_push(&ASYNC_G(coroutine_queue), &coroutine, true)) == FAILURE) { + if (UNEXPECTED(circular_buffer_push_ptr_with_resize(&ASYNC_G(coroutine_queue), coroutine)) == FAILURE) { async_throw_error("Failed to enqueue coroutine"); return; } coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED; + + // Add to resumed_coroutines queue for event cleanup + if (ZEND_ASYNC_IS_SCHEDULER_CONTEXT) { + circular_buffer_push_ptr_with_resize(&ASYNC_G(resumed_coroutines), coroutine); + } } void async_coroutine_cancel(zend_coroutine_t *zend_coroutine, @@ -894,8 +905,7 @@ static zend_string *coroutine_info(zend_async_event_t *event) coroutine->std.handle, coroutine->coroutine.filename ? ZSTR_VAL(coroutine->coroutine.filename) : "", coroutine->coroutine.lineno, - coroutine->waker.filename ? ZSTR_VAL(coroutine->waker.filename) - : "", + coroutine->waker.filename ? ZSTR_VAL(coroutine->waker.filename) : "", coroutine->waker.lineno, ZSTR_VAL(zend_coroutine_name)); } else { @@ -983,11 +993,14 @@ static zend_result finally_handlers_iterator_handler(async_iterator_t *iterator, zval_ptr_dtor(&rv); ZVAL_UNDEF(&rv); + zend_object **exception_ptr = &EG(exception); + // Check for exceptions after handler execution - if (EG(exception)) { - zend_exception_save(); - zend_exception_restore(); - zend_object *current_exception = EG(exception); + if (UNEXPECTED(*exception_ptr)) { + zend_object **prev_exception_ptr = &EG(prev_exception); + zend_exception_save_fast(exception_ptr, prev_exception_ptr); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); + zend_object *current_exception = *exception_ptr; GC_ADDREF(current_exception); zend_clear_exception(); @@ -1338,8 +1351,7 @@ METHOD(getSuspendLocation) async_coroutine_t *coroutine = THIS_COROUTINE; if (coroutine->waker.filename) { - RETURN_STR(zend_strpprintf( - 0, "%s:%d", ZSTR_VAL(coroutine->waker.filename), coroutine->waker.lineno)); + RETURN_STR(zend_strpprintf(0, "%s:%d", ZSTR_VAL(coroutine->waker.filename), coroutine->waker.lineno)); } else { RETURN_STRING("unknown"); } diff --git a/coroutine.h b/coroutine.h index 9f2f659..b81711a 100644 --- a/coroutine.h +++ b/coroutine.h @@ -29,7 +29,7 @@ struct _async_fiber_context_s /* Active fiber VM stack */ zend_vm_stack vm_stack; - + /* Current Zend VM execute data */ zend_execute_data *execute_data; @@ -47,7 +47,7 @@ struct _async_coroutine_s /* Basic structure for coroutine. */ zend_coroutine_t coroutine; - + /* Embedded waker (always allocated, no malloc needed) */ zend_async_waker_t waker; diff --git a/exceptions.c b/exceptions.c index 9db0108..b326989 100644 --- a/exceptions.c +++ b/exceptions.c @@ -201,7 +201,8 @@ PHP_ASYNC_API ZEND_COLD zend_object *async_new_composite_exception(void) return Z_OBJ(composite); } -PHP_ASYNC_API void async_composite_exception_add_exception(zend_object *composite, zend_object *exception, bool transfer) +PHP_ASYNC_API void +async_composite_exception_add_exception(zend_object *composite, zend_object *exception, bool transfer) { if (composite == NULL || exception == NULL) { return; @@ -286,8 +287,10 @@ bool async_spawn_and_throw(zend_object *exception, zend_async_scope_t *scope, in */ zend_object *async_extract_exception(void) { - zend_exception_save(); - zend_exception_restore(); + zend_object **exception_ptr = &EG(exception); + zend_object **prev_exception_ptr = &EG(prev_exception); + zend_exception_save_fast(exception_ptr, prev_exception_ptr); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); zend_object *exception = EG(exception); GC_ADDREF(exception); zend_clear_exception(); @@ -306,10 +309,11 @@ zend_object *async_extract_exception(void) */ void async_apply_exception(zend_object **to_exception) { - if (UNEXPECTED(EG(exception) && - false == - (instanceof_function(EG(exception)->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION)) || - zend_is_graceful_exit(EG(exception)) || zend_is_unwind_exit(EG(exception))))) { + if (UNEXPECTED( + EG(exception) && + false == + (instanceof_function(EG(exception)->ce, ZEND_ASYNC_GET_CE(ZEND_ASYNC_EXCEPTION_CANCELLATION)) || + zend_is_graceful_exit(EG(exception)) || zend_is_unwind_exit(EG(exception))))) { zend_object *exception = async_extract_exception(); diff --git a/exceptions.h b/exceptions.h index db7ca7c..e59b263 100644 --- a/exceptions.h +++ b/exceptions.h @@ -39,7 +39,8 @@ PHP_ASYNC_API ZEND_COLD zend_object *async_throw_input_output(const char *format PHP_ASYNC_API ZEND_COLD zend_object *async_throw_timeout(const char *format, const zend_long timeout); PHP_ASYNC_API ZEND_COLD zend_object *async_throw_poll(const char *format, ...); PHP_ASYNC_API ZEND_COLD zend_object *async_new_composite_exception(void); -PHP_ASYNC_API void async_composite_exception_add_exception(zend_object *composite, zend_object *exception, bool transfer); +PHP_ASYNC_API void +async_composite_exception_add_exception(zend_object *composite, zend_object *exception, bool transfer); bool async_spawn_and_throw(zend_object *exception, zend_async_scope_t *scope, int32_t priority); void async_apply_exception_to_context(zend_object *exception); zend_object *async_extract_exception(void); diff --git a/internal/circular_buffer.h b/internal/circular_buffer.h index 9526af4..991e7bd 100644 --- a/internal/circular_buffer.h +++ b/internal/circular_buffer.h @@ -83,42 +83,51 @@ zend_result zval_circular_buffer_pop(circular_buffer_t *buffer, zval *value); /* Inline optimized functions - placed after all declarations */ /* Inline version for hot path performance */ -static zend_always_inline bool circular_buffer_is_not_empty(const circular_buffer_t *buffer) { - return buffer->head != buffer->tail; +static zend_always_inline bool circular_buffer_is_not_empty(const circular_buffer_t *buffer) +{ + return buffer->head != buffer->tail; +} + +static zend_always_inline void circular_buffer_clean(circular_buffer_t *buffer) +{ + buffer->head = buffer->tail; } /* Fast specialized version for pointer push (8 bytes) */ -static zend_always_inline zend_result circular_buffer_push_ptr(circular_buffer_t *buffer, void *ptr) { - // Check if buffer is full using bitwise AND (capacity is power of 2) - if (EXPECTED(((buffer->head + 1) & (buffer->capacity - 1)) != buffer->tail)) { - // Direct pointer assignment - no memcpy overhead - *(void**)((char*)buffer->data + buffer->head * sizeof(void*)) = ptr; - buffer->head = (buffer->head + 1) & (buffer->capacity - 1); - return SUCCESS; - } - return FAILURE; +static zend_always_inline zend_result circular_buffer_push_ptr(circular_buffer_t *buffer, void *ptr) +{ + // Check if buffer is full using bitwise AND (capacity is power of 2) + if (EXPECTED(((buffer->head + 1) & (buffer->capacity - 1)) != buffer->tail)) { + // Direct pointer assignment - no memcpy overhead + *(void **) ((char *) buffer->data + buffer->head * sizeof(void *)) = ptr; + buffer->head = (buffer->head + 1) & (buffer->capacity - 1); + return SUCCESS; + } + return FAILURE; } /* Fast specialized version for pointer pop (8 bytes) */ -static zend_always_inline zend_result circular_buffer_pop_ptr(circular_buffer_t *buffer, void **ptr) { - // Check if buffer is empty - if (EXPECTED(buffer->head != buffer->tail)) { - // Direct pointer read - no memcpy overhead - *ptr = *(void**)((char*)buffer->data + buffer->tail * sizeof(void*)); - buffer->tail = (buffer->tail + 1) & (buffer->capacity - 1); - return SUCCESS; - } - return FAILURE; +static zend_always_inline zend_result circular_buffer_pop_ptr(circular_buffer_t *buffer, void **ptr) +{ + // Check if buffer is empty + if (EXPECTED(buffer->head != buffer->tail)) { + // Direct pointer read - no memcpy overhead + *ptr = *(void **) ((char *) buffer->data + buffer->tail * sizeof(void *)); + buffer->tail = (buffer->tail + 1) & (buffer->capacity - 1); + return SUCCESS; + } + return FAILURE; } /* Smart wrapper for pointer push with resize fallback */ -static zend_always_inline zend_result circular_buffer_push_ptr_with_resize(circular_buffer_t *buffer, void *ptr) { - // Try fast path first (no resize) - if (EXPECTED(circular_buffer_push_ptr(buffer, ptr) == SUCCESS)) { - return SUCCESS; - } - // Fallback to slow path with resize - need address of ptr for memcpy - return circular_buffer_push(buffer, &ptr, true); +static zend_always_inline zend_result circular_buffer_push_ptr_with_resize(circular_buffer_t *buffer, void *ptr) +{ + // Try fast path first (no resize) + if (EXPECTED(circular_buffer_push_ptr(buffer, ptr) == SUCCESS)) { + return SUCCESS; + } + // Fallback to slow path with resize - need address of ptr for memcpy + return circular_buffer_push(buffer, &ptr, true); } -#endif // ASYNC_CIRCULAR_BUFFER_V2_H \ No newline at end of file +#endif // ASYNC_CIRCULAR_BUFFER_V2_H diff --git a/libuv_reactor.c b/libuv_reactor.c index e1643ec..1401ccc 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -48,6 +48,7 @@ static void libuv_cleanup_process_events(void); #define UVLOOP (&ASYNC_G(uvloop)) #define LIBUV_REACTOR ((zend_async_globals *) ASYNC_GLOBALS) #define LIBUV_REACTOR_VAR zend_async_globals *reactor = LIBUV_REACTOR; + #define LIBUV_REACTOR_VAR_FROM(var) zend_async_globals *reactor = (zend_async_globals *) var; #define WATCHER ASYNC_G(watcherThread) #define IF_EXCEPTION_STOP_REACTOR \ @@ -195,6 +196,35 @@ static void libuv_close_handle_cb(uv_handle_t *handle) /* }}} */ +/* {{{ libuv_close_poll_handle_cb */ +static void libuv_close_poll_handle_cb(uv_handle_t *handle) +{ + async_poll_event_t *poll = (async_poll_event_t *) handle->data; + + /* Check if PHP requested descriptor closure after event cleanup */ + if (ZEND_ASYNC_EVENT_SHOULD_CLOSE_FD(&poll->event.base)) { + if (poll->event.is_socket && ZEND_VALID_SOCKET(poll->event.socket)) { + /* Socket cleanup - just close, no blocking operations in LibUV callback */ +#ifdef PHP_WIN32 + closesocket(poll->event.socket); +#else + close(poll->event.socket); +#endif + } else if (!poll->event.is_socket && poll->event.file != ZEND_FD_NULL) { + /* File descriptor cleanup */ +#ifdef PHP_WIN32 + CloseHandle((HANDLE) poll->event.file); +#else + close(poll->event.file); +#endif + } + } + + pefree(poll, 0); +} + +/* }}} */ + /* {{{ libuv_add_callback */ static void libuv_add_callback(zend_async_event_t *event, zend_async_event_callback_t *callback) { @@ -215,19 +245,41 @@ static void libuv_remove_callback(zend_async_event_t *event, zend_async_event_ca /// Poll API ////////////////////////////////////////////////////////////////////////////// +/* Forward declaration */ +static zend_always_inline void +async_poll_notify_proxies(async_poll_event_t *poll, async_poll_event triggered_events, zend_object *exception); + /* {{{ on_poll_event */ static void on_poll_event(uv_poll_t *handle, int status, int events) { async_poll_event_t *poll = handle->data; zend_object *exception = NULL; - if (status < 0) { + if (status < 0 && status != UV_EBADF) { exception = async_new_exception(async_ce_input_output_exception, "Input output error: %s", uv_strerror(status)); } + // !WARNING! + // LibUV may return the UV_EBADF code when the remote host closes + // the connection while the descriptor is still present in the EventLoop. + // For POLL events, we handle this by ignoring the situation + // so that the coroutine receives the ASYNC_DISCONNECT flag. + // This code can be considered “incorrect”; however, this solution is acceptable. + // + if (UNEXPECTED(status == UV_EBADF)) { + events = ASYNC_DISCONNECT; + } + poll->event.triggered_events = events; - ZEND_ASYNC_CALLBACKS_NOTIFY(&poll->event.base, NULL, exception); + /* Check if there are active proxies */ + if (poll->proxies_count > 0) { + /* Notify all matching proxies */ + async_poll_notify_proxies(poll, events, exception); + } else { + /* Standard base event notification */ + ZEND_ASYNC_CALLBACKS_NOTIFY(&poll->event.base, NULL, exception); + } if (exception != NULL) { zend_object_release(exception); @@ -295,7 +347,181 @@ static void libuv_poll_dispose(zend_async_event_t *event) async_poll_event_t *poll = (async_poll_event_t *) (event); - uv_close((uv_handle_t *) &poll->uv_handle, libuv_close_handle_cb); + /* Free proxies array if exists */ + if (poll->proxies != NULL) { + pefree(poll->proxies, 0); + poll->proxies = NULL; + } + + /* Use poll-specific callback for poll events that may need descriptor cleanup */ + uv_close((uv_handle_t *) &poll->uv_handle, libuv_close_poll_handle_cb); +} + +/* }}} */ + +/* {{{ async_poll_notify_proxies */ +static zend_always_inline void +async_poll_notify_proxies(async_poll_event_t *poll, async_poll_event triggered_events, zend_object *exception) +{ + /* Process each proxy that matches triggered events */ + for (uint32_t i = 0; i < poll->proxies_count; i++) { + zend_async_poll_proxy_t *proxy = poll->proxies[i]; + + if ((triggered_events & proxy->events) != 0) { + /* Increase ref count to prevent disposal during processing */ + ZEND_ASYNC_EVENT_ADD_REF(&proxy->base); + + /* Calculate events relevant to this proxy */ + async_poll_event proxy_events = triggered_events & proxy->events; + + /* Set triggered events and notify callbacks */ + proxy->triggered_events = proxy_events; + ZEND_ASYNC_CALLBACKS_NOTIFY_FROM_HANDLER(&proxy->base, &proxy_events, exception); + + /* Release reference after processing */ + ZEND_ASYNC_EVENT_RELEASE(&proxy->base); + } + } +} + +/* }}} */ + +/* {{{ async_poll_add_proxy */ +static zend_always_inline void async_poll_add_proxy(async_poll_event_t *poll, zend_async_poll_proxy_t *proxy) +{ + if (poll->proxies == NULL) { + poll->proxies = (zend_async_poll_proxy_t **) pecalloc(4, sizeof(zend_async_poll_proxy_t *), 0); + poll->proxies_capacity = 2; + } + + if (poll->proxies_count == poll->proxies_capacity) { + poll->proxies_capacity *= 2; + poll->proxies = (zend_async_poll_proxy_t **) perealloc( + poll->proxies, poll->proxies_capacity * sizeof(zend_async_poll_proxy_t *), 0); + } + + poll->proxies[poll->proxies_count++] = proxy; +} + +/* }}} */ + +/* {{{ async_poll_remove_proxy */ +static zend_always_inline void async_poll_remove_proxy(async_poll_event_t *poll, zend_async_poll_proxy_t *proxy) +{ + for (uint32_t i = 0; i < poll->proxies_count; i++) { + if (poll->proxies[i] == proxy) { + /* Move last element to this position */ + poll->proxies[i] = poll->proxies[--poll->proxies_count]; + break; + } + } +} + +/* }}} */ + +/* {{{ async_poll_aggregate_events */ +static zend_always_inline async_poll_event async_poll_aggregate_events(async_poll_event_t *poll) +{ + async_poll_event aggregated = 0; + + for (uint32_t i = 0; i < poll->proxies_count; i++) { + aggregated |= poll->proxies[i]->events; + + /* Early exit if all possible events are set */ + if (aggregated == (ASYNC_READABLE | ASYNC_WRITABLE | ASYNC_DISCONNECT | ASYNC_PRIORITIZED)) { + break; + } + } + + return aggregated; +} + +/* }}} */ + +/* {{{ libuv_poll_proxy_start */ +static void libuv_poll_proxy_start(zend_async_event_t *event) +{ + EVENT_START_PROLOGUE(event); + + zend_async_poll_proxy_t *proxy = (zend_async_poll_proxy_t *) event; + async_poll_event_t *poll = (async_poll_event_t *) proxy->poll_event; + + /* Add proxy to the array */ + async_poll_add_proxy(poll, proxy); + + /* Check if all proxy events are already set in base event */ + if ((poll->event.events & proxy->events) != proxy->events) { + /* Add missing proxy events to base event */ + poll->event.events |= proxy->events; + + const int error = uv_poll_start(&poll->uv_handle, poll->event.events, on_poll_event); + + if (error < 0) { + async_throw_error("Failed to update poll handle events: %s", uv_strerror(error)); + return; + } + } + + ZEND_ASYNC_INCREASE_EVENT_COUNT; + event->loop_ref_count = 1; +} + +/* }}} */ + +/* {{{ libuv_poll_proxy_stop */ +static void libuv_poll_proxy_stop(zend_async_event_t *event) +{ + EVENT_STOP_PROLOGUE(event); + + zend_async_poll_proxy_t *proxy = (zend_async_poll_proxy_t *) event; + async_poll_event_t *poll = (async_poll_event_t *) proxy->poll_event; + + /* Remove proxy from the array */ + async_poll_remove_proxy(poll, proxy); + + /* Recalculate events from remaining proxies */ + async_poll_event new_events = async_poll_aggregate_events(poll); + + /* Update base event */ + if (poll->event.events != new_events && poll->event.base.ref_count > 1) { + poll->event.events = new_events; + + /* Restart with new events */ + const int error = uv_poll_start(&poll->uv_handle, new_events, on_poll_event); + + if (error < 0) { + async_throw_error("Failed to update poll handle events: %s", uv_strerror(error)); + } + } + + event->loop_ref_count = 0; + ZEND_ASYNC_DECREASE_EVENT_COUNT; +} + +/* }}} */ + +/* {{{ libuv_poll_proxy_dispose */ +static void libuv_poll_proxy_dispose(zend_async_event_t *event) +{ + if (ZEND_ASYNC_EVENT_REF(event) > 1) { + ZEND_ASYNC_EVENT_DEL_REF(event); + return; + } + + zend_async_poll_proxy_t *proxy = (zend_async_poll_proxy_t *) event; + async_poll_event_t *poll = (async_poll_event_t *) proxy->poll_event; + + if (event->loop_ref_count > 0) { + event->loop_ref_count = 1; + event->stop(event); + } + + zend_async_callbacks_free(event); + + /* Release reference to base poll event */ + ZEND_ASYNC_EVENT_RELEASE(&poll->event.base); + + pefree(proxy, 0); } /* }}} */ @@ -360,6 +586,38 @@ zend_async_poll_event_t *libuv_new_socket_event(zend_socket_t socket, async_poll /* }}} */ +/* {{{ libuv_new_poll_proxy_event */ +zend_async_poll_proxy_t * +libuv_new_poll_proxy_event(zend_async_poll_event_t *poll_event, async_poll_event events, size_t extra_size) +{ + START_REACTOR_OR_RETURN_NULL; + + zend_async_poll_proxy_t *proxy = pecalloc( + 1, extra_size != 0 ? sizeof(zend_async_poll_proxy_t) + extra_size : sizeof(zend_async_poll_proxy_t), 0); + + /* Set up proxy */ + proxy->poll_event = poll_event; + proxy->events = events; + + /* Add reference to base poll event */ + ZEND_ASYNC_EVENT_ADD_REF(&poll_event->base); + + /* Initialize base event structure */ + proxy->base.extra_offset = sizeof(zend_async_poll_proxy_t); + proxy->base.ref_count = 1; + + /* Initialize proxy methods */ + proxy->base.add_callback = libuv_add_callback; + proxy->base.del_callback = libuv_remove_callback; + proxy->base.start = libuv_poll_proxy_start; + proxy->base.stop = libuv_poll_proxy_stop; + proxy->base.dispose = libuv_poll_proxy_dispose; + + return proxy; +} + +/* }}} */ + ///////////////////////////////////////////////////////////////////////////////// /// Timer API ///////////////////////////////////////////////////////////////////////////////// @@ -2448,6 +2706,7 @@ void async_libuv_reactor_register(void) libuv_reactor_loop_alive, libuv_new_socket_event, libuv_new_poll_event, + libuv_new_poll_proxy_event, libuv_new_timer_event, libuv_new_signal_event, libuv_new_process_event, @@ -2461,4 +2720,4 @@ void async_libuv_reactor_register(void) libuv_new_trigger_event); zend_async_socket_listening_register(LIBUV_REACTOR_NAME, false, libuv_socket_listen); -} \ No newline at end of file +} diff --git a/libuv_reactor.h b/libuv_reactor.h index fa81fc6..9a9b983 100644 --- a/libuv_reactor.h +++ b/libuv_reactor.h @@ -46,6 +46,10 @@ struct _async_poll_event_t { zend_async_poll_event_t event; uv_poll_t uv_handle; + /* Array of active proxies for correct event aggregation */ + zend_async_poll_proxy_t **proxies; + uint32_t proxies_count; + uint32_t proxies_capacity; }; struct _async_timer_event_t diff --git a/php_async.h b/php_async.h index ad4c221..1670bbc 100644 --- a/php_async.h +++ b/php_async.h @@ -45,6 +45,8 @@ PHP_ASYNC_API extern zend_class_entry *async_ce_timeout; #define PHP_ASYNC_VERSION "0.4.0" #define PHP_ASYNC_NAME_VERSION "true async v0.4.0" +#define REACTOR_CHECK_INTERVAL (100 * 1000000) // ms in nanoseconds + typedef struct { // The first field must be a reference to a Zend object. @@ -71,6 +73,8 @@ ZEND_BEGIN_MODULE_GLOBALS(async) circular_buffer_t microtasks; /* Queue of coroutine_queue */ circular_buffer_t coroutine_queue; +/* Queue of resumed coroutines for event cleanup */ +circular_buffer_t resumed_coroutines; /* List of coroutines */ HashTable coroutines; /* The transfer structure is used to return to the main execution context. */ @@ -104,6 +108,9 @@ uv_async_t *uvloop_wakeup; circular_buffer_t *pid_queue; #endif +/* Reactor execution optimization */ +uint64_t last_reactor_tick; + #ifdef PHP_WIN32 #endif ZEND_END_MODULE_GLOBALS(async) diff --git a/php_async_api.h b/php_async_api.h index e547083..532b06a 100644 --- a/php_async_api.h +++ b/php_async_api.h @@ -17,17 +17,17 @@ #define PHP_ASYNC_API_H #ifdef PHP_WIN32 -# ifdef ASYNC_EXPORTS -# define PHP_ASYNC_API __declspec(dllexport) -# else -# define PHP_ASYNC_API __declspec(dllimport) -# endif +#ifdef ASYNC_EXPORTS +#define PHP_ASYNC_API __declspec(dllexport) #else -# if defined(__GNUC__) && __GNUC__ >= 4 -# define PHP_ASYNC_API __attribute__ ((visibility("default"))) -# else -# define PHP_ASYNC_API -# endif +#define PHP_ASYNC_API __declspec(dllimport) +#endif +#else +#if defined(__GNUC__) && __GNUC__ >= 4 +#define PHP_ASYNC_API __attribute__((visibility("default"))) +#else +#define PHP_ASYNC_API +#endif #endif #endif // PHP_ASYNC_API_H \ No newline at end of file diff --git a/scheduler.c b/scheduler.c index e76393f..5fa5a65 100644 --- a/scheduler.c +++ b/scheduler.c @@ -31,9 +31,9 @@ #define FIBER_DEBUG_LOG_ON false #define FIBER_DEBUG_SWITCH false #if FIBER_DEBUG_LOG_ON -# define FIBER_DEBUG(...) fprintf(stdout, __VA_ARGS__) +#define FIBER_DEBUG(...) fprintf(stdout, __VA_ARGS__) #else -# define FIBER_DEBUG(...) ((void)0) +#define FIBER_DEBUG(...) ((void) 0) #endif static zend_function root_function = { ZEND_INTERNAL_FUNCTION }; @@ -64,7 +64,7 @@ static void fiber_context_cleanup(zend_fiber_context *context); if (ZEND_ASYNC_GRACEFUL_SHUTDOWN) { \ finally_shutdown(); \ switch_to_scheduler(transfer); \ - zend_exception_restore(); \ + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); \ return; \ } \ start_graceful_shutdown(); \ @@ -95,7 +95,7 @@ static void fiber_context_cleanup(zend_fiber_context *context) efree(fiber_context); } -async_fiber_context_t* async_fiber_context_create(void) +async_fiber_context_t *async_fiber_context_create(void) { async_fiber_context_t *context = ecalloc(1, sizeof(async_fiber_context_t)); @@ -112,7 +112,7 @@ async_fiber_context_t* async_fiber_context_create(void) static zend_always_inline void fiber_pool_init(void) { - circular_buffer_ctor(&ASYNC_G(fiber_context_pool), ASYNC_FIBER_POOL_SIZE, sizeof(async_fiber_context_t*), NULL); + circular_buffer_ctor(&ASYNC_G(fiber_context_pool), ASYNC_FIBER_POOL_SIZE, sizeof(async_fiber_context_t *), NULL); } static void fiber_pool_cleanup(void) @@ -122,12 +122,9 @@ static void fiber_pool_cleanup(void) zend_coroutine_t *coroutine = ZEND_ASYNC_CURRENT_COROUTINE; ZEND_ASYNC_CURRENT_COROUTINE = NULL; - while (circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&fiber_context) == SUCCESS) { + while (circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void **) &fiber_context) == SUCCESS) { if (fiber_context != NULL) { - zend_fiber_transfer transfer = { - .context = &fiber_context->context, - .flags = 0 - }; + zend_fiber_transfer transfer = { .context = &fiber_context->context, .flags = 0 }; // If the current coroutine is NULL, this state explicitly tells the fiber to stop execution. // We set this value each time, since the switch_to_scheduler function may change it. @@ -228,10 +225,7 @@ static zend_always_inline void fiber_switch_context(async_coroutine_t *coroutine ZEND_ASSERT(fiber_context != NULL && "Fiber context is NULL in fiber_switch_context"); - zend_fiber_transfer transfer = { - .context = &fiber_context->context, - .flags = 0 - }; + zend_fiber_transfer transfer = { .context = &fiber_context->context, .flags = 0 }; #if FIBER_DEBUG_SWITCH zend_fiber_context *from = EG(current_fiber_context); @@ -302,11 +296,23 @@ static zend_always_inline void return_to_main(zend_fiber_transfer *transfer) /// COROUTINE QUEUE MANAGEMENT /////////////////////////////////////////////////////////// +static zend_always_inline void process_resumed_coroutines(void) +{ + circular_buffer_t *resumed_queue = &ASYNC_G(resumed_coroutines); + zend_coroutine_t *coroutine = NULL; + + while (circular_buffer_pop_ptr(resumed_queue, (void **) &coroutine) == SUCCESS) { + if (EXPECTED(coroutine != NULL && coroutine->waker != NULL)) { + ZEND_ASYNC_WAKER_CLEAN_EVENTS(coroutine->waker); + } + } +} + static zend_always_inline async_coroutine_t *next_coroutine(void) { async_coroutine_t *coroutine; - if (UNEXPECTED(circular_buffer_pop_ptr(&ASYNC_G(coroutine_queue), (void**)&coroutine) == FAILURE)) { + if (UNEXPECTED(circular_buffer_pop_ptr(&ASYNC_G(coroutine_queue), (void **) &coroutine) == FAILURE)) { ZEND_ASSERT("Failed to pop the coroutine from the pending queue."); return NULL; } @@ -369,7 +375,7 @@ static zend_always_inline switch_status execute_next_coroutine(void) // The coroutine doesn't have its own Fiber, // so we first need to allocate a Fiber context for it and then start it. - circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&async_coroutine->fiber_context); + circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void **) &async_coroutine->fiber_context); if (async_coroutine->fiber_context == NULL) { async_coroutine->fiber_context = async_fiber_context_create(); @@ -397,7 +403,8 @@ static zend_always_inline switch_status execute_next_coroutine(void) #define AVAILABLE_FOR_COROUTINE (transfer != NULL) -static zend_always_inline switch_status execute_next_coroutine_from_fiber(zend_fiber_transfer *transfer, async_fiber_context_t *fiber_context) +static zend_always_inline switch_status execute_next_coroutine_from_fiber(zend_fiber_transfer *transfer, + async_fiber_context_t *fiber_context) { async_coroutine_t *async_coroutine = next_coroutine(); @@ -473,7 +480,7 @@ static zend_always_inline switch_status execute_next_coroutine_from_fiber(zend_f // (AVAILABLE_FOR_COROUTINE == false) // The coroutine doesn't have its own Fiber, // so we first need to allocate a Fiber context for it and then start it. - circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void**)&async_coroutine->fiber_context); + circular_buffer_pop_ptr(&ASYNC_G(fiber_context_pool), (void **) &async_coroutine->fiber_context); if (async_coroutine->fiber_context == NULL) { async_coroutine->fiber_context = async_fiber_context_create(); @@ -560,7 +567,10 @@ static bool resolve_deadlocks(void) /////////////////////////////////////////////////////////// static void cancel_queued_coroutines(void) { - zend_exception_save(); + zend_object **exception = &EG(exception); + zend_object **prev_exception = &EG(prev_exception); + + zend_exception_save_fast(exception, prev_exception); // 1. Walk through all coroutines and cancel them if they are suspended. zval *current; @@ -586,15 +596,15 @@ static void cancel_queued_coroutines(void) ZEND_ASYNC_CANCEL(coroutine, cancellation_exception, false); } - if (EG(exception)) { - zend_exception_save(); + if (*exception) { + zend_exception_save_fast(exception, prev_exception); } } ZEND_HASH_FOREACH_END(); OBJ_RELEASE(cancellation_exception); - zend_exception_restore(); + zend_exception_restore_fast(exception, prev_exception); } void start_graceful_shutdown(void) @@ -674,6 +684,7 @@ static void async_scheduler_dtor(void) OBJ_RELEASE(&async_coroutine->std); zval_c_buffer_cleanup(&ASYNC_G(coroutine_queue)); + zval_c_buffer_cleanup(&ASYNC_G(resumed_coroutines)); zval_c_buffer_cleanup(&ASYNC_G(microtasks)); zval *current; @@ -1025,6 +1036,11 @@ void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine) async_throw_error("Failed to enqueue coroutine"); } else { coroutine->waker->status = ZEND_ASYNC_WAKER_QUEUED; + + // Add to resumed_coroutines queue for event cleanup + if (ZEND_ASYNC_IS_SCHEDULER_CONTEXT) { + circular_buffer_push_ptr_with_resize(&ASYNC_G(resumed_coroutines), coroutine); + } } // @@ -1043,15 +1059,29 @@ static zend_always_inline void scheduler_next_tick(void) zend_fiber_transfer *transfer = NULL; ZEND_ASYNC_SCHEDULER_CONTEXT = true; + zend_object **exception_ptr = &EG(exception); + zend_object **prev_exception_ptr = &EG(prev_exception); + execute_microtasks(); TRY_HANDLE_SUSPEND_EXCEPTION(); - const bool has_handles = ZEND_ASYNC_REACTOR_EXECUTE(circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue))); - TRY_HANDLE_SUSPEND_EXCEPTION(); + const uint64_t current_time = zend_hrtime(); + bool has_handles = true; - ZEND_ASYNC_SCHEDULER_CONTEXT = false; + if (UNEXPECTED(current_time - ASYNC_G(last_reactor_tick) > REACTOR_CHECK_INTERVAL)) { + ASYNC_G(last_reactor_tick) = current_time; + const circular_buffer_t *queue = &ASYNC_G(coroutine_queue); - TRY_HANDLE_SUSPEND_EXCEPTION(); + has_handles = ZEND_ASYNC_REACTOR_EXECUTE(circular_buffer_is_not_empty(queue)); + + if (circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue))) { + process_resumed_coroutines(); + } + + TRY_HANDLE_SUSPEND_EXCEPTION(); + } + + ZEND_ASYNC_SCHEDULER_CONTEXT = false; const bool is_next_coroutine = circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue)); @@ -1059,7 +1089,7 @@ static zend_always_inline void scheduler_next_tick(void) zend_hash_num_elements(&ASYNC_G(coroutines)) > 0 && circular_buffer_is_empty(&ASYNC_G(microtasks)) && resolve_deadlocks())) { switch_to_scheduler(transfer); - } + } if (EXPECTED(is_next_coroutine)) { // @@ -1079,7 +1109,10 @@ void async_scheduler_coroutine_suspend(void) // // Before suspending the coroutine, we save the current exception state. // - zend_exception_save(); + zend_object **exception_ptr = &EG(exception); + zend_object **prev_exception_ptr = &EG(prev_exception); + + zend_exception_save_fast(exception_ptr, prev_exception_ptr); /** * Note that the Scheduler is initialized after the first use of suspend, @@ -1088,8 +1121,8 @@ void async_scheduler_coroutine_suspend(void) if (UNEXPECTED(ZEND_ASYNC_SCHEDULER == NULL)) { async_scheduler_launch(); - if (UNEXPECTED(EG(exception))) { - zend_exception_restore(); + if (UNEXPECTED(*exception_ptr)) { + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); return; } } @@ -1113,7 +1146,7 @@ void async_scheduler_coroutine_suspend(void) if (coroutine->waker->events.nNumOfElements == 0 && not_in_queue) { async_throw_error("The coroutine has no events to wait for"); zend_async_waker_clean(coroutine); - zend_exception_restore(); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); return; } @@ -1122,12 +1155,12 @@ void async_scheduler_coroutine_suspend(void) // If an exception occurs during the startup of the Waker object, // that exception belongs to the current coroutine, // which means we have the right to immediately return to the point from which we were called. - if (UNEXPECTED(EG(exception))) { + if (UNEXPECTED(*exception_ptr)) { // Before returning, We are required to properly destroy the Waker object. - zend_exception_save(); + zend_exception_save_fast(exception_ptr, prev_exception_ptr); stop_waker_events(coroutine->waker); zend_async_waker_clean(coroutine); - zend_exception_restore(); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); return; } @@ -1165,7 +1198,7 @@ void async_scheduler_coroutine_suspend(void) async_rethrow_exception(exception); } - zend_exception_restore(); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); } /////////////////////////////////////////////////////////// @@ -1212,12 +1245,12 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer) zend_first_try { - zend_vm_stack stack = (zend_vm_stack)vm_stack_memory; + zend_vm_stack stack = (zend_vm_stack) vm_stack_memory; // Initialize VM stack structure manually // see zend_vm_stack_init() stack->top = ZEND_VM_STACK_ELEMENTS(stack); - stack->end = (zval*)((char*)vm_stack_memory + ZEND_FIBER_VM_STACK_SIZE); + stack->end = (zval *) ((char *) vm_stack_memory + ZEND_FIBER_VM_STACK_SIZE); stack->prev = NULL; // we allocate space for the first call frame, thereby normalizing the stack @@ -1252,6 +1285,9 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer) bool was_executed = false; switch_status status = COROUTINE_NOT_EXISTS; + const circular_buffer_t *coroutine_queue = &ASYNC_G(coroutine_queue); + circular_buffer_t *resumed_coroutines = &ASYNC_G(resumed_coroutines); + do { TRY_HANDLE_EXCEPTION(); @@ -1260,11 +1296,18 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer) ZEND_ASYNC_SCHEDULER_CONTEXT = true; + ZEND_ASSERT(circular_buffer_is_not_empty(resumed_coroutines) == 0 && "resumed_coroutines should be 0"); + execute_microtasks(); TRY_HANDLE_EXCEPTION(); - has_next_coroutine = circular_buffer_is_not_empty(&ASYNC_G(coroutine_queue)); + has_next_coroutine = circular_buffer_count(coroutine_queue) > 0; has_handles = ZEND_ASYNC_REACTOR_EXECUTE(has_next_coroutine); + + if (circular_buffer_is_not_empty(resumed_coroutines)) { + process_resumed_coroutines(); + } + TRY_HANDLE_EXCEPTION(); ZEND_ASYNC_SCHEDULER_CONTEXT = false; @@ -1306,14 +1349,13 @@ ZEND_STACK_ALIGNED void fiber_entry(zend_fiber_transfer *transfer) if (UNEXPECTED(false == has_handles && false == was_executed && zend_hash_num_elements(&ASYNC_G(coroutines)) > 0 && - circular_buffer_is_empty(&ASYNC_G(coroutine_queue)) && + circular_buffer_is_empty(coroutine_queue) && circular_buffer_is_empty(&ASYNC_G(microtasks)) && resolve_deadlocks())) { break; - } + } } while (zend_hash_num_elements(&ASYNC_G(coroutines)) > 0 || circular_buffer_is_not_empty(&ASYNC_G(microtasks)) || ZEND_ASYNC_REACTOR_LOOP_ALIVE()); - } zend_catch { diff --git a/scheduler.h b/scheduler.h index b97b31d..ddc89bc 100644 --- a/scheduler.h +++ b/scheduler.h @@ -39,7 +39,7 @@ void async_scheduler_main_coroutine_suspend(void); void async_scheduler_coroutine_enqueue(zend_coroutine_t *coroutine); /* Fiber context creation */ -async_fiber_context_t* async_fiber_context_create(void); +async_fiber_context_t *async_fiber_context_create(void); END_EXTERN_C() diff --git a/tests/await/008-awaitFirstSuccess_basic.phpt b/tests/await/008-awaitFirstSuccess_basic.phpt index fd203f6..4d3219b 100644 --- a/tests/await/008-awaitFirstSuccess_basic.phpt +++ b/tests/await/008-awaitFirstSuccess_basic.phpt @@ -11,7 +11,7 @@ echo "start\n"; $coroutines = [ spawn(function() { - suspend(); + //suspend(); throw new RuntimeException("first error"); }), spawn(function() { diff --git a/tests/await/016-awaitAnyOf_basic.phpt b/tests/await/016-awaitAnyOf_basic.phpt index 93f465f..1735204 100644 --- a/tests/await/016-awaitAnyOf_basic.phpt +++ b/tests/await/016-awaitAnyOf_basic.phpt @@ -14,7 +14,6 @@ $coroutines = [ return "first"; }), spawn(function() { - suspend(); throw new RuntimeException("test exception"); }), spawn(function() { diff --git a/tests/await/043-awaitAnyOfOrFail_associative_array.phpt b/tests/await/043-awaitAnyOfOrFail_associative_array.phpt index b06102d..fe2caa1 100644 --- a/tests/await/043-awaitAnyOfOrFail_associative_array.phpt +++ b/tests/await/043-awaitAnyOfOrFail_associative_array.phpt @@ -30,7 +30,7 @@ $coroutines = [ echo "start\n"; -$results = awaitAnyOfOrFail(2, $coroutines); +$results = awaitAnyOfOrFail(3, $coroutines); echo "Keys preserved: " . (count(array_intersect(array_keys($results), ['slow', 'fast', 'medium', 'very_slow'])) == count($results) ? "YES" : "NO") . "\n"; diff --git a/tests/edge_cases/002-deadlock-with-catch.phpt b/tests/edge_cases/002-deadlock-with-catch.phpt index f13b1a6..8dbaf79 100644 --- a/tests/edge_cases/002-deadlock-with-catch.phpt +++ b/tests/edge_cases/002-deadlock-with-catch.phpt @@ -49,4 +49,5 @@ Warning: the coroutine was suspended in file: %s, line: %d will be canceled in U Warning: the coroutine was suspended in file: %s, line: %d will be canceled in Unknown on line %d Caught exception: Deadlock detected coroutine1 finished +Caught exception: Deadlock detected coroutine2 finished \ No newline at end of file diff --git a/tests/edge_cases/003-deadlock-with-zombie.phpt b/tests/edge_cases/003-deadlock-with-zombie.phpt index 371988a..0309800 100644 --- a/tests/edge_cases/003-deadlock-with-zombie.phpt +++ b/tests/edge_cases/003-deadlock-with-zombie.phpt @@ -54,5 +54,6 @@ Warning: the coroutine was suspended in file: %s, line: %d will be canceled in U Warning: the coroutine was suspended in file: %s, line: %d will be canceled in Unknown on line %d Caught exception: Deadlock detected +Caught exception: Deadlock detected coroutine1 finished coroutine2 finished \ No newline at end of file diff --git a/tests/edge_cases/010-deadlock-after-cancel-with-zombie.phpt b/tests/edge_cases/010-deadlock-after-cancel-with-zombie.phpt index 0932908..1f589ee 100644 --- a/tests/edge_cases/010-deadlock-after-cancel-with-zombie.phpt +++ b/tests/edge_cases/010-deadlock-after-cancel-with-zombie.phpt @@ -61,5 +61,6 @@ Warning: the coroutine was suspended in file: %s, line: %d will be canceled in % Warning: the coroutine was suspended in file: %s, line: %d will be canceled in %s on line %d Caught exception: Deadlock detected +Caught exception: Deadlock detected coroutine1 finished coroutine2 finished \ No newline at end of file diff --git a/tests/socket_ext/005-socket_accept_multiple.phpt b/tests/socket_ext/005-socket_accept_multiple.phpt index d9375bf..73d0a4e 100644 --- a/tests/socket_ext/005-socket_accept_multiple.phpt +++ b/tests/socket_ext/005-socket_accept_multiple.phpt @@ -10,7 +10,7 @@ if (!extension_loaded('sockets')) { $client) { $clientNum = $i + 1; socket_write($client, "Response to client $clientNum"); socket_close($client); } - + socket_close($socket); }); @@ -56,27 +56,32 @@ $server = spawn(function() use (&$port, &$output) { $clients = []; for ($i = 1; $i <= 3; $i++) { $clients[] = spawn(function() use (&$port, $i, &$output) { - while ($port === null) { - delay(1); + + for ($ii = 0; $ii < 3 && $port === null; $ii++) { + delay(10); } - + + if(empty($port)) { + throw new Exception("Server port is not provided..."); + } + // Small delay to stagger connections delay($i); - + $output[] = "Client$i: connecting"; $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); - + if (socket_connect($socket, '127.0.0.1', $port)) { $output[] = "Client$i: connected"; $data = socket_read($socket, 1024); $output[] = "Client$i: received '$data'"; } - + socket_close($socket); }); } -awaitAll(array_merge([$server], $clients)); +awaitAllOrFail(array_merge([$server], $clients)); // Sort and output results sort($output); @@ -106,4 +111,4 @@ Server: listening on port %d Server: waiting for client 1 Server: waiting for client 2 Server: waiting for client 3 -End \ No newline at end of file +End diff --git a/tests/stream/016-tcp_stream_socket_accept_timeout.phpt b/tests/stream/016-tcp_stream_socket_accept_timeout.phpt index 60b39be..7e8ada3 100644 --- a/tests/stream/016-tcp_stream_socket_accept_timeout.phpt +++ b/tests/stream/016-tcp_stream_socket_accept_timeout.phpt @@ -42,5 +42,5 @@ Server: starting Server: listening on port %d Server: accepting connections -Warning: stream_socket_accept(): %s +Warning: stream_socket_accept(): Accept failed: %s Server end diff --git a/tests/stream/017-stream_select_invalid_streams.phpt b/tests/stream/017-stream_select_invalid_streams.phpt new file mode 100644 index 0000000..d4ef6d7 --- /dev/null +++ b/tests/stream/017-stream_select_invalid_streams.phpt @@ -0,0 +1,33 @@ +--TEST-- +stream_select with invalid stream types +--FILE-- + +--EXPECTF-- +Testing stream_select with invalid streams +%a +Result: invalid streams test completed \ No newline at end of file diff --git a/tests/stream/018-stream_select_empty_arrays.phpt b/tests/stream/018-stream_select_empty_arrays.phpt new file mode 100644 index 0000000..292fbd3 --- /dev/null +++ b/tests/stream/018-stream_select_empty_arrays.phpt @@ -0,0 +1,27 @@ +--TEST-- +stream_select with empty arrays +--FILE-- + +--EXPECT-- +Testing stream_select with empty arrays +Result: 0 +Result: empty arrays test completed \ No newline at end of file diff --git a/tests/stream/019-stream_select_closed_streams.phpt b/tests/stream/019-stream_select_closed_streams.phpt new file mode 100644 index 0000000..82f6503 --- /dev/null +++ b/tests/stream/019-stream_select_closed_streams.phpt @@ -0,0 +1,48 @@ +--TEST-- +stream_select with closed streams +--FILE-- + +--EXPECTF-- +Testing stream_select with closed streams +Result: %d +Read array count: %d +Result: closed streams test completed \ No newline at end of file diff --git a/tests/stream/020-stream_select_zero_timeout.phpt b/tests/stream/020-stream_select_zero_timeout.phpt new file mode 100644 index 0000000..e7e2e9a --- /dev/null +++ b/tests/stream/020-stream_select_zero_timeout.phpt @@ -0,0 +1,48 @@ +--TEST-- +stream_select with zero timeout (immediate return) +--FILE-- + +--EXPECTF-- +Testing stream_select with zero timeout +Result: 0 +Elapsed time: %sms +Read array count: 0 +Result: zero timeout test completed \ No newline at end of file diff --git a/tests/stream/021-stream_select_microsecond_timeout.phpt b/tests/stream/021-stream_select_microsecond_timeout.phpt new file mode 100644 index 0000000..8ad9f62 --- /dev/null +++ b/tests/stream/021-stream_select_microsecond_timeout.phpt @@ -0,0 +1,46 @@ +--TEST-- +stream_select with microsecond timeout precision +--FILE-- + +--EXPECTF-- +Testing stream_select with microsecond timeout +Result: 0 +Elapsed time: %sms +Result: microsecond timeout test completed \ No newline at end of file diff --git a/tests/stream/022-stream_select_write_ready.phpt b/tests/stream/022-stream_select_write_ready.phpt new file mode 100644 index 0000000..873a40e --- /dev/null +++ b/tests/stream/022-stream_select_write_ready.phpt @@ -0,0 +1,48 @@ +--TEST-- +stream_select with write-ready streams +--FILE-- + $stream) { + echo "Stream $i is write-ready\n"; + } + + fclose($sock1); + fclose($sock2); + + return "write ready test completed"; +}); + +$result = await($coroutine); +echo "Result: $result\n"; + +?> +--EXPECTF-- +Testing stream_select write-ready streams +Write-ready streams: %d +Write array count: %d +%a +Result: write ready test completed \ No newline at end of file diff --git a/tests/stream/023-stream_select_large_sets.phpt b/tests/stream/023-stream_select_large_sets.phpt new file mode 100644 index 0000000..6acfd22 --- /dev/null +++ b/tests/stream/023-stream_select_large_sets.phpt @@ -0,0 +1,88 @@ +--TEST-- +stream_select with large stream sets +--FILE-- + +--EXPECTF-- +Testing stream_select with large stream sets +Creating 10 socket pairs +Created 10 socket pairs +Select with 10 streams +Result: %d in %sms +Ready streams: %d +Streams with data: %d +Result: large sets test completed \ No newline at end of file diff --git a/tests/stream/024-stream_select_remote_disconnect.phpt b/tests/stream/024-stream_select_remote_disconnect.phpt new file mode 100644 index 0000000..ace26f5 --- /dev/null +++ b/tests/stream/024-stream_select_remote_disconnect.phpt @@ -0,0 +1,215 @@ +--TEST-- +stream_select behavior when remote client disconnects after sending data +--SKIPIF-- + +--FILE-- + ['pipe', 'r'], // stdin + 1 => ['pipe', 'w'], // stdout + 2 => ['pipe', 'w'], // stderr + ]; + + $options = ["suppress_errors" => true]; + if (PHP_OS_FAMILY === 'Windows') { + // On Windows, prevent creation of a console window + $options["bypass_shell"] = true; + } + + $process = proc_open($cmd, $descriptorspec, $pipes, null, null, $options); + if (!$process) { + echo "Failed to start client process\n"; + return false; + } + + return [$process, $pipes]; +} + +function cleanup_client_process($process, $pipes) { + if (is_resource($pipes[1])) { + fclose($pipes[1]); + } + if (is_resource($pipes[2])) { + fclose($pipes[2]); + } + + // Give the process time to finish, especially on Windows + if (PHP_OS_FAMILY === 'Windows') { + usleep(50000); // 50ms delay + } + + $status = proc_get_status($process); + if ($status && $status['running']) { + proc_terminate($process); + // On Windows, termination might need more time + if (PHP_OS_FAMILY === 'Windows') { + usleep(100000); // 100ms delay + } + } + + $exit_code = proc_close($process); + + // On Windows, exit code 255 might be normal for terminated processes + if (PHP_OS_FAMILY === 'Windows' && $exit_code === 255) { + $exit_code = 0; + } + + return $exit_code; +} + +echo "Testing stream_select with remote disconnect scenario\n"; + +$server_coroutine = spawn(function() { + // Create server socket + $server = stream_socket_server("tcp://127.0.0.1:0", $errno, $errstr); + if (!$server) { + echo "Failed to create server: $errstr\n"; + return "server failed"; + } + + // Get server address and port + $server_name = stream_socket_get_name($server, false); + $port = parse_url("tcp://$server_name", PHP_URL_PORT); + echo "Server listening on port: $port\n"; + + // Start client process using separate script + $client_result = start_tcp_client_process($port); + if (!$client_result) { + fclose($server); + return "client process failed"; + } + + list($client_process, $pipes) = $client_result; + + // Close stdin, keep stdout/stderr for reading + fclose($pipes[0]); + + // Server waits for connection + echo "Server: waiting for connection\n"; + $read = [$server]; + $write = $except = null; + + $result = stream_select($read, $write, $except, 3); + if ($result === 0) { + echo "Server: timeout waiting for connection\n"; + cleanup_client_process($client_process, $pipes); + fclose($server); + return "server timeout"; + } + + echo "Server: accepting connection\n"; + $client_socket = stream_socket_accept($server, 1); + if (!$client_socket) { + echo "Server: failed to accept connection\n"; + cleanup_client_process($client_process, $pipes); + fclose($server); + return "accept failed"; + } + + echo "Server: connection accepted\n"; + + // Critical test: monitor client socket with stream_select + echo "Server: monitoring client socket with stream_select\n"; + + // First select - should detect incoming data + $read = [$client_socket]; + $write = $except = null; + + $result1 = stream_select($read, $write, $except, 3); + echo "Server: first stream_select result: $result1\n"; + + if ($result1 > 0 && count($read) > 0) { + + stream_set_timeout($client_socket, 1); + + $data = fread($client_socket, 1024); + echo "Server: received data: '" . trim($data) . "'\n"; + + // Continue monitoring for disconnection + echo "Server: continuing to monitor for disconnection\n"; + $read = [$client_socket]; + $write = $except = null; + + // This is the critical test - detect disconnection via stream_select + $result2 = stream_select($read, $write, $except, 3); + echo "Server: second stream_select result: $result2\n"; + echo "Server: ready streams after disconnect: " . count($read) . "\n"; + + if ($result2 > 0 && count($read) > 0) { + // Try to read - should detect disconnection + $disconnect_data = fread($client_socket, 1024); + if ($disconnect_data === false) { + echo "Server: detected disconnection (fread returned false)\n"; + } elseif ($disconnect_data === '') { + echo "Server: detected disconnection (fread returned empty string)\n"; + } else { + echo "Server: unexpected data on disconnect: '$disconnect_data'\n"; + } + + // Check stream metadata + $meta = stream_get_meta_data($client_socket); + echo "Server: stream EOF: " . ($meta['eof'] ? "yes" : "no") . "\n"; + } else { + echo "Server: no disconnect event detected within timeout\n"; + } + } + + // Read client process output + $client_output = stream_get_contents($pipes[1]); + $client_errors = stream_get_contents($pipes[2]); + + echo "Client output:\n$client_output"; + if (!empty($client_errors)) { + echo "Client errors:\n$client_errors"; + } + + // Cleanup + fclose($client_socket); + fclose($server); + + $exit_code = cleanup_client_process($client_process, $pipes); + echo "Client process exit code: $exit_code\n"; + + return "server completed"; +}); + +$result = await($server_coroutine); +echo "Test result: $result\n"; + +?> +--EXPECTF-- +Testing stream_select with remote disconnect scenario +Server listening on port: %d +Server: waiting for connection +Server: accepting connection +Server: connection accepted +Server: monitoring client socket with stream_select +Server: first stream_select result: 1 +Server: received data: 'Hello from external process' +Server: continuing to monitor for disconnection +Server: second stream_select result: %d +Server: ready streams after disconnect: %d +Server: detected disconnection %s +Server: stream EOF: %s +Client output: +Client process: connecting to port %d +Client process: connected, sending data +Client process: closing connection abruptly +Client process: exited +Client process exit code: 0 +Test result: server completed \ No newline at end of file diff --git a/tests/stream/025-ssl_stream_socket_accept_timeout.phpt b/tests/stream/025-ssl_stream_socket_accept_timeout.phpt new file mode 100644 index 0000000..8a72c5f --- /dev/null +++ b/tests/stream/025-ssl_stream_socket_accept_timeout.phpt @@ -0,0 +1,70 @@ +--TEST-- +SSL Stream: stream_socket_accept() with SSL and timeout +--SKIPIF-- + +--FILE-- + [ + 'local_cert' => $cert_file, + 'local_pk' => $key_file, + 'verify_peer' => false, + 'allow_self_signed' => true, + ] + ]); + + echo "SSL Server: starting SSL server\n"; + $socket = stream_socket_server("ssl://127.0.0.1:0", $errno, $errstr, STREAM_SERVER_BIND|STREAM_SERVER_LISTEN, $context); + if (!$socket) { + echo "SSL Server: failed to start - $errstr ($errno)\n"; + return; + } + + $address = stream_socket_get_name($socket, false); + echo "SSL Server: listening on $address\n"; + + echo "SSL Server: accepting with timeout\n"; + // This should use network_async_accept_incoming() in async mode + // instead of the old inefficient php_poll2_async() + $client = @stream_socket_accept($socket, 1); // 1 second timeout + + if ($client === false) { + echo "SSL Server: timeout occurred as expected\n"; + } else { + echo "SSL Server: unexpected client connection\n"; + fclose($client); + } + + fclose($socket); + echo "SSL Server: finished\n"; +}); + +awaitAll([$server]); + +echo "End SSL accept timeout test\n"; + +?> +--EXPECTF-- +Start SSL accept timeout test +SSL Server: creating SSL context +SSL Server: starting SSL server +SSL Server: listening on %s:%d +SSL Server: accepting with timeout +SSL Server: timeout occurred as expected +SSL Server: finished +End SSL accept timeout test diff --git a/tests/stream/026-ssl_client_server_async.phpt b/tests/stream/026-ssl_client_server_async.phpt new file mode 100644 index 0000000..b5ab0c5 --- /dev/null +++ b/tests/stream/026-ssl_client_server_async.phpt @@ -0,0 +1,127 @@ +--TEST-- +SSL Stream: full SSL client-server async communication +--SKIPIF-- + +--FILE-- + [ + 'local_cert' => $cert_file, + 'local_pk' => $key_file, + 'verify_peer' => false, + 'allow_self_signed' => true, + 'crypto_method' => STREAM_CRYPTO_METHOD_TLS_SERVER, + ] + ]); + + $socket = stream_socket_server("ssl://127.0.0.1:0", $errno, $errstr, STREAM_SERVER_BIND|STREAM_SERVER_LISTEN, $context); + if (!$socket) { + echo "SSL Server: failed to create socket - $errstr\n"; + return; + } + + $server_address = stream_socket_get_name($socket, false); + // Client needs ssl:// prefix to connect via SSL + $address = 'ssl://' . $server_address; + echo "SSL Server: listening on $server_address\n"; + + echo "SSL Server: waiting for SSL connection\n"; + // This should use network_async_accept_incoming() instead of php_poll2_async() + $client = stream_socket_accept($socket, 10); // 10 second timeout + + if (!$client) { + echo "SSL Server: failed to accept client\n"; + return; + } + + $output[] = "SSL Server: client connected"; + + $data = fread($client, 1024); + $output[] = "SSL Server: received '$data'"; + + fwrite($client, "Hello from SSL server"); + $output[] = "SSL Server: response sent"; + + fclose($client); + fclose($socket); +}); + +// SSL Client coroutine +$client = spawn(function() use (&$address, &$output) { + // Wait for server to set address + while ($address === null) { + delay(10); + } + + echo "SSL Client: connecting to $address\n"; + + $context = stream_context_create([ + 'ssl' => [ + 'verify_peer' => false, + 'verify_peer_name' => false, + 'allow_self_signed' => true, + 'crypto_method' => STREAM_CRYPTO_METHOD_TLS_CLIENT, + ] + ]); + + $sock = stream_socket_client($address, $errno, $errstr, 30, STREAM_CLIENT_CONNECT, $context); + if (!$sock) { + echo "SSL Client: failed to connect - $errstr ($errno)\n"; + return; + } + + $output[] = "SSL Client: connected successfully"; + + fwrite($sock, "Hello from SSL client"); + $output[] = "SSL Client: sent request"; + + $response = fread($sock, 1024); + $output[] = "SSL Client: received '$response'"; + + fclose($sock); +}); + +awaitAllOrFail([$server, $client]); + +// Sort output for deterministic results +sort($output); +foreach ($output as $message) { + echo $message . "\n"; +} + +echo "End SSL client-server test\n"; + +?> +--EXPECTF-- +Start SSL client-server test +SSL Server: creating SSL context +SSL Server: listening on %s:%d +SSL Server: waiting for SSL connection +SSL Client: connecting to %s:%d +SSL Client: connected successfully +SSL Client: received 'Hello from SSL server' +SSL Client: sent request +SSL Server: client connected +SSL Server: received 'Hello from SSL client' +SSL Server: response sent +End SSL client-server test diff --git a/tests/stream/027-ssl_concurrent_accept.phpt b/tests/stream/027-ssl_concurrent_accept.phpt new file mode 100644 index 0000000..09ef38e --- /dev/null +++ b/tests/stream/027-ssl_concurrent_accept.phpt @@ -0,0 +1,112 @@ +--TEST-- +SSL Stream: concurrent SSL accept operations without EventLoop conflicts +--SKIPIF-- + +--FILE-- + [ + 'local_cert' => $cert_file, + 'local_pk' => $key_file, + 'verify_peer' => false, + 'allow_self_signed' => true, + ] + ]); + + $socket = stream_socket_server("ssl://127.0.0.1:0", $errno, $errstr, STREAM_SERVER_BIND|STREAM_SERVER_LISTEN, $context); + if (!$socket) { + $monitor->cancel(); + $output[] = "SSL Server $id: failed to start - $errstr"; + return; + } + + $address = stream_socket_get_name($socket, false); + $output[] = "SSL Server $id: listening on $address"; + + $servers_ready++; + + // All servers try to accept concurrently + // This tests that network_async_accept_incoming() doesn't cause EventLoop conflicts + // which was the main issue with php_poll2_async() + $client = @stream_socket_accept($socket, 2); // 2 second timeout + + if ($client === false) { + $output[] = "SSL Server $id: timeout occurred"; + } else { + $output[] = "SSL Server $id: client connected"; + fclose($client); + } + + fclose($socket); + $servers_completed++; + }); +} + +echo "Creating multiple concurrent SSL servers\n"; + +// Create 3 concurrent SSL servers +// This is the key test - multiple SSL accepts should work without EventLoop conflicts +$server1 = create_ssl_server(1, $cert_file, $key_file, $monitor, $servers_ready, $servers_completed, $output); +$server2 = create_ssl_server(2, $cert_file, $key_file, $monitor, $servers_ready, $servers_completed, $output); +$server3 = create_ssl_server(3, $cert_file, $key_file, $monitor, $servers_ready, $servers_completed, $output); + +// Monitor coroutine +$monitor = spawn(function() use (&$servers_ready, &$servers_completed) { + echo "Monitor: waiting for servers to be ready\n"; + + while ($servers_ready < 3) { + delay(10); + } + + echo "Monitor: all servers ready, waiting for completion\n"; + + while ($servers_completed < 3) { + delay(10); + } + + echo "Monitor: all servers completed\n"; +}); + +awaitAllOrFail([$server1, $server2, $server3, $monitor]); + +// Sort output for deterministic results +sort($output); +foreach ($output as $message) { + echo $message . "\n"; +} + +echo "End SSL concurrent accept test\n"; + +?> +--EXPECTF-- +Start SSL concurrent accept test +Creating multiple concurrent SSL servers +Monitor: waiting for servers to be ready +Monitor: all servers ready, waiting for completion +Monitor: all servers completed +SSL Server 1: listening on %s:%d +SSL Server 1: timeout occurred +SSL Server 2: listening on %s:%d +SSL Server 2: timeout occurred +SSL Server 3: listening on %s:%d +SSL Server 3: timeout occurred +End SSL concurrent accept test diff --git a/tests/stream/028-udp_basic_operations.phpt b/tests/stream/028-udp_basic_operations.phpt new file mode 100644 index 0000000..0667606 --- /dev/null +++ b/tests/stream/028-udp_basic_operations.phpt @@ -0,0 +1,99 @@ +--TEST-- +UDP basic operations with stream_socket_recvfrom/sendto in async context +--FILE-- + +--EXPECTF-- +Start UDP basic operations test +Server: creating UDP socket +Server: listening on udp://127.0.0.1:%d +Server: waiting for UDP data +Client: connecting to udp://127.0.0.1:%d +Client: sent 21 bytes +Server: received 'Hello from UDP client' from 127.0.0.1:%d +Server: sent 21 bytes response +Client: received 'Hello from UDP server' +End UDP basic operations test \ No newline at end of file diff --git a/tests/stream/029-udp_concurrent_operations.phpt b/tests/stream/029-udp_concurrent_operations.phpt new file mode 100644 index 0000000..15e2c97 --- /dev/null +++ b/tests/stream/029-udp_concurrent_operations.phpt @@ -0,0 +1,167 @@ +--TEST-- +Concurrent UDP operations with multiple servers and clients in async context +--FILE-- + +--EXPECTF-- +Start concurrent UDP operations test +Server1: creating UDP socket +Server2: creating UDP socket +Server1: listening on udp://127.0.0.1:%d +Server2: listening on udp://127.0.0.1:%d +Server1: waiting for UDP data +Server2: waiting for UDP data +Client1: connecting to udp://127.0.0.1:%d +Client1: sent 22 bytes +Client2: connecting to udp://127.0.0.1:%d +Client2: sent 22 bytes +Server1: received 'Hello from UDP client1' from 127.0.0.1:%d +Server2: received 'Hello from UDP client2' from 127.0.0.1:%d +Server1: sent 22 bytes response +Server2: sent 22 bytes response +Client1: received 'Hello from UDP server1' +Client2: received 'Hello from UDP server2' +End concurrent UDP operations test \ No newline at end of file diff --git a/tests/stream/030-udp_timeout_operations.phpt b/tests/stream/030-udp_timeout_operations.phpt new file mode 100644 index 0000000..9395b84 --- /dev/null +++ b/tests/stream/030-udp_timeout_operations.phpt @@ -0,0 +1,68 @@ +--TEST-- +UDP timeout operations with stream_socket_recvfrom in async context +--FILE-- + +--EXPECTF-- +Start UDP timeout operations test +Server: creating UDP socket +Server: listening on udp://127.0.0.1:%d +Server: set timeout to 0.2 seconds +Server: waiting for UDP data (should timeout) +Server: operation timed out +End UDP timeout operations test \ No newline at end of file diff --git a/tests/stream/031-tcp_timeout_operations.phpt b/tests/stream/031-tcp_timeout_operations.phpt new file mode 100644 index 0000000..7938ab0 --- /dev/null +++ b/tests/stream/031-tcp_timeout_operations.phpt @@ -0,0 +1,113 @@ +--TEST-- +TCP timeout operations with fread/fwrite in async context +--FILE-- + +--EXPECTF-- +Start TCP timeout operations test +Server: creating TCP socket +Server: listening on tcp://127.0.0.1:%d +Server: waiting for client connection +Client: connecting to tcp://127.0.0.1:%d +Server: client connected +Client: connected to server +Server: set read timeout to 0.2 seconds +Server: reading from client (should timeout) +Server: read operation timed out +End TCP timeout operations test \ No newline at end of file diff --git a/tests/stream/ssl_test_cert.pem b/tests/stream/ssl_test_cert.pem new file mode 100644 index 0000000..d1bd8ac --- /dev/null +++ b/tests/stream/ssl_test_cert.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDEzCCAfugAwIBAgIUM6QzqVbtcWFuFC3tUezaYP0p/WkwDQYJKoZIhvcNAQEL +BQAwGTEXMBUGA1UEAwwOYXN5bmMtdGVzdC5kZXYwHhcNMjUwOTEwMjAxNjQzWhcN +MjYwOTEwMjAxNjQzWjAZMRcwFQYDVQQDDA5hc3luYy10ZXN0LmRldjCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBALS+/rtv9RfG2G+JJrX3IVoTKWcHMrld +yn7qP0oFDW8epymmKjmmnCH7Y4t9i/3U6e+wPlpiO6xGcLOyRgqyv0X/FvDACtM1 +ymFXI2kIjySIcaa5yyESMFuR13xLXRqaYIiz68uK1ttjR4XFZADSIUC0QJ3S6caY +GwXBcUTOoPFxDPA5luB7gOSRavniGw/EU/ZC4FgV7qxo64CHbDZZBMWWlganPSh8 +DBO4CHQO5ZtoFlHMPktzHFZFDyZaZNhtuibqg8DNNW21YkfpGQWmgk3J2/3bGdoh +TQ9nGWQELndRi+0npGkVb5DXrRyz/ChlzhPlNjB2wPr2m6Xvz8y8Om0CAwEAAaNT +MFEwHQYDVR0OBBYEFN++t8je1cBxmZ72HtaSsQbAB4sJMB8GA1UdIwQYMBaAFN++ +t8je1cBxmZ72HtaSsQbAB4sJMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAG1+/NXvBiADSFigpYbGNPZfNsBLEOsIr7FyeIDVJm9wgf6HHUdRcif5 +O7iROqMzxoaxDkeNvma39VWcAVEaNqG+HVD+grRdWEvTqJT55hcTlCn/RSaTpPMB +QcgS2h/el+VlHMBo1MozD5+5XeNfyk1zGsU/YH4I1ffWc+uP8l68Vr8Li71e2Ldv +ZL8FITD5e3oKj5p2G9qb1bqadZqvGaPfHRgElk8MPDCGzHmJynN6d+W0gMltM9CP +KLueRgg/K677uCvGPJP3jjBqPr4FgpmnZXsLArzl9PiLrJJ/M6IDmKFLIv0Cu9Nf +uLR0cglXQ2Tq5SvmfIj03jS7R16Gy1U= +-----END CERTIFICATE----- \ No newline at end of file diff --git a/tests/stream/ssl_test_key.pem b/tests/stream/ssl_test_key.pem new file mode 100644 index 0000000..2471429 --- /dev/null +++ b/tests/stream/ssl_test_key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQC0vv67b/UXxthv +iSa19yFaEylnBzK5Xcp+6j9KBQ1vHqcppio5ppwh+2OLfYv91OnvsD5aYjusRnCz +skYKsr9F/xbwwArTNcphVyNpCI8kiHGmucshEjBbkdd8S10ammCIs+vLitbbY0eF +xWQA0iFAtECd0unGmBsFwXFEzqDxcQzwOZbge4DkkWr54hsPxFP2QuBYFe6saOuA +h2w2WQTFlpYGpz0ofAwTuAh0DuWbaBZRzD5LcxxWRQ8mWmTYbbom6oPAzTVttWJH +6RkFpoJNydv92xnaIU0PZxlkBC53UYvtJ6RpFW+Q160cs/woZc4T5TYwdsD69pul +78/MvDptAgMBAAECggEABtVBpBLNEHBAAz3YIAD1WqAAbLJLXjg7GfLMU1JyYF6n +fl39zET0M7fhQSDAVIEVs8eh7W2eFYE3bLoZbV+2KdtFVSZHD0nbtADE7rxk5o8R +9nYHq+ExwTerDqCt85eEFsCJvx3C/Nnf/LqqcS2AfFKHYgmoAaec14Opcirgz4yg +p6/ABbIShFT5Yn3EcdczBNrWlxsZ8tnKHpQFnwZwKh1eKf7dIRUbdGLUM50+fa7g +rL0cl9PdK7rBOR5LBjCZUv3DkW3GyuW1/2DFv0DTjIcrW6+YF/1ow9smlbYhIsei +QklBP7+vh63CUyMLEf6QHI458xGl9t9pDclmT7FPsQKBgQDoZCRIkfJsQomoHaMM +Fx+SQ2T6p0ke368PtRXo4D7o6nF0bXJ4uEPq4sFIQP+0xPOBY3MROP4PBDx1eV0H +sdffCfCnfISnoZKkUbvOQDFzbCiLtpDKTY71pS0A7gyU1N/2paozyxs3xl0GMksb +2l80ocfPWikqDP+F03nHlDbLCwKBgQDHG7SfftdBbz8hujHxVitnl89oAso5Eq8Y +WNHf7prP/8VMe4HlwZMAOH9+UIChsprGfAJo/JZtKoeNH9r2WokC1SfNovTLuSeG +zBB+GdX6Pdi1PiP6nfC+nCmkUPGvXO3hS0KNCARghpBux3fbkY5byhJ6yjGLI034 +Gx0+lM87ZwKBgHkwqB9URSkh9em/MuU+Nc+v57wzexVnr0Kwu/FK6GPMx0fhP74m +0fxvLj7A7tjVkOtb8oj7wLoSCnl0xggaPapp459kd0V4JCIfIaKopWE8+VQK7C0k +DzaZYgPHILaI4RceQ8lo1RPcFW0C01p+IgIvkCTZLvhn+OVQaISlDYILAoGAcFld +zjHQXIfdY7agv8ETtNygl9wbJ6E3U9Gqe2UzzfJQ7hsy7OYRgKpgpnHeY19Ynm8T +HRKJ/wdkfWlgMGpdrU+BqjMtVlcfypwTIlSJvS5wvbRWsO+2DJgplyJlfcI+KEZD +Qzkm3yCPFzNOmoLDhV+8lbTJx+0f7cO++LUXSjkCgYBwl8zEEmxtaFXh7MlB+bdp +oWyyjYBYi38ppg0LaJLt744KSX/SCwJm0lrBdAMS4KVnyLyLCrhkFpKZmfaLoiy2 +/+1X6hVUYCC+gys512sg3up+h0nbNp8eW8XCmqzDL3XS4r9CSRatelMdDpAmusnU +qEGp5GoqyrUfxJZ8BywxeQ== +-----END PRIVATE KEY----- \ No newline at end of file diff --git a/tests/stream/stream_helper.php b/tests/stream/stream_helper.php index f1510c1..b8202bb 100644 --- a/tests/stream/stream_helper.php +++ b/tests/stream/stream_helper.php @@ -2,7 +2,7 @@ /** * Cross-platform socket pair creation helper - * + * * @return array|false Array of two socket resources or false on failure */ function create_socket_pair() { @@ -13,4 +13,25 @@ function create_socket_pair() { // Unix/Linux - use UNIX domain with STREAM_IPPROTO_IP return stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); } +} + +/** + * Helper function to wait for server address with retry logic + * Makes tests more reliable by avoiding race conditions + * + * @param object $server_coroutine The server coroutine to get result from + * @param int $max_attempts Maximum number of retry attempts (default: 5) + * @param int $delay_ms Delay between attempts in milliseconds (default: 10) + * @return string Server address + * @throws Exception If server address cannot be obtained after max attempts + */ +function wait_for_server_address($server_coroutine, $max_attempts = 5, $delay_ms = 10) { + for ($attempts = 0; $attempts < $max_attempts; $attempts++) { + \Async\delay($delay_ms); + $address = $server_coroutine->getResult(); + if ($address) { + return $address; + } + } + throw new Exception("Failed to get server address after $max_attempts attempts"); } \ No newline at end of file diff --git a/tests/stream/tcp_client_disconnect.php b/tests/stream/tcp_client_disconnect.php new file mode 100644 index 0000000..11fb809 --- /dev/null +++ b/tests/stream/tcp_client_disconnect.php @@ -0,0 +1,42 @@ + + */ + +if ($argc < 2) { + echo "Usage: php tcp_client_disconnect.php \n"; + exit(1); +} + +$port = (int)$argv[1]; + +echo "Client process: connecting to port $port\n"; + +// Set appropriate timeout for different platforms +$timeout = (PHP_OS_FAMILY === 'Windows') ? 5 : 2; + +$client = stream_socket_client("tcp://127.0.0.1:$port", $errno, $errstr, $timeout); +if (!$client) { + echo "Client process: failed to connect: $errstr ($errno)\n"; + exit(1); +} + +echo "Client process: connected, sending data\n"; +fwrite($client, "Hello from external process\n"); +fflush($client); + +// Pause to simulate processing - longer on Windows for stability +$pause_time = (PHP_OS_FAMILY === 'Windows') ? 150000 : 100000; +usleep($pause_time); + +echo "Client process: closing connection abruptly\n"; +fclose($client); + +// On Windows, give extra time for cleanup +if (PHP_OS_FAMILY === 'Windows') { + usleep(50000); // 50ms additional cleanup time +} + +echo "Client process: exited\n"; +exit(0); \ No newline at end of file diff --git a/zend_common.c b/zend_common.c index 8a42c2e..533eb93 100644 --- a/zend_common.c +++ b/zend_common.c @@ -108,16 +108,19 @@ uint32_t zend_current_exception_get_line(void) zend_object *zend_exception_merge(zend_object *exception, bool to_previous, bool transfer_error) { - zend_exception_save(); - zend_exception_restore(); + zend_object **exception_ptr = &EG(exception); + zend_object **prev_exception_ptr = &EG(prev_exception); + + zend_exception_save_fast(exception_ptr, prev_exception_ptr); + zend_exception_restore_fast(exception_ptr, prev_exception_ptr); if (exception == NULL) { - exception = EG(exception); - EG(exception) = NULL; + exception = *exception_ptr; + *exception_ptr = NULL; return exception; } - if (EG(exception) == NULL) { + if (*exception_ptr == NULL) { return exception; } @@ -127,12 +130,12 @@ zend_object *zend_exception_merge(zend_object *exception, bool to_previous, bool if (false == transfer_error) { GC_ADDREF(exception); } - zend_exception_set_previous(EG(exception), exception); - exception = EG(exception); - EG(exception) = NULL; + zend_exception_set_previous(*exception_ptr, exception); + exception = *exception_ptr; + *exception_ptr = NULL; } else { - zend_exception_set_previous(exception, EG(exception)); - EG(exception) = NULL; + zend_exception_set_previous(exception, *exception_ptr); + *exception_ptr = NULL; } return exception;