Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **PDO PgSQL**: Non-blocking query execution for PostgreSQL PDO driver
- **PostgreSQL**: Concurrent `pg_*` query execution with separate connections per async context
- **`Async\iterate()` function**: Iterates over an iterable, calling the callback for each element with optional concurrency limit. Supports `cancelPending` parameter (default: `true`) that controls whether coroutines spawned inside the callback are cancelled or awaited after iteration completes.
- **`Async\watch_filesystem()` function: Watch filesystem changes with efficient event handling. Supports recursive watching, event filtering, and callback invocation with event details.
- **`Async\FileSystemWatcher` class**: Persistent filesystem watcher with `foreach` iteration support, suspend/resume on new events, two storage modes (coalesce with HashTable deduplication, raw with circular buffer), `close()`/`isClosed()` lifecycle, and `Awaitable` interface via `ZEND_ASYNC_EVENT_REF_FIELDS` pattern. Replaces the one-shot `Async\watch_filesystem()` function.

### Changed
- **Hidden Events**: Added `ZEND_ASYNC_EVENT_F_HIDDEN` flag for events excluded from deadlock detection
Expand Down
223 changes: 3 additions & 220 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "channel.h"
#include "pool.h"
#include "task_group.h"
#include "fs_watcher.h"
#include "iterator.h"
#include "async_API.h"
#include "zend_enum.h"
Expand All @@ -42,7 +43,6 @@ zend_class_entry *async_ce_completable = NULL;
zend_class_entry *async_ce_timeout = NULL;
zend_class_entry *async_ce_circuit_breaker_state = NULL;
zend_class_entry *async_ce_circuit_breaker = NULL;
zend_class_entry *async_ce_filesystem_event = NULL;
zend_class_entry *async_ce_circuit_breaker_strategy = NULL;

///////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -980,220 +980,6 @@ PHP_FUNCTION(Async_exec)
}
*/

///////////////////////////////////////////////////////////////
/// watch_filesystem
///////////////////////////////////////////////////////////////

typedef struct {
zend_async_event_callback_t base;
zend_future_t *future;
zend_async_filesystem_event_t *fs_event;
zend_async_event_callback_t *cancel_cb;
} watch_fs_callback_t;

typedef struct {
zend_async_event_callback_t base;
zend_future_t *future;
zend_async_filesystem_event_t *fs_event;
watch_fs_callback_t *fs_cb;
zend_object *cancellation;
} watch_fs_cancel_callback_t;

static void watch_fs_callback_dispose(zend_async_event_callback_t *callback, zend_async_event_t *event)
{
efree(callback);
}

static void watch_fs_cancel_callback_dispose(zend_async_event_callback_t *callback, zend_async_event_t *event)
{
watch_fs_cancel_callback_t *cb = (watch_fs_cancel_callback_t *)callback;
zend_object *cancellation = cb->cancellation;
cb->cancellation = NULL;
if (cancellation) {
OBJ_RELEASE(cancellation);
}
efree(cb);
}

static void watch_fs_cleanup(zend_async_filesystem_event_t *fs_event)
{
if (EXPECTED(!ZEND_ASYNC_EVENT_IS_CLOSED(&fs_event->base))) {
fs_event->base.stop(&fs_event->base);
}
fs_event->base.dispose(&fs_event->base);
}

static void watch_fs_on_cancel(
zend_async_event_t *event, zend_async_event_callback_t *callback, void *result, zend_object *exception)
{
const watch_fs_cancel_callback_t *cb = (watch_fs_cancel_callback_t *)callback;
zend_future_t *future = cb->future;
zend_async_filesystem_event_t *fs_event = cb->fs_event;

/* Detach cross-reference */
if (cb->fs_cb != NULL) {
cb->fs_cb->cancel_cb = NULL;
}

if (ZEND_ASYNC_EVENT_IS_CLOSED(&future->event)) {
return;
}

if (exception != NULL) {
ZEND_FUTURE_REJECT(future, exception);
} else {
zend_object *cancel_ex = async_new_exception(
async_ce_cancellation_exception, "Filesystem watch cancelled");
ZEND_FUTURE_REJECT(future, cancel_ex);
zend_object_release(cancel_ex);
}

watch_fs_cleanup(fs_event);
}

static void watch_fs_on_event(
zend_async_event_t *event, zend_async_event_callback_t *callback, void *result, zend_object *exception)
{
const watch_fs_callback_t *cb = (watch_fs_callback_t *)callback;
zend_future_t *future = cb->future;
zend_async_filesystem_event_t *fs_event = cb->fs_event;

/* Remove cancellation callback if registered */
if (cb->cancel_cb != NULL) {
watch_fs_cancel_callback_t *cancel = (watch_fs_cancel_callback_t *)cb->cancel_cb;
cancel->fs_cb = NULL;
}

if (UNEXPECTED(ZEND_ASYNC_EVENT_IS_CLOSED(&future->event))) {
goto cleanup;
}

if (UNEXPECTED(exception != NULL)) {
ZEND_FUTURE_REJECT(future, exception);
goto cleanup;
}

/* Create FileSystemEvent object and set readonly properties via property slots */
zval event_obj;
object_init_ex(&event_obj, async_ce_filesystem_event);
zend_object *obj = Z_OBJ(event_obj);

/* Property slot 0: path */
ZVAL_STR_COPY(OBJ_PROP_NUM(obj, 0), fs_event->path);

/* Property slot 1: filename */
if (fs_event->triggered_filename != NULL) {
ZVAL_STR_COPY(OBJ_PROP_NUM(obj, 1), fs_event->triggered_filename);
} else {
ZVAL_NULL(OBJ_PROP_NUM(obj, 1));
}

/* Property slot 2: renamed */
ZVAL_BOOL(OBJ_PROP_NUM(obj, 2), (fs_event->triggered_events & UV_RENAME) != 0);

/* Property slot 3: changed */
ZVAL_BOOL(OBJ_PROP_NUM(obj, 3), (fs_event->triggered_events & UV_CHANGE) != 0);

ZEND_FUTURE_COMPLETE(future, &event_obj);
zval_ptr_dtor(&event_obj);

cleanup:
watch_fs_cleanup(fs_event);
}

PHP_FUNCTION(Async_watch_filesystem)
{
zend_string *path = NULL;
bool recursive = false;
zend_object *cancellation = NULL;

ZEND_PARSE_PARAMETERS_START(1, 3)
Z_PARAM_STR(path)
Z_PARAM_OPTIONAL
Z_PARAM_BOOL(recursive)
Z_PARAM_OBJ_OF_CLASS_OR_NULL(cancellation, async_ce_completable)
ZEND_PARSE_PARAMETERS_END();

SCHEDULER_LAUNCH;

const unsigned int flags = recursive ? UV_FS_EVENT_RECURSIVE : 0;

zend_async_filesystem_event_t *fs_event = ZEND_ASYNC_NEW_FILESYSTEM_EVENT(path, flags);

if (UNEXPECTED(fs_event == NULL)) {
RETURN_THROWS();
}

/* Create the future */
zend_future_t *future = ZEND_ASYNC_NEW_FUTURE(false);

if (UNEXPECTED(future == NULL)) {
fs_event->base.dispose(&fs_event->base);
RETURN_THROWS();
}

/* Create and register fs event callback */
watch_fs_callback_t *cb = ecalloc(1, sizeof(watch_fs_callback_t));
cb->base.ref_count = 0;
cb->base.callback = watch_fs_on_event;
cb->base.dispose = watch_fs_callback_dispose;
cb->future = future;
cb->fs_event = fs_event;
cb->cancel_cb = NULL;

fs_event->base.add_callback(&fs_event->base, &cb->base);

/* Register cancellation callback */
if (cancellation != NULL) {
zend_async_event_t *cancel_event = ZEND_ASYNC_OBJECT_TO_EVENT(cancellation);

if (UNEXPECTED(ZEND_ASYNC_EVENT_IS_CLOSED(cancel_event))) {
/* Already cancelled — reject immediately */
zend_object *cancel_ex = async_new_exception(
async_ce_cancellation_exception, "Filesystem watch cancelled");
ZEND_FUTURE_REJECT(future, cancel_ex);
zend_object_release(cancel_ex);
fs_event->base.dispose(&fs_event->base);
RETURN_OBJ(ZEND_ASYNC_NEW_FUTURE_OBJ(future));
}

watch_fs_cancel_callback_t *cancel_cb = ecalloc(1, sizeof(watch_fs_cancel_callback_t));
cancel_cb->base.ref_count = 0;
cancel_cb->base.callback = watch_fs_on_cancel;
cancel_cb->base.dispose = watch_fs_cancel_callback_dispose;
cancel_cb->future = future;
cancel_cb->fs_event = fs_event;
cancel_cb->fs_cb = cb;
cancel_cb->cancellation = cancellation;
GC_ADDREF(cancellation);

cb->cancel_cb = &cancel_cb->base;

cancel_event->add_callback(cancel_event, &cancel_cb->base);
}

/* Start watching */
if (UNEXPECTED(!fs_event->base.start(&fs_event->base))) {
fs_event->base.dispose(&fs_event->base);
ZEND_FUTURE_SET_USED(future);
future->event.dispose(&future->event);
RETURN_THROWS();
}

if (cancellation != NULL) {
zend_async_event_t *cancel_event = ZEND_ASYNC_OBJECT_TO_EVENT(cancellation);

if (UNEXPECTED(false == cancel_event->start(cancel_event))) {
fs_event->base.dispose(&fs_event->base);
ZEND_FUTURE_SET_USED(future);
future->event.dispose(&future->event);
RETURN_THROWS();
}
}

RETURN_OBJ(ZEND_ASYNC_NEW_FUTURE_OBJ(future));
}

PHP_METHOD(Async_Timeout, __construct)
{
async_throw_error("Timeout cannot be constructed directly");
Expand Down Expand Up @@ -1248,10 +1034,7 @@ void async_register_awaitable_ce(void)
async_ce_completable = register_class_Async_Completable(async_ce_awaitable);
}

void async_register_filesystem_event_ce(void)
{
async_ce_filesystem_event = register_class_Async_FileSystemEvent();
}


void async_register_circuit_breaker_ce(void)
{
Expand Down Expand Up @@ -1408,7 +1191,7 @@ ZEND_MINIT_FUNCTION(async)
async_register_context_ce();
async_register_exceptions_ce();
async_register_channel_ce();
async_register_filesystem_event_ce();
async_register_fs_watcher_ce();
async_register_circuit_breaker_ce();
async_register_pool_ce();
async_register_task_group_ce();
Expand Down
21 changes: 0 additions & 21 deletions async.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,6 @@ function exec(
): Future {}
*/

/**
* Represents a filesystem event detected by watch_filesystem().
*
* @strict-properties
* @not-serializable
*/
final readonly class FileSystemEvent
{
public string $path;
public ?string $filename;
public bool $renamed;
public bool $changed;
}

/**
* Watch a file or directory for filesystem changes.
* Returns a Future that resolves with a FileSystemEvent on the first detected change.
*
* @return Future<FileSystemEvent>
*/
function watch_filesystem(string $path, bool $recursive = false, ?Completable $cancellation = null): Future {}

/**
* Circuit breaker states.
Expand Down
40 changes: 0 additions & 40 deletions async_arginfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_Async_iterate, 0, 2, IS_VOID, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, cancelPending, _IS_BOOL, 0, "true")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_Async_watch_filesystem, 0, 1, Async\\Future, 0)
ZEND_ARG_TYPE_INFO(0, path, IS_STRING, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, recursive, _IS_BOOL, 0, "false")
ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, cancellation, Async\\Completable, 1, "null")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_Async_graceful_shutdown, 0, 0, IS_VOID, 0)
ZEND_ARG_OBJ_INFO_WITH_DEFAULT_VALUE(0, cancellationError, Async\\AsyncCancellation, 1, "null")
Expand Down Expand Up @@ -154,7 +149,6 @@ ZEND_FUNCTION(Async_current_coroutine);
ZEND_FUNCTION(Async_root_context);
ZEND_FUNCTION(Async_get_coroutines);
ZEND_FUNCTION(Async_iterate);
ZEND_FUNCTION(Async_watch_filesystem);
ZEND_FUNCTION(Async_graceful_shutdown);
ZEND_METHOD(Async_Timeout, __construct);
ZEND_METHOD(Async_Timeout, cancel);
Expand All @@ -181,7 +175,6 @@ static const zend_function_entry ext_functions[] = {
ZEND_RAW_FENTRY(ZEND_NS_NAME("Async", "root_context"), zif_Async_root_context, arginfo_Async_root_context, 0, NULL, NULL)
ZEND_RAW_FENTRY(ZEND_NS_NAME("Async", "get_coroutines"), zif_Async_get_coroutines, arginfo_Async_get_coroutines, 0, NULL, NULL)
ZEND_RAW_FENTRY(ZEND_NS_NAME("Async", "iterate"), zif_Async_iterate, arginfo_Async_iterate, 0, NULL, NULL)
ZEND_RAW_FENTRY(ZEND_NS_NAME("Async", "watch_filesystem"), zif_Async_watch_filesystem, arginfo_Async_watch_filesystem, 0, NULL, NULL)
ZEND_RAW_FENTRY(ZEND_NS_NAME("Async", "graceful_shutdown"), zif_Async_graceful_shutdown, arginfo_Async_graceful_shutdown, 0, NULL, NULL)
ZEND_FE_END
};
Expand Down Expand Up @@ -248,39 +241,6 @@ static zend_class_entry *register_class_Async_Timeout(zend_class_entry *class_en
return class_entry;
}

static zend_class_entry *register_class_Async_FileSystemEvent(void)
{
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "Async", "FileSystemEvent", NULL);
class_entry = zend_register_internal_class_with_flags(&ce, NULL, ZEND_ACC_FINAL|ZEND_ACC_READONLY_CLASS|ZEND_ACC_NO_DYNAMIC_PROPERTIES);

zval property_path_default_value;
ZVAL_UNDEF(&property_path_default_value);
zend_string *property_path_name = zend_string_init("path", sizeof("path") - 1, 1);
zend_declare_typed_property(class_entry, property_path_name, &property_path_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_STRING));
zend_string_release(property_path_name);

zval property_filename_default_value;
ZVAL_UNDEF(&property_filename_default_value);
zend_string *property_filename_name = zend_string_init("filename", sizeof("filename") - 1, 1);
zend_declare_typed_property(class_entry, property_filename_name, &property_filename_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_STRING|MAY_BE_NULL));
zend_string_release(property_filename_name);

zval property_renamed_default_value;
ZVAL_UNDEF(&property_renamed_default_value);
zend_string *property_renamed_name = zend_string_init("renamed", sizeof("renamed") - 1, 1);
zend_declare_typed_property(class_entry, property_renamed_name, &property_renamed_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_BOOL));
zend_string_release(property_renamed_name);

zval property_changed_default_value;
ZVAL_UNDEF(&property_changed_default_value);
zend_string *property_changed_name = zend_string_init("changed", sizeof("changed") - 1, 1);
zend_declare_typed_property(class_entry, property_changed_name, &property_changed_default_value, ZEND_ACC_PUBLIC|ZEND_ACC_READONLY, NULL, (zend_type) ZEND_TYPE_INIT_MASK(MAY_BE_BOOL));
zend_string_release(property_changed_name);

return class_entry;
}

static zend_class_entry *register_class_Async_CircuitBreakerState(void)
{
Expand Down
4 changes: 2 additions & 2 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ if test "$PHP_ASYNC" = "yes"; then
dnl Register extension source files.
PHP_NEW_EXTENSION([async],
[async.c coroutine.c scope.c scheduler.c exceptions.c iterator.c async_API.c \
context.c libuv_reactor.c future.c channel.c pool.c task_group.c \
context.c libuv_reactor.c future.c channel.c pool.c task_group.c fs_watcher.c \
internal/allocator.c internal/circular_buffer.c \
zend_common.c],
$ext_shared)

dnl Optionally install headers (if desired for public use).
PHP_INSTALL_HEADERS([ext/async],
[php_async.h coroutine.h scope.h scheduler.h exceptions.h iterator.h async_API.h context.h future.h channel.h pool.h task_group.h])
[php_async.h coroutine.h scope.h scheduler.h exceptions.h iterator.h async_API.h context.h future.h channel.h pool.h task_group.h fs_watcher.h])


AC_PATH_PROG(PKG_CONFIG, pkg-config, no)
Expand Down
3 changes: 2 additions & 1 deletion config.w32
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ARG_ENABLE('async', 'Enable True Async', 'no');

if (PHP_ASYNC == "yes") {

EXTENSION("async", "async.c coroutine.c scope.c scheduler.c exceptions.c iterator.c async_API.c zend_common.c context.c libuv_reactor.c future.c channel.c pool.c task_group.c");
EXTENSION("async", "async.c coroutine.c scope.c scheduler.c exceptions.c iterator.c async_API.c zend_common.c context.c libuv_reactor.c future.c channel.c pool.c task_group.c fs_watcher.c");
ADD_SOURCES("ext/async/internal", "allocator.c circular_buffer.c");

ADD_FLAG("CFLAGS", "/D PHP_ASYNC");
Expand All @@ -23,6 +23,7 @@ if (PHP_ASYNC == "yes") {
PHP_INSTALL_HEADERS("ext/async", "channel.h");
PHP_INSTALL_HEADERS("ext/async", "pool.h");
PHP_INSTALL_HEADERS("ext/async", "task_group.h");
PHP_INSTALL_HEADERS("ext/async", "fs_watcher.h");

if (CHECK_HEADER_ADD_INCLUDE("libuv/uv.h", "CFLAGS_UV", PHP_PHP_BUILD + "\\include")
&& CHECK_LIB("libuv.lib", "libuv")) {
Expand Down
Loading
Loading