diff --git a/src/loop.zig b/src/loop.zig index ac27100..c9cf26e 100644 --- a/src/loop.zig +++ b/src/loop.zig @@ -39,8 +39,9 @@ fn report(comptime fmt: []const u8, args: anytype) void { // I/O APIs based on async/await might be added in the future. pub const SingleThreaded = struct { alloc: std.mem.Allocator, // TODO: unmanaged version ? - io: *IO, - events_nb: *usize, + io: IO, + js_events_nb: usize, + zig_events_nb: usize, cbk_error: bool = false, // js_ctx_id is incremented each time the loop is reset for JS. @@ -63,20 +64,25 @@ pub const SingleThreaded = struct { pub const SendError = IO.SendError; pub fn init(alloc: std.mem.Allocator) !Self { - const io = try alloc.create(IO); - errdefer alloc.destroy(io); - - io.* = try IO.init(32, 0); - const events_nb = try alloc.create(usize); - events_nb.* = 0; - return Self{ .alloc = alloc, .io = io, .events_nb = events_nb }; + return Self{ + .alloc = alloc, + .io = try IO.init(32, 0), + .js_events_nb = 0, + .zig_events_nb = 0, + }; } pub fn deinit(self: *Self) void { + // run tail events. We do run the tail events to ensure all the + // contexts are correcly free. + while (self.eventsNb(.js) > 0 or self.eventsNb(.zig) > 0) { + self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| { + log.err("deinit run tail events: {any}", .{err}); + break; + }; + } self.cancelAll(); self.io.deinit(); - self.alloc.destroy(self.io); - self.alloc.destroy(self.events_nb); } // Retrieve all registred I/O events completed by OS kernel, @@ -85,7 +91,7 @@ pub const SingleThreaded = struct { // Note that I/O events callbacks might register more I/O events // on the go when they are executed (ie. nested I/O events). pub fn run(self: *Self) !void { - while (self.eventsNb() > 0) { + while (self.eventsNb(.js) > 0) { try self.io.run_for_ns(10 * std.time.ns_per_ms); // at each iteration we might have new events registred by previous callbacks } @@ -98,21 +104,30 @@ pub const SingleThreaded = struct { } } + const Event = enum { js, zig }; + + fn eventsPtr(self: *Self, comptime event: Event) *usize { + return switch (event) { + .zig => &self.zig_events_nb, + .js => &self.js_events_nb, + }; + } + // Register events atomically // - add 1 event and return previous value - fn addEvent(self: *Self) usize { - return @atomicRmw(usize, self.events_nb, .Add, 1, .acq_rel); + fn addEvent(self: *Self, comptime event: Event) usize { + return @atomicRmw(usize, self.eventsPtr(event), .Add, 1, .acq_rel); } // - remove 1 event and return previous value - fn removeEvent(self: *Self) usize { - return @atomicRmw(usize, self.events_nb, .Sub, 1, .acq_rel); + fn removeEvent(self: *Self, comptime event: Event) usize { + return @atomicRmw(usize, self.eventsPtr(event), .Sub, 1, .acq_rel); } // - get the number of current events - fn eventsNb(self: *Self) usize { - return @atomicLoad(usize, self.events_nb, .seq_cst); + fn eventsNb(self: *Self, comptime event: Event) usize { + return @atomicLoad(usize, self.eventsPtr(event), .seq_cst); } - fn resetEvents(self: *Self) void { - @atomicStore(usize, self.events_nb, 0, .unordered); + fn resetEvents(self: *Self, comptime event: Event) void { + @atomicStore(usize, self.eventsPtr(event), 0, .unordered); } fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void { @@ -136,18 +151,20 @@ pub const SingleThreaded = struct { completion: *IO.Completion, result: IO.TimeoutError!void, ) void { - defer ctx.loop.freeCbk(completion, ctx); + defer { + const old_events_nb = ctx.loop.removeEvent(.js); + if (builtin.is_test) { + report("timeout done, remaining events: {d}", .{old_events_nb - 1}); + } + + ctx.loop.freeCbk(completion, ctx); + } // If the loop's context id has changed, don't call the js callback // function. The callback's memory has already be cleaned and the // events nb reset. if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return; - const old_events_nb = ctx.loop.removeEvent(); - if (builtin.is_test) { - report("timeout done, remaining events: {d}", .{old_events_nb - 1}); - } - // TODO: return the error to the callback result catch |err| { switch (err) { @@ -175,7 +192,7 @@ pub const SingleThreaded = struct { .js_cbk = js_cbk, .js_ctx_id = self.js_ctx_id, }; - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds); if (builtin.is_test) { report("start timeout {d} for {d} nanoseconds", .{ old_events_nb + 1, nanoseconds }); @@ -195,18 +212,20 @@ pub const SingleThreaded = struct { completion: *IO.Completion, result: IO.CancelOneError!void, ) void { - defer ctx.loop.freeCbk(completion, ctx); + defer { + const old_events_nb = ctx.loop.removeEvent(.js); + if (builtin.is_test) { + report("cancel done, remaining events: {d}", .{old_events_nb - 1}); + } + + ctx.loop.freeCbk(completion, ctx); + } // If the loop's context id has changed, don't call the js callback // function. The callback's memory has already be cleaned and the // events nb reset. if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return; - const old_events_nb = ctx.loop.removeEvent(); - if (builtin.is_test) { - report("cancel done, remaining events: {d}", .{old_events_nb - 1}); - } - // TODO: return the error to the callback result catch |err| { switch (err) { @@ -237,28 +256,27 @@ pub const SingleThreaded = struct { .js_ctx_id = self.js_ctx_id, }; - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.cancel_one(*ContextCancel, ctx, cancelCallback, completion, comp_cancel); if (builtin.is_test) { report("cancel {d}", .{old_events_nb + 1}); } } - pub fn cancelAll(self: *Self) void { - self.resetEvents(); + fn cancelAll(self: *Self) void { + self.resetEvents(.js); + self.resetEvents(.zig); self.io.cancel_all(); } // Reset all existing JS callbacks. pub fn resetJS(self: *Self) void { self.js_ctx_id += 1; - self.resetEvents(); } // Reset all existing Zig callbacks. pub fn resetZig(self: *Self) void { self.zig_ctx_id += 1; - self.resetEvents(); } // IO callbacks APIs @@ -275,7 +293,7 @@ pub const SingleThreaded = struct { socket: std.posix.socket_t, address: std.net.Address, ) void { - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.connect(*Ctx, ctx, cbk, completion, socket, address); if (builtin.is_test) { report("start connect {d}", .{old_events_nb + 1}); @@ -283,7 +301,7 @@ pub const SingleThreaded = struct { } pub fn onConnect(self: *Self, _: ConnectError!void) void { - const old_events_nb = self.removeEvent(); + const old_events_nb = self.removeEvent(.js); if (builtin.is_test) { report("connect done, remaining events: {d}", .{old_events_nb - 1}); } @@ -300,7 +318,7 @@ pub const SingleThreaded = struct { socket: std.posix.socket_t, buf: []const u8, ) void { - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.send(*Ctx, ctx, cbk, completion, socket, buf); if (builtin.is_test) { report("start send {d}", .{old_events_nb + 1}); @@ -308,7 +326,7 @@ pub const SingleThreaded = struct { } pub fn onSend(self: *Self, _: SendError!usize) void { - const old_events_nb = self.removeEvent(); + const old_events_nb = self.removeEvent(.js); if (builtin.is_test) { report("send done, remaining events: {d}", .{old_events_nb - 1}); } @@ -325,7 +343,7 @@ pub const SingleThreaded = struct { socket: std.posix.socket_t, buf: []u8, ) void { - const old_events_nb = self.addEvent(); + const old_events_nb = self.addEvent(.js); self.io.recv(*Ctx, ctx, cbk, completion, socket, buf); if (builtin.is_test) { report("start recv {d}", .{old_events_nb + 1}); @@ -333,7 +351,7 @@ pub const SingleThreaded = struct { } pub fn onRecv(self: *Self, _: RecvError!usize) void { - const old_events_nb = self.removeEvent(); + const old_events_nb = self.removeEvent(.js); if (builtin.is_test) { report("recv done, remaining events: {d}", .{old_events_nb - 1}); } @@ -356,19 +374,16 @@ pub const SingleThreaded = struct { completion: *IO.Completion, result: IO.TimeoutError!void, ) void { - defer ctx.loop.freeCbk(completion, ctx); + defer { + _ = ctx.loop.removeEvent(.zig); + ctx.loop.freeCbk(completion, ctx); + } // If the loop's context id has changed, don't call the js callback // function. The callback's memory has already be cleaned and the // events nb reset. if (ctx.zig_ctx_id != ctx.loop.zig_ctx_id) return; - // We don't remove event here b/c we don't want the main loop to wait for - // the timeout is done. - // This is mainly due b/c the usage of zigTimeout is used to process - // background tasks. - //_ = ctx.loop.removeEvent(); - result catch |err| { switch (err) { error.Canceled => {}, @@ -403,11 +418,7 @@ pub const SingleThreaded = struct { }.wrapper, }; - // We don't add event here b/c we don't want the main loop to wait for - // the timeout is done. - // This is mainly due b/c the usage of zigTimeout is used to process - // background tasks. - // _ = self.addEvent(); + _ = self.addEvent(.zig); self.io.timeout(*ContextZigTimeout, ctxtimeout, zigTimeoutCallback, completion, nanoseconds); }