From 52c26a2d5ae8c14f7bdd521617385bd68ee3f051 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Fri, 28 Mar 2025 12:33:42 +0100 Subject: [PATCH 1/3] loop: ensure all contexts are free on loop's deinit We add a new zig event count and a loop during deinit to ensure all contexts are correcly free. --- src/loop.zig | 126 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 50 deletions(-) diff --git a/src/loop.zig b/src/loop.zig index ac27100..defcb8d 100644 --- a/src/loop.zig +++ b/src/loop.zig @@ -40,7 +40,8 @@ fn report(comptime fmt: []const u8, args: anytype) void { pub const SingleThreaded = struct { alloc: std.mem.Allocator, // TODO: unmanaged version ? io: *IO, - events_nb: *usize, + js_events_nb: *usize, + zig_events_nb: *usize, cbk_error: bool = false, // js_ctx_id is incremented each time the loop is reset for JS. @@ -67,16 +68,36 @@ pub const SingleThreaded = struct { errdefer alloc.destroy(io); io.* = try IO.init(32, 0); - const events_nb = try alloc.create(usize); - events_nb.* = 0; - return Self{ .alloc = alloc, .io = io, .events_nb = events_nb }; + + const js_events_nb = try alloc.create(usize); + js_events_nb.* = 0; + const zig_events_nb = try alloc.create(usize); + zig_events_nb.* = 0; + + return Self{ + .alloc = alloc, + .io = io, + .js_events_nb = js_events_nb, + .zig_events_nb = zig_events_nb, + }; } pub fn deinit(self: *Self) void { + // run tail events. We do run the tail events to ensure all the + // contexts are correcly free. + while (self.eventsNb(.js) > 0 or self.eventsNb(.zig) > 0) { + self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| { + log.err("deinit run tail events: {any}", .{err}); + break; + }; + } + self.cancelAll(); + self.io.deinit(); self.alloc.destroy(self.io); - self.alloc.destroy(self.events_nb); + self.alloc.destroy(self.js_events_nb); + self.alloc.destroy(self.zig_events_nb); } // Retrieve all registred I/O events completed by OS kernel, @@ -85,7 +106,7 @@ pub const SingleThreaded = struct { // Note that I/O events callbacks might register more I/O events // on the go when they are executed (ie. nested I/O events). pub fn run(self: *Self) !void { - while (self.eventsNb() > 0) { + while (self.eventsNb(.js) > 0) { try self.io.run_for_ns(10 * std.time.ns_per_ms); // at each iteration we might have new events registred by previous callbacks } @@ -98,21 +119,30 @@ pub const SingleThreaded = struct { } } + const Event = enum { js, zig }; + + fn eventsPtr(self: *const Self, event: Event) *usize { + return switch (event) { + .zig => self.zig_events_nb, + .js => self.js_events_nb, + }; + } + // Register events atomically // - add 1 event and return previous value - fn addEvent(self: *Self) usize { - return @atomicRmw(usize, self.events_nb, .Add, 1, .acq_rel); + fn addEvent(self: *Self, event: Event) usize { + return @atomicRmw(usize, self.eventsPtr(event), .Add, 1, .acq_rel); } // - remove 1 event and return previous value - fn removeEvent(self: *Self) usize { - return @atomicRmw(usize, self.events_nb, .Sub, 1, .acq_rel); + fn removeEvent(self: *Self, event: Event) usize { + return @atomicRmw(usize, self.eventsPtr(event), .Sub, 1, .acq_rel); } // - get the number of current events - fn eventsNb(self: *Self) usize { - return @atomicLoad(usize, self.events_nb, .seq_cst); + fn eventsNb(self: *Self, event: Event) usize { + return @atomicLoad(usize, self.eventsPtr(event), .seq_cst); } - fn resetEvents(self: *Self) void { - @atomicStore(usize, self.events_nb, 0, .unordered); + fn resetEvents(self: *Self, event: Event) void { + @atomicStore(usize, self.eventsPtr(event), 0, .unordered); } fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void { @@ -136,18 +166,20 @@ pub const SingleThreaded = struct { completion: *IO.Completion, result: IO.TimeoutError!void, ) void { - defer ctx.loop.freeCbk(completion, ctx); + defer { + const old_events_nb = ctx.loop.removeEvent(.js); + if (builtin.is_test) { + report("timeout done, remaining events: {d}", .{old_events_nb - 1}); + } + + ctx.loop.freeCbk(completion, ctx); + } // If the loop's context id has changed, don't call the js callback // function. The callback's memory has already be cleaned and the // events nb reset. if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return; - const old_events_nb = ctx.loop.removeEvent(); - if (builtin.is_test) { - report("timeout done, remaining events: {d}", .{old_events_nb - 1}); - } - // TODO: return the error to the callback result catch |err| { switch (err) { @@ -175,7 +207,7 @@ pub const SingleThreaded = struct { .js_cbk = js_cbk, .js_ctx_id = self.js_ctx_id, }; - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds); if (builtin.is_test) { report("start timeout {d} for {d} nanoseconds", .{ old_events_nb + 1, nanoseconds }); @@ -195,18 +227,20 @@ pub const SingleThreaded = struct { completion: *IO.Completion, result: IO.CancelOneError!void, ) void { - defer ctx.loop.freeCbk(completion, ctx); + defer { + const old_events_nb = ctx.loop.removeEvent(.js); + if (builtin.is_test) { + report("cancel done, remaining events: {d}", .{old_events_nb - 1}); + } + + ctx.loop.freeCbk(completion, ctx); + } // If the loop's context id has changed, don't call the js callback // function. The callback's memory has already be cleaned and the // events nb reset. if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return; - const old_events_nb = ctx.loop.removeEvent(); - if (builtin.is_test) { - report("cancel done, remaining events: {d}", .{old_events_nb - 1}); - } - // TODO: return the error to the callback result catch |err| { switch (err) { @@ -237,28 +271,27 @@ pub const SingleThreaded = struct { .js_ctx_id = self.js_ctx_id, }; - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.cancel_one(*ContextCancel, ctx, cancelCallback, completion, comp_cancel); if (builtin.is_test) { report("cancel {d}", .{old_events_nb + 1}); } } - pub fn cancelAll(self: *Self) void { - self.resetEvents(); + fn cancelAll(self: *Self) void { + self.resetEvents(.js); + self.resetEvents(.zig); self.io.cancel_all(); } // Reset all existing JS callbacks. pub fn resetJS(self: *Self) void { self.js_ctx_id += 1; - self.resetEvents(); } // Reset all existing Zig callbacks. pub fn resetZig(self: *Self) void { self.zig_ctx_id += 1; - self.resetEvents(); } // IO callbacks APIs @@ -275,7 +308,7 @@ pub const SingleThreaded = struct { socket: std.posix.socket_t, address: std.net.Address, ) void { - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.connect(*Ctx, ctx, cbk, completion, socket, address); if (builtin.is_test) { report("start connect {d}", .{old_events_nb + 1}); @@ -283,7 +316,7 @@ pub const SingleThreaded = struct { } pub fn onConnect(self: *Self, _: ConnectError!void) void { - const old_events_nb = self.removeEvent(); + const old_events_nb = self.removeEvent(.js); if (builtin.is_test) { report("connect done, remaining events: {d}", .{old_events_nb - 1}); } @@ -300,7 +333,7 @@ pub const SingleThreaded = struct { socket: std.posix.socket_t, buf: []const u8, ) void { - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.send(*Ctx, ctx, cbk, completion, socket, buf); if (builtin.is_test) { report("start send {d}", .{old_events_nb + 1}); @@ -308,7 +341,7 @@ pub const SingleThreaded = struct { } pub fn onSend(self: *Self, _: SendError!usize) void { - const old_events_nb = self.removeEvent(); + const old_events_nb = self.removeEvent(.js); if (builtin.is_test) { report("send done, remaining events: {d}", .{old_events_nb - 1}); } @@ -325,7 +358,7 @@ pub const SingleThreaded = struct { socket: std.posix.socket_t, buf: []u8, ) void { - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.recv(*Ctx, ctx, cbk, completion, socket, buf); if (builtin.is_test) { report("start recv {d}", .{old_events_nb + 1}); @@ -333,7 +366,7 @@ pub const SingleThreaded = struct { } pub fn onRecv(self: *Self, _: RecvError!usize) void { - const old_events_nb = self.removeEvent(); + const old_events_nb = self.removeEvent(.js); if (builtin.is_test) { report("recv done, remaining events: {d}", .{old_events_nb - 1}); } @@ -356,19 +389,16 @@ pub const SingleThreaded = struct { completion: *IO.Completion, result: IO.TimeoutError!void, ) void { - defer ctx.loop.freeCbk(completion, ctx); + defer { + _ = ctx.loop.removeEvent(.zig); + ctx.loop.freeCbk(completion, ctx); + } // If the loop's context id has changed, don't call the js callback // function. The callback's memory has already be cleaned and the // events nb reset. if (ctx.zig_ctx_id != ctx.loop.zig_ctx_id) return; - // We don't remove event here b/c we don't want the main loop to wait for - // the timeout is done. - // This is mainly due b/c the usage of zigTimeout is used to process - // background tasks. - //_ = ctx.loop.removeEvent(); - result catch |err| { switch (err) { error.Canceled => {}, @@ -403,11 +433,7 @@ pub const SingleThreaded = struct { }.wrapper, }; - // We don't add event here b/c we don't want the main loop to wait for - // the timeout is done. - // This is mainly due b/c the usage of zigTimeout is used to process - // background tasks. - // _ = self.addEvent(); + _ = self.addEvent(.zig); self.io.timeout(*ContextZigTimeout, ctxtimeout, zigTimeoutCallback, completion, nanoseconds); } From 8f9be2d338d4daf544d5d0d270887ba5a30f3e17 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Fri, 28 Mar 2025 12:36:10 +0100 Subject: [PATCH 2/3] loop: remove useless pointers --- src/loop.zig | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/src/loop.zig b/src/loop.zig index defcb8d..b360ac0 100644 --- a/src/loop.zig +++ b/src/loop.zig @@ -39,9 +39,9 @@ fn report(comptime fmt: []const u8, args: anytype) void { // I/O APIs based on async/await might be added in the future. pub const SingleThreaded = struct { alloc: std.mem.Allocator, // TODO: unmanaged version ? - io: *IO, - js_events_nb: *usize, - zig_events_nb: *usize, + io: IO, + js_events_nb: usize, + zig_events_nb: usize, cbk_error: bool = false, // js_ctx_id is incremented each time the loop is reset for JS. @@ -64,21 +64,11 @@ pub const SingleThreaded = struct { pub const SendError = IO.SendError; pub fn init(alloc: std.mem.Allocator) !Self { - const io = try alloc.create(IO); - errdefer alloc.destroy(io); - - io.* = try IO.init(32, 0); - - const js_events_nb = try alloc.create(usize); - js_events_nb.* = 0; - const zig_events_nb = try alloc.create(usize); - zig_events_nb.* = 0; - return Self{ .alloc = alloc, - .io = io, - .js_events_nb = js_events_nb, - .zig_events_nb = zig_events_nb, + .io = try IO.init(32, 0), + .js_events_nb = 0, + .zig_events_nb = 0, }; } @@ -91,13 +81,8 @@ pub const SingleThreaded = struct { break; }; } - self.cancelAll(); - self.io.deinit(); - self.alloc.destroy(self.io); - self.alloc.destroy(self.js_events_nb); - self.alloc.destroy(self.zig_events_nb); } // Retrieve all registred I/O events completed by OS kernel, @@ -121,10 +106,10 @@ pub const SingleThreaded = struct { const Event = enum { js, zig }; - fn eventsPtr(self: *const Self, event: Event) *usize { + fn eventsPtr(self: *Self, event: Event) *usize { return switch (event) { - .zig => self.zig_events_nb, - .js => self.js_events_nb, + .zig => &self.zig_events_nb, + .js => &self.js_events_nb, }; } From 84a2d05f40ce92160d87e9d6bc66c45f9de55513 Mon Sep 17 00:00:00 2001 From: Pierre Tachoire Date: Fri, 28 Mar 2025 13:22:49 +0100 Subject: [PATCH 3/3] loop: use comptime known event --- src/loop.zig | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/loop.zig b/src/loop.zig index b360ac0..c9cf26e 100644 --- a/src/loop.zig +++ b/src/loop.zig @@ -106,7 +106,7 @@ pub const SingleThreaded = struct { const Event = enum { js, zig }; - fn eventsPtr(self: *Self, event: Event) *usize { + fn eventsPtr(self: *Self, comptime event: Event) *usize { return switch (event) { .zig => &self.zig_events_nb, .js => &self.js_events_nb, @@ -115,18 +115,18 @@ pub const SingleThreaded = struct { // Register events atomically // - add 1 event and return previous value - fn addEvent(self: *Self, event: Event) usize { + fn addEvent(self: *Self, comptime event: Event) usize { return @atomicRmw(usize, self.eventsPtr(event), .Add, 1, .acq_rel); } // - remove 1 event and return previous value - fn removeEvent(self: *Self, event: Event) usize { + fn removeEvent(self: *Self, comptime event: Event) usize { return @atomicRmw(usize, self.eventsPtr(event), .Sub, 1, .acq_rel); } // - get the number of current events - fn eventsNb(self: *Self, event: Event) usize { + fn eventsNb(self: *Self, comptime event: Event) usize { return @atomicLoad(usize, self.eventsPtr(event), .seq_cst); } - fn resetEvents(self: *Self, event: Event) void { + fn resetEvents(self: *Self, comptime event: Event) void { @atomicStore(usize, self.eventsPtr(event), 0, .unordered); }