Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 67 additions & 56 deletions src/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ fn report(comptime fmt: []const u8, args: anytype) void {
// I/O APIs based on async/await might be added in the future.
pub const SingleThreaded = struct {
alloc: std.mem.Allocator, // TODO: unmanaged version ?
io: *IO,
events_nb: *usize,
io: IO,
js_events_nb: usize,
zig_events_nb: usize,
cbk_error: bool = false,

// js_ctx_id is incremented each time the loop is reset for JS.
Expand All @@ -63,20 +64,25 @@ pub const SingleThreaded = struct {
pub const SendError = IO.SendError;

pub fn init(alloc: std.mem.Allocator) !Self {
const io = try alloc.create(IO);
errdefer alloc.destroy(io);

io.* = try IO.init(32, 0);
const events_nb = try alloc.create(usize);
events_nb.* = 0;
return Self{ .alloc = alloc, .io = io, .events_nb = events_nb };
return Self{
.alloc = alloc,
.io = try IO.init(32, 0),
.js_events_nb = 0,
.zig_events_nb = 0,
};
}

pub fn deinit(self: *Self) void {
// run tail events. We do run the tail events to ensure all the
// contexts are correcly free.
while (self.eventsNb(.js) > 0 or self.eventsNb(.zig) > 0) {
self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| {
log.err("deinit run tail events: {any}", .{err});
break;
};
}
self.cancelAll();
self.io.deinit();
self.alloc.destroy(self.io);
self.alloc.destroy(self.events_nb);
}

// Retrieve all registred I/O events completed by OS kernel,
Expand All @@ -85,7 +91,7 @@ pub const SingleThreaded = struct {
// Note that I/O events callbacks might register more I/O events
// on the go when they are executed (ie. nested I/O events).
pub fn run(self: *Self) !void {
while (self.eventsNb() > 0) {
while (self.eventsNb(.js) > 0) {
try self.io.run_for_ns(10 * std.time.ns_per_ms);
// at each iteration we might have new events registred by previous callbacks
}
Expand All @@ -98,21 +104,30 @@ pub const SingleThreaded = struct {
}
}

const Event = enum { js, zig };

fn eventsPtr(self: *Self, comptime event: Event) *usize {
return switch (event) {
.zig => &self.zig_events_nb,
.js => &self.js_events_nb,
};
}

// Register events atomically
// - add 1 event and return previous value
fn addEvent(self: *Self) usize {
return @atomicRmw(usize, self.events_nb, .Add, 1, .acq_rel);
fn addEvent(self: *Self, comptime event: Event) usize {
return @atomicRmw(usize, self.eventsPtr(event), .Add, 1, .acq_rel);
}
// - remove 1 event and return previous value
fn removeEvent(self: *Self) usize {
return @atomicRmw(usize, self.events_nb, .Sub, 1, .acq_rel);
fn removeEvent(self: *Self, comptime event: Event) usize {
return @atomicRmw(usize, self.eventsPtr(event), .Sub, 1, .acq_rel);
}
// - get the number of current events
fn eventsNb(self: *Self) usize {
return @atomicLoad(usize, self.events_nb, .seq_cst);
fn eventsNb(self: *Self, comptime event: Event) usize {
return @atomicLoad(usize, self.eventsPtr(event), .seq_cst);
}
fn resetEvents(self: *Self) void {
@atomicStore(usize, self.events_nb, 0, .unordered);
fn resetEvents(self: *Self, comptime event: Event) void {
@atomicStore(usize, self.eventsPtr(event), 0, .unordered);
}

fn freeCbk(self: *Self, completion: *IO.Completion, ctx: anytype) void {
Expand All @@ -136,18 +151,20 @@ pub const SingleThreaded = struct {
completion: *IO.Completion,
result: IO.TimeoutError!void,
) void {
defer ctx.loop.freeCbk(completion, ctx);
defer {
const old_events_nb = ctx.loop.removeEvent(.js);
if (builtin.is_test) {
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
}

ctx.loop.freeCbk(completion, ctx);
}

// If the loop's context id has changed, don't call the js callback
// function. The callback's memory has already be cleaned and the
// events nb reset.
if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return;

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("timeout done, remaining events: {d}", .{old_events_nb - 1});
}

// TODO: return the error to the callback
result catch |err| {
switch (err) {
Expand Down Expand Up @@ -175,7 +192,7 @@ pub const SingleThreaded = struct {
.js_cbk = js_cbk,
.js_ctx_id = self.js_ctx_id,
};
const old_events_nb = self.addEvent();
const old_events_nb = self.addEvent(.js);
self.io.timeout(*ContextTimeout, ctx, timeoutCallback, completion, nanoseconds);
if (builtin.is_test) {
report("start timeout {d} for {d} nanoseconds", .{ old_events_nb + 1, nanoseconds });
Expand All @@ -195,18 +212,20 @@ pub const SingleThreaded = struct {
completion: *IO.Completion,
result: IO.CancelOneError!void,
) void {
defer ctx.loop.freeCbk(completion, ctx);
defer {
const old_events_nb = ctx.loop.removeEvent(.js);
if (builtin.is_test) {
report("cancel done, remaining events: {d}", .{old_events_nb - 1});
}

ctx.loop.freeCbk(completion, ctx);
}

// If the loop's context id has changed, don't call the js callback
// function. The callback's memory has already be cleaned and the
// events nb reset.
if (ctx.js_ctx_id != ctx.loop.js_ctx_id) return;

const old_events_nb = ctx.loop.removeEvent();
if (builtin.is_test) {
report("cancel done, remaining events: {d}", .{old_events_nb - 1});
}

// TODO: return the error to the callback
result catch |err| {
switch (err) {
Expand Down Expand Up @@ -237,28 +256,27 @@ pub const SingleThreaded = struct {
.js_ctx_id = self.js_ctx_id,
};

const old_events_nb = self.addEvent();
const old_events_nb = self.addEvent(.js);
self.io.cancel_one(*ContextCancel, ctx, cancelCallback, completion, comp_cancel);
if (builtin.is_test) {
report("cancel {d}", .{old_events_nb + 1});
}
}

pub fn cancelAll(self: *Self) void {
self.resetEvents();
fn cancelAll(self: *Self) void {
self.resetEvents(.js);
self.resetEvents(.zig);
self.io.cancel_all();
}

// Reset all existing JS callbacks.
pub fn resetJS(self: *Self) void {
self.js_ctx_id += 1;
self.resetEvents();
}

// Reset all existing Zig callbacks.
pub fn resetZig(self: *Self) void {
self.zig_ctx_id += 1;
self.resetEvents();
}

// IO callbacks APIs
Expand All @@ -275,15 +293,15 @@ pub const SingleThreaded = struct {
socket: std.posix.socket_t,
address: std.net.Address,
) void {
const old_events_nb = self.addEvent();
const old_events_nb = self.addEvent(.js);
self.io.connect(*Ctx, ctx, cbk, completion, socket, address);
if (builtin.is_test) {
report("start connect {d}", .{old_events_nb + 1});
}
}

pub fn onConnect(self: *Self, _: ConnectError!void) void {
const old_events_nb = self.removeEvent();
const old_events_nb = self.removeEvent(.js);
if (builtin.is_test) {
report("connect done, remaining events: {d}", .{old_events_nb - 1});
}
Expand All @@ -300,15 +318,15 @@ pub const SingleThreaded = struct {
socket: std.posix.socket_t,
buf: []const u8,
) void {
const old_events_nb = self.addEvent();
const old_events_nb = self.addEvent(.js);
self.io.send(*Ctx, ctx, cbk, completion, socket, buf);
if (builtin.is_test) {
report("start send {d}", .{old_events_nb + 1});
}
}

pub fn onSend(self: *Self, _: SendError!usize) void {
const old_events_nb = self.removeEvent();
const old_events_nb = self.removeEvent(.js);
if (builtin.is_test) {
report("send done, remaining events: {d}", .{old_events_nb - 1});
}
Expand All @@ -325,15 +343,15 @@ pub const SingleThreaded = struct {
socket: std.posix.socket_t,
buf: []u8,
) void {
const old_events_nb = self.addEvent();
const old_events_nb = self.addEvent(.js);
self.io.recv(*Ctx, ctx, cbk, completion, socket, buf);
if (builtin.is_test) {
report("start recv {d}", .{old_events_nb + 1});
}
}

pub fn onRecv(self: *Self, _: RecvError!usize) void {
const old_events_nb = self.removeEvent();
const old_events_nb = self.removeEvent(.js);
if (builtin.is_test) {
report("recv done, remaining events: {d}", .{old_events_nb - 1});
}
Expand All @@ -356,19 +374,16 @@ pub const SingleThreaded = struct {
completion: *IO.Completion,
result: IO.TimeoutError!void,
) void {
defer ctx.loop.freeCbk(completion, ctx);
defer {
_ = ctx.loop.removeEvent(.zig);
ctx.loop.freeCbk(completion, ctx);
}

// If the loop's context id has changed, don't call the js callback
// function. The callback's memory has already be cleaned and the
// events nb reset.
if (ctx.zig_ctx_id != ctx.loop.zig_ctx_id) return;

// We don't remove event here b/c we don't want the main loop to wait for
// the timeout is done.
// This is mainly due b/c the usage of zigTimeout is used to process
// background tasks.
//_ = ctx.loop.removeEvent();

result catch |err| {
switch (err) {
error.Canceled => {},
Expand Down Expand Up @@ -403,11 +418,7 @@ pub const SingleThreaded = struct {
}.wrapper,
};

// We don't add event here b/c we don't want the main loop to wait for
// the timeout is done.
// This is mainly due b/c the usage of zigTimeout is used to process
// background tasks.
// _ = self.addEvent();
_ = self.addEvent(.zig);

self.io.timeout(*ContextZigTimeout, ctxtimeout, zigTimeoutCallback, completion, nanoseconds);
}
Expand Down