Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 56 additions & 55 deletions src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

const std = @import("std");
const builtin = @import("builtin");

const jsruntime = @import("jsruntime");
const Completion = jsruntime.IO.Completion;
Expand All @@ -37,6 +38,7 @@ const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;
const TimeoutCheck = std.time.ns_per_ms * 100;

const log = std.log.scoped(.server);
const IsLinux = builtin.target.os.tag == .linux;

// I/O Main
// --------
Expand All @@ -55,6 +57,7 @@ pub const Ctx = struct {
err: ?Error = null,

// I/O fields
accept_completion: *Completion,
conn_completion: *Completion,
timeout_completion: *Completion,
timeout: u64,
Expand All @@ -76,12 +79,14 @@ pub const Ctx = struct {
completion: *Completion,
result: AcceptError!std.posix.socket_t,
) void {
std.debug.assert(completion == self.conn_completion);
std.debug.assert(completion == self.acceptCompletion());

self.conn_socket = result catch |err| {
log.err("accept error: {any}", .{err});
self.err = err;
return;
};
log.info("client connected", .{});

// set connection timestamp and timeout
self.last_active = std.time.Instant.now() catch |err| {
Expand Down Expand Up @@ -111,6 +116,12 @@ pub const Ctx = struct {
std.debug.assert(completion == self.conn_completion);

const size = result catch |err| {
if (err == error.FileDescriptorInvalid and self.isClosed()) {
// connection has been closed, do nothing
log.debug("recv on closed conn", .{});
return;
}
log.err("read error: {any}", .{err});
self.err = err;
return;
};
Expand Down Expand Up @@ -169,6 +180,7 @@ pub const Ctx = struct {
std.debug.assert(completion == self.timeout_completion);

_ = result catch |err| {
log.err("timeout error: {any}", .{err});
self.err = err;
return;
};
Expand All @@ -185,21 +197,9 @@ pub const Ctx = struct {
};

if (now.since(self.last_active.?) > self.timeout) {
// closing
log.debug("conn timeout, closing...", .{});

// NOTE: we should cancel the current read
// but it seems that's just closing the connection is enough
// (and cancel does not work on MacOS)

// close current connection
self.loop.io.close(
*Ctx,
self,
Ctx.closeCbk,
self.timeout_completion,
self.conn_socket,
);
log.debug("conn timeout, closing...", .{});
self.close();
return;
}

Expand All @@ -213,38 +213,6 @@ pub const Ctx = struct {
);
}

fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void {
_ = completion;
// NOTE: completion can be either self.conn_completion or self.timeout_completion

_ = result catch |err| {
self.err = err;
return;
};

// conn is closed
self.last_active = null;

// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}

log.info("accepting new conn...", .{});

// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.conn_completion,
self.accept_socket,
);
}

// shortcuts
// ---------

Expand All @@ -263,6 +231,15 @@ pub const Ctx = struct {
return self.browser.session.env;
}

inline fn acceptCompletion(self: *Ctx) *Completion {
// NOTE: the logical completion to use here is the accept_completion
// as the pipe_connection can be used simulteanously by a recv I/O operation.
// But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic
// so we use the pipe_connection to avoid this problem
if (IsLinux) return self.accept_completion;
return self.conn_completion;
}

// actions
// -------

Expand All @@ -272,13 +249,7 @@ pub const Ctx = struct {
if (std.mem.eql(u8, cmd, "close")) {
// close connection
log.info("close cmd, closing conn...", .{});
self.loop.io.close(
*Ctx,
self,
Ctx.closeCbk,
self.conn_completion,
self.conn_socket,
);
self.close();
return error.Closed;
}

Expand All @@ -303,6 +274,33 @@ pub const Ctx = struct {
}
}

fn close(self: *Ctx) void {
std.posix.close(self.conn_socket);

// conn is closed
log.info("connection closed", .{});
self.last_active = null;

// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}

log.info("accepting new conn...", .{});

// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.acceptCompletion(),
self.accept_socket,
);
}

fn newSession(self: *Ctx) !void {
try self.browser.newSession(self.alloc(), self.loop);
try self.browser.session.initInspector(
Expand Down Expand Up @@ -388,6 +386,7 @@ const Send = struct {

fn asyncCbk(self: *Send, _: *Completion, result: SendError!usize) void {
_ = result catch |err| {
log.err("send error: {any}", .{err});
self.ctx.err = err;
return;
};
Expand Down Expand Up @@ -425,6 +424,7 @@ pub fn listen(
defer msg_buf.deinit(loop.alloc);

// create I/O completions
var accept_completion: Completion = undefined;
var conn_completion: Completion = undefined;
var timeout_completion: Completion = undefined;

Expand All @@ -438,6 +438,7 @@ pub fn listen(
.msg_buf = &msg_buf,
.accept_socket = server_socket,
.timeout = timeout,
.accept_completion = &accept_completion,
.conn_completion = &conn_completion,
.timeout_completion = &timeout_completion,
};
Expand All @@ -449,7 +450,7 @@ pub fn listen(

// accepting connection asynchronously on internal server
log.info("accepting new conn...", .{});
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket);
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket);

// infinite loop on I/O events, either:
// - cmd from incoming connection on server socket
Expand Down