Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 76 additions & 48 deletions src/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

const std = @import("std");
const builtin = @import("builtin");

const jsruntime = @import("jsruntime");
const Completion = jsruntime.IO.Completion;
const AcceptError = jsruntime.IO.AcceptError;
const RecvError = jsruntime.IO.RecvError;
const SendError = jsruntime.IO.SendError;
const CloseError = jsruntime.IO.CloseError;
const CancelError = jsruntime.IO.CancelError;
const TimeoutError = jsruntime.IO.TimeoutError;

const MsgBuffer = @import("msg.zig").MsgBuffer;
const Browser = @import("browser/browser.zig").Browser;
const cdp = @import("cdp/cdp.zig");

const NoError = error{NoError};
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError;
const IOError = AcceptError || RecvError || SendError || CloseError || TimeoutError || CancelError;
const Error = IOError || std.fmt.ParseIntError || cdp.Error || NoError;

const TimeoutCheck = std.time.ns_per_ms * 100;

const log = std.log.scoped(.server);
const isLinux = builtin.target.os.tag == .linux;

// I/O Main
// --------
Expand All @@ -55,6 +58,7 @@ pub const Ctx = struct {
err: ?Error = null,

// I/O fields
accept_completion: *Completion,
conn_completion: *Completion,
timeout_completion: *Completion,
timeout: u64,
Expand All @@ -76,12 +80,14 @@ pub const Ctx = struct {
completion: *Completion,
result: AcceptError!std.posix.socket_t,
) void {
std.debug.assert(completion == self.conn_completion);
std.debug.assert(completion == self.acceptCompletion());

self.conn_socket = result catch |err| {
log.err("accept error: {any}", .{err});
self.err = err;
return;
};
log.info("client connected", .{});

// set connection timestamp and timeout
self.last_active = std.time.Instant.now() catch |err| {
Expand Down Expand Up @@ -111,6 +117,11 @@ pub const Ctx = struct {
std.debug.assert(completion == self.conn_completion);

const size = result catch |err| {
if (err == error.Canceled) {
log.debug("read canceled", .{});
return;
}
log.err("read error: {any}", .{err});
self.err = err;
return;
};
Expand Down Expand Up @@ -169,6 +180,7 @@ pub const Ctx = struct {
std.debug.assert(completion == self.timeout_completion);

_ = result catch |err| {
log.err("timeout error: {any}", .{err});
self.err = err;
return;
};
Expand All @@ -185,21 +197,9 @@ pub const Ctx = struct {
};

if (now.since(self.last_active.?) > self.timeout) {
// closing
log.debug("conn timeout, closing...", .{});

// NOTE: we should cancel the current read
// but it seems that's just closing the connection is enough
// (and cancel does not work on MacOS)

// close current connection
self.loop.io.close(
*Ctx,
self,
Ctx.closeCbk,
self.timeout_completion,
self.conn_socket,
);
log.debug("conn timeout, closing...", .{});
self.cancelAndClose();
return;
}

Expand All @@ -213,36 +213,17 @@ pub const Ctx = struct {
);
}

fn closeCbk(self: *Ctx, completion: *Completion, result: CloseError!void) void {
_ = completion;
// NOTE: completion can be either self.conn_completion or self.timeout_completion
fn cancelCbk(self: *Ctx, completion: *Completion, result: CancelError!void) void {
std.debug.assert(completion == self.accept_completion);

_ = result catch |err| {
log.err("cancel error: {any}", .{err});
self.err = err;
return;
};
log.debug("cancel done", .{});

// conn is closed
self.last_active = null;

// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}

log.info("accepting new conn...", .{});

// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.conn_completion,
self.accept_socket,
);
self.close();
}

// shortcuts
Expand All @@ -263,6 +244,15 @@ pub const Ctx = struct {
return self.browser.session.env;
}

inline fn acceptCompletion(self: *Ctx) *Completion {
// NOTE: the logical completion to use here is the accept_completion
// as the pipe_connection can be used simulteanously by a recv I/O operation.
// But on MacOS (kqueue) the recv I/O operation on a closed socket leads to a panic
// so we use the pipe_connection to avoid this problem
if (isLinux) return self.accept_completion;
return self.conn_completion;
}

// actions
// -------

Expand All @@ -272,13 +262,7 @@ pub const Ctx = struct {
if (std.mem.eql(u8, cmd, "close")) {
// close connection
log.info("close cmd, closing conn...", .{});
self.loop.io.close(
*Ctx,
self,
Ctx.closeCbk,
self.conn_completion,
self.conn_socket,
);
self.cancelAndClose();
return error.Closed;
}

Expand All @@ -303,6 +287,47 @@ pub const Ctx = struct {
}
}

fn cancelAndClose(self: *Ctx) void {
if (isLinux) { // cancel is only available on Linux
self.loop.io.cancel(
*Ctx,
self,
Ctx.cancelCbk,
self.accept_completion,
self.conn_completion,
);
} else {
self.close();
}
}

fn close(self: *Ctx) void {
std.posix.close(self.conn_socket);

// conn is closed
log.debug("connection closed", .{});
self.last_active = null;

// restart a new browser session in case of re-connect
if (!self.sessionNew) {
self.newSession() catch |err| {
log.err("new session error: {any}", .{err});
return;
};
}

log.info("accepting new conn...", .{});

// continue accepting incoming requests
self.loop.io.accept(
*Ctx,
self,
Ctx.acceptCbk,
self.acceptCompletion(),
self.accept_socket,
);
}

fn newSession(self: *Ctx) !void {
try self.browser.newSession(self.alloc(), self.loop);
try self.browser.session.initInspector(
Expand Down Expand Up @@ -388,6 +413,7 @@ const Send = struct {

fn asyncCbk(self: *Send, _: *Completion, result: SendError!usize) void {
_ = result catch |err| {
log.err("send error: {any}", .{err});
self.ctx.err = err;
return;
};
Expand Down Expand Up @@ -425,6 +451,7 @@ pub fn listen(
defer msg_buf.deinit(loop.alloc);

// create I/O completions
var accept_completion: Completion = undefined;
var conn_completion: Completion = undefined;
var timeout_completion: Completion = undefined;

Expand All @@ -438,6 +465,7 @@ pub fn listen(
.msg_buf = &msg_buf,
.accept_socket = server_socket,
.timeout = timeout,
.accept_completion = &accept_completion,
.conn_completion = &conn_completion,
.timeout_completion = &timeout_completion,
};
Expand All @@ -449,7 +477,7 @@ pub fn listen(

// accepting connection asynchronously on internal server
log.info("accepting new conn...", .{});
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.conn_completion, ctx.accept_socket);
loop.io.accept(*Ctx, &ctx, Ctx.acceptCbk, ctx.acceptCompletion(), ctx.accept_socket);

// infinite loop on I/O events, either:
// - cmd from incoming connection on server socket
Expand Down