diff --git a/src/app.zig b/src/app.zig index 2501d8322..803c55f08 100644 --- a/src/app.zig +++ b/src/app.zig @@ -52,7 +52,8 @@ pub const App = struct { .telemetry = undefined, .app_dir_path = app_dir_path, .notification = notification, - .http_client = try HttpClient.init(allocator, 5, .{ + .http_client = try HttpClient.init(allocator, .{ + .max_concurrent = 3, .http_proxy = config.http_proxy, .tls_verify_host = config.tls_verify_host, }), diff --git a/src/browser/html/window.zig b/src/browser/html/window.zig index bb902ccf7..02381971c 100644 --- a/src/browser/html/window.zig +++ b/src/browser/html/window.zig @@ -163,50 +163,33 @@ pub const Window = struct { return &self.performance; } - // Tells the browser you wish to perform an animation. It requests the browser to call a user-supplied callback function before the next repaint. - // fn callback(timestamp: f64) - // Returns the request ID, that uniquely identifies the entry in the callback list. - pub fn _requestAnimationFrame( - self: *Window, - callback: Function, - ) !u32 { - // We immediately execute the callback, but this may not be correct TBD. - // Since: When multiple callbacks queued by requestAnimationFrame() begin to fire in a single frame, each receives the same timestamp even though time has passed during the computation of every previous callback's workload. - var result: Function.Result = undefined; - callback.tryCall(void, .{self.performance._now()}, &result) catch { - log.debug(.user_script, "callback error", .{ - .err = result.exception, - .stack = result.stack, - .source = "requestAnimationFrame", - }); - }; - return 99; // not unique, but user cannot make assumptions about it. cancelAnimationFrame will be too late anyway. + pub fn _requestAnimationFrame(self: *Window, cbk: Function, page: *Page) !u32 { + return self.createTimeout(cbk, 5, page, .{ .animation_frame = true }); } - // Cancels an animation frame request previously scheduled through requestAnimationFrame(). 
- // This is a no-op since _requestAnimationFrame immediately executes the callback. - pub fn _cancelAnimationFrame(_: *Window, request_id: u32) void { - _ = request_id; + pub fn _cancelAnimationFrame(self: *Window, id: u32, page: *Page) !void { + const kv = self.timers.fetchRemove(id) orelse return; + return page.loop.cancel(kv.value.loop_id); } // TODO handle callback arguments. pub fn _setTimeout(self: *Window, cbk: Function, delay: ?u32, page: *Page) !u32 { - return self.createTimeout(cbk, delay, page, false); + return self.createTimeout(cbk, delay, page, .{}); } // TODO handle callback arguments. pub fn _setInterval(self: *Window, cbk: Function, delay: ?u32, page: *Page) !u32 { - return self.createTimeout(cbk, delay, page, true); + return self.createTimeout(cbk, delay, page, .{ .repeat = true }); } pub fn _clearTimeout(self: *Window, id: u32, page: *Page) !void { const kv = self.timers.fetchRemove(id) orelse return; - try page.loop.cancel(kv.value.loop_id); + return page.loop.cancel(kv.value.loop_id); } pub fn _clearInterval(self: *Window, id: u32, page: *Page) !void { const kv = self.timers.fetchRemove(id) orelse return; - try page.loop.cancel(kv.value.loop_id); + return page.loop.cancel(kv.value.loop_id); } pub fn _matchMedia(_: *const Window, media: []const u8, page: *Page) !MediaQueryList { @@ -216,10 +199,14 @@ pub const Window = struct { }; } - fn createTimeout(self: *Window, cbk: Function, delay_: ?u32, page: *Page, comptime repeat: bool) !u32 { + const CreateTimeoutOpts = struct { + repeat: bool = false, + animation_frame: bool = false, + }; + fn createTimeout(self: *Window, cbk: Function, delay_: ?u32, page: *Page, comptime opts: CreateTimeoutOpts) !u32 { const delay = delay_ orelse 0; if (delay > 5000) { - log.warn(.user_script, "long timeout ignored", .{ .delay = delay, .interval = repeat }); + log.warn(.user_script, "long timeout ignored", .{ .delay = delay, .interval = opts.repeat }); // self.timer_id is u30, so the largest value we can generate 
is // 1_073_741_824. Returning 2_000_000_000 makes sure that clients // can call cancelTimer/cancelInterval without breaking anything. @@ -250,7 +237,8 @@ pub const Window = struct { .window = self, .timer_id = timer_id, .node = .{ .func = TimerCallback.run }, - .repeat = if (repeat) delay_ms else null, + .repeat = if (opts.repeat) delay_ms else null, + .animation_frame = opts.animation_frame, }; callback.loop_id = try page.loop.timeout(delay_ms, &callback.node); @@ -300,13 +288,23 @@ const TimerCallback = struct { // if the event should be repeated repeat: ?u63 = null, + animation_frame: bool = false, + window: *Window, fn run(node: *Loop.CallbackNode, repeat_delay: *?u63) void { const self: *TimerCallback = @fieldParentPtr("node", node); var result: Function.Result = undefined; - self.cbk.tryCall(void, .{}, &result) catch { + + var call: anyerror!void = undefined; + if (self.animation_frame) { + call = self.cbk.tryCall(void, .{self.window.performance._now()}, &result); + } else { + call = self.cbk.tryCall(void, .{}, &result); + } + + call catch { log.debug(.user_script, "callback error", .{ .err = result.exception, .stack = result.stack, diff --git a/src/browser/page.zig b/src/browser/page.zig index 83f1136b4..7044a4d9b 100644 --- a/src/browser/page.zig +++ b/src/browser/page.zig @@ -113,7 +113,9 @@ pub const Page = struct { .cookie_jar = &session.cookie_jar, .microtask_node = .{ .func = microtaskCallback }, .window_clicked_event_node = .{ .func = windowClicked }, - .request_factory = browser.http_client.requestFactory(browser.notification), + .request_factory = browser.http_client.requestFactory(.{ + .notification = browser.notification, + }), .scope = undefined, .module_map = .empty, }; @@ -205,58 +207,63 @@ pub const Page = struct { // redirect) self.url = request_url; - // load the data - var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true }); - defer request.deinit(); - request.body = opts.body; - request.notification = 
notification; + { + // block exists to limit the lifetime of the request, which holds + // onto a connection + var request = try self.newHTTPRequest(opts.method, &self.url, .{ .navigation = true }); + defer request.deinit(); - notification.dispatch(.page_navigate, &.{ - .opts = opts, - .url = &self.url, - .timestamp = timestamp(), - }); + request.body = opts.body; + request.notification = notification; - var response = try request.sendSync(.{}); + notification.dispatch(.page_navigate, &.{ + .opts = opts, + .url = &self.url, + .timestamp = timestamp(), + }); - // would be different than self.url in the case of a redirect - self.url = try URL.fromURI(arena, request.request_uri); + var response = try request.sendSync(.{}); - const header = response.header; - try session.cookie_jar.populateFromResponse(&self.url.uri, &header); + // would be different than self.url in the case of a redirect + self.url = try URL.fromURI(arena, request.request_uri); - // TODO handle fragment in url. - try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) }); + const header = response.header; + try session.cookie_jar.populateFromResponse(&self.url.uri, &header); - const content_type = header.get("content-type"); + // TODO handle fragment in url. 
+ try self.window.replaceLocation(.{ .url = try self.url.toWebApi(arena) }); - const mime: Mime = blk: { - if (content_type) |ct| { - break :blk try Mime.parse(arena, ct); - } - break :blk Mime.sniff(try response.peek()); - } orelse .unknown; + const content_type = header.get("content-type"); - log.info(.http, "navigation", .{ - .status = header.status, - .content_type = content_type, - .charset = mime.charset, - .url = request_url, - }); + const mime: Mime = blk: { + if (content_type) |ct| { + break :blk try Mime.parse(arena, ct); + } + break :blk Mime.sniff(try response.peek()); + } orelse .unknown; + + log.info(.http, "navigation", .{ + .status = header.status, + .content_type = content_type, + .charset = mime.charset, + .url = request_url, + }); + + if (!mime.isHTML()) { + var arr: std.ArrayListUnmanaged(u8) = .{}; + while (try response.next()) |data| { + try arr.appendSlice(arena, try arena.dupe(u8, data)); + } + // save the body into the page. + self.raw_data = arr.items; + return; + } - if (mime.isHTML()) { - self.raw_data = null; try self.loadHTMLDoc(&response, mime.charset orelse "utf-8"); - try self.processHTMLDoc(); - } else { - var arr: std.ArrayListUnmanaged(u8) = .{}; - while (try response.next()) |data| { - try arr.appendSlice(arena, try arena.dupe(u8, data)); - } - // save the body into the page. 
- self.raw_data = arr.items; } + try self.processHTMLDoc(); + notification.dispatch(.page_navigated, &.{ .url = &self.url, .timestamp = timestamp(), diff --git a/src/browser/session.zig b/src/browser/session.zig index 44fce9d3d..5d550a5ec 100644 --- a/src/browser/session.zig +++ b/src/browser/session.zig @@ -72,7 +72,7 @@ pub const Session = struct { pub fn deinit(self: *Session) void { if (self.page != null) { - self.removePage(); + self.removePage() catch {}; } self.cookie_jar.deinit(); self.storage_shed.deinit(); @@ -104,14 +104,35 @@ pub const Session = struct { return page; } - pub fn removePage(self: *Session) void { + pub fn removePage(self: *Session) !void { // Inform CDP the page is going to be removed, allowing other worlds to remove themselves before the main one self.browser.notification.dispatch(.page_remove, .{}); std.debug.assert(self.page != null); - // Reset all existing callbacks. - self.browser.app.loop.reset(); + + // Cleanup is a bit sensitive. We could still have inflight I/O. For + // example, we could have an XHR request which is still in the connect + // phase. It's important that we clean these up, as they're holding onto + // limited resources (like our fixed-sized http state pool). + // + // First thing we do, is endScope() which will execute the destructor + // of any type that registered a destructor (e.g. XMLHttpRequest). + // This will shutdown any pending sockets, which begins our cleaning + // processed self.executor.endScope(); + + // Second thing we do is reset the loop. This increments the loop ctx_id + // so that any "stale" timeouts we process will get ignored. We need to + // do this BEFORE running the loop because, at this point, things like + // window.setTimeout and running microtasks should be ignored + self.browser.app.loop.reset(); + + // Finally, we run the loop. Because of the reset just above, this will + // ignore any timeouts. 
And, because of the endScope about this, it + // should ensure that the http requests detect the shutdown socket and + // release their resources. + try self.browser.app.loop.run(); + self.page = null; // clear netsurf memory arena. @@ -143,7 +164,7 @@ pub const Session = struct { // the final URL, possibly following redirects) const url = try self.page.?.url.resolve(self.transfer_arena, url_string); - self.removePage(); + try self.removePage(); var page = try self.createPage(); return page.navigate(url, opts); } diff --git a/src/browser/xhr/xhr.zig b/src/browser/xhr/xhr.zig index d65f9db83..7c1087a45 100644 --- a/src/browser/xhr/xhr.zig +++ b/src/browser/xhr/xhr.zig @@ -30,6 +30,7 @@ const Mime = @import("../mime.zig").Mime; const parser = @import("../netsurf.zig"); const http = @import("../../http/client.zig"); const Page = @import("../page.zig").Page; +const Loop = @import("../../runtime/loop.zig").Loop; const CookieJar = @import("../storage/storage.zig").CookieJar; // XHR interfaces @@ -78,6 +79,7 @@ const XMLHttpRequestBodyInit = union(enum) { pub const XMLHttpRequest = struct { proto: XMLHttpRequestEventTarget = XMLHttpRequestEventTarget{}, + loop: *Loop, arena: Allocator, request: ?*http.Request = null, @@ -91,6 +93,7 @@ pub const XMLHttpRequest = struct { sync: bool = true, err: ?anyerror = null, last_dispatch: i64 = 0, + request_body: ?[]const u8 = null, cookie_jar: *CookieJar, // the URI of the page where this request is originating from @@ -241,12 +244,13 @@ pub const XMLHttpRequest = struct { pub fn constructor(page: *Page) !XMLHttpRequest { const arena = page.arena; return .{ + .url = null, .arena = arena, + .loop = page.loop, .headers = Headers.init(arena), .response_headers = Headers.init(arena), .method = undefined, .state = .unsent, - .url = null, .origin_url = &page.url, .cookie_jar = page.cookie_jar, }; @@ -422,10 +426,23 @@ pub const XMLHttpRequest = struct { log.debug(.http, "request", .{ .method = self.method, .url = self.url, .source = "xhr" 
}); self.send_flag = true; + if (body) |b| { + self.request_body = try self.arena.dupe(u8, b); + } - self.request = try page.request_factory.create(self.method, &self.url.?.uri); - var request = self.request.?; - errdefer request.deinit(); + try page.request_factory.initAsync( + page.arena, + self.method, + &self.url.?.uri, + self, + onHttpRequestReady, + self.loop, + ); + } + + fn onHttpRequestReady(ctx: *anyopaque, request: *http.Request) !void { + // on error, our caller will cleanup request + const self: *XMLHttpRequest = @alignCast(@ptrCast(ctx)); for (self.headers.list.items) |hdr| { try request.addHeader(hdr.name, hdr.value, .{}); @@ -433,7 +450,7 @@ pub const XMLHttpRequest = struct { { var arr: std.ArrayListUnmanaged(u8) = .{}; - try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(page.arena), .{ + try self.cookie_jar.forRequest(&self.url.?.uri, arr.writer(self.arena), .{ .navigation = false, .origin_uri = &self.origin_url.uri, }); @@ -447,14 +464,15 @@ pub const XMLHttpRequest = struct { // if the request method is GET or HEAD. 
// https://xhr.spec.whatwg.org/#the-send()-method // var used_body: ?XMLHttpRequestBodyInit = null; - if (body) |b| { + if (self.request_body) |b| { if (self.method != .GET and self.method != .HEAD) { - request.body = try page.arena.dupe(u8, b); + request.body = b; try request.addHeader("Content-Type", "text/plain; charset=UTF-8", .{}); } } - try request.sendAsync(page.loop, self, .{}); + try request.sendAsync(self.loop, self, .{}); + self.request = request; } pub fn onHttpResponse(self: *XMLHttpRequest, progress_: anyerror!http.Progress) !void { @@ -522,7 +540,7 @@ pub const XMLHttpRequest = struct { log.info(.http, "request complete", .{ .source = "xhr", .url = self.url, - .status = progress.header.status, + .status = self.response_status, }); // Not that the request is done, the http/client will free the request diff --git a/src/cdp/domains/input.zig b/src/cdp/domains/input.zig index b6897dc24..170098ff8 100644 --- a/src/cdp/domains/input.zig +++ b/src/cdp/domains/input.zig @@ -90,7 +90,7 @@ fn clickNavigate(cmd: anytype, uri: std.Uri) !void { .disposition = "currentTab", }, .{ .session_id = bc.session_id.? 
}); - bc.session.removePage(); + try bc.session.removePage(); _ = try bc.session.createPage(null); try @import("page.zig").navigateToUrl(cmd, url, false); diff --git a/src/cdp/domains/target.zig b/src/cdp/domains/target.zig index f6c197ce6..77b46d0a0 100644 --- a/src/cdp/domains/target.zig +++ b/src/cdp/domains/target.zig @@ -220,7 +220,7 @@ fn closeTarget(cmd: anytype) !void { bc.session_id = null; } - bc.session.removePage(); + try bc.session.removePage(); if (bc.isolated_world) |*world| { world.deinit(); bc.isolated_world = null; diff --git a/src/http/client.zig b/src/http/client.zig index 81b11cd72..83536c4ec 100644 --- a/src/http/client.zig +++ b/src/http/client.zig @@ -54,16 +54,17 @@ pub const Client = struct { request_pool: std.heap.MemoryPool(Request), const Opts = struct { - tls_verify_host: bool = true, + max_concurrent: usize = 3, http_proxy: ?std.Uri = null, + tls_verify_host: bool = true, max_idle_connection: usize = 10, }; - pub fn init(allocator: Allocator, max_concurrent: usize, opts: Opts) !Client { + pub fn init(allocator: Allocator, opts: Opts) !Client { var root_ca: tls.config.CertBundle = if (builtin.is_test) .{} else try tls.config.CertBundle.fromSystem(allocator); errdefer root_ca.deinit(allocator); - const state_pool = try StatePool.init(allocator, max_concurrent); + const state_pool = try StatePool.init(allocator, opts.max_concurrent); errdefer state_pool.deinit(allocator); const connection_manager = ConnectionManager.init(allocator, opts.max_idle_connection); @@ -92,12 +93,61 @@ pub const Client = struct { } pub fn request(self: *Client, method: Request.Method, uri: *const Uri) !*Request { - const state = self.state_pool.acquire(); + const state = self.state_pool.acquireWait(); + errdefer self.state_pool.release(state); - errdefer { - state.reset(); - self.state_pool.release(state); - } + const req = try self.request_pool.create(); + errdefer self.request_pool.destroy(req); + + req.* = try Request.init(self, state, method, uri); + return 
req; + } + + pub fn initAsync( + self: *Client, + arena: Allocator, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + loop: *Loop, + opts: RequestOpts, + ) !void { + if (self.state_pool.acquireOrNull()) |state| { + // if we have state ready, we can skip the loop and immediately + // kick this request off. + return self.asyncRequestReady(method, uri, ctx, callback, state, opts); + } + + // This cannot be a client-owned MemoryPool. The page can end before + // this is ever completed (and the check callback will never be called). + // As long as the loop doesn't guarantee that callbacks will be called, + // this _has_ to be the page arena. + const queue = try arena.create(AsyncQueue); + queue.* = .{ + .ctx = ctx, + .uri = uri, + .opts = opts, + .client = self, + .method = method, + .callback = callback, + .node = .{ .func = AsyncQueue.check }, + }; + _ = try loop.timeout(10 * std.time.ns_per_ms, &queue.node); + } + + // Either called directly from initAsync (if we have a state ready) + // Or from when the AsyncQueue(T) is ready. + fn asyncRequestReady( + self: *Client, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + state: *State, + opts: RequestOpts, + ) !void { + errdefer self.state_pool.release(state); // We need the request on the heap, because it can have a longer lifetime // than the code making the request. 
That sounds odd, but consider the @@ -110,26 +160,78 @@ pub const Client = struct { errdefer self.request_pool.destroy(req); req.* = try Request.init(self, state, method, uri); - return req; + if (opts.notification) |notification| { + req.notification = notification; + } + + errdefer req.deinit(); + try callback(ctx, req); } - pub fn requestFactory(self: *Client, notification: ?*Notification) RequestFactory { + pub fn requestFactory(self: *Client, opts: RequestOpts) RequestFactory { return .{ + .opts = opts, .client = self, - .notification = notification, }; } }; +const RequestOpts = struct { + notification: ?*Notification = null, +}; + // A factory for creating requests with a given set of options. pub const RequestFactory = struct { client: *Client, - notification: ?*Notification, + opts: RequestOpts, - pub fn create(self: RequestFactory, method: Request.Method, uri: *const Uri) !*Request { - var req = try self.client.request(method, uri); - req.notification = self.notification; - return req; + pub fn initAsync( + self: RequestFactory, + arena: Allocator, + method: Request.Method, + uri: *const Uri, + ctx: *anyopaque, + callback: AsyncQueue.Callback, + loop: *Loop, + ) !void { + return self.client.initAsync(arena, method, uri, ctx, callback, loop, self.opts); + } +}; + +const AsyncQueue = struct { + ctx: *anyopaque, + method: Request.Method, + uri: *const Uri, + client: *Client, + opts: RequestOpts, + node: Loop.CallbackNode, + callback: Callback, + + const Callback = *const fn (*anyopaque, *Request) anyerror!void; + + fn check(node: *Loop.CallbackNode, repeat_delay: *?u63) void { + const self: *AsyncQueue = @fieldParentPtr("node", node); + self._check(repeat_delay) catch |err| { + log.err(.http_client, "async queue check", .{ .err = err }); + }; + } + + fn _check(self: *AsyncQueue, repeat_delay: *?u63) !void { + const client = self.client; + const state = client.state_pool.acquireOrNull() orelse { + // re-run this function in 10 milliseconds + repeat_delay.* = 
10 * std.time.ns_per_ms; + return; + }; + + try client.asyncRequestReady( + self.method, + self.uri, + self.ctx, + self.callback, + state, + self.opts, + ); } }; @@ -321,7 +423,6 @@ pub const Request = struct { pub fn deinit(self: *Request) void { self.releaseConnection(); - _ = self._state.reset(); self._client.state_pool.release(self._state); self._client.request_pool.destroy(self); } @@ -576,6 +677,7 @@ pub const Request = struct { if (self._connection_from_keepalive) { // we're already connected + async_handler.pending_connect = false; return async_handler.conn.connected(); } @@ -814,6 +916,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { shutdown: bool = false, pending_write: bool = false, pending_receive: bool = false, + pending_connect: bool = true, const Self = @This(); const SendQueue = std.DoublyLinkedList([]const u8); @@ -828,6 +931,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { wait, done, need_more, + handler_error, }; fn deinit(self: *Self) void { @@ -837,10 +941,15 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { fn abort(ctx: *anyopaque) void { var self: *Self = @alignCast(@ptrCast(ctx)); self.shutdown = true; + posix.shutdown(self.request._connection.?.socket, .both) catch {}; self.maybeShutdown(); } fn connected(self: *Self, _: *IO.Completion, result: IO.ConnectError!void) void { + self.pending_connect = false; + if (self.shutdown) { + return self.maybeShutdown(); + } result catch |err| return self.handleError("Connection failed", err); self.conn.connected() catch |err| { self.handleError("connected handler error", err); @@ -966,12 +1075,35 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { switch (status) { .wait => {}, .need_more => self.receive(), + .handler_error => { + // handler should never have been called if we're redirecting + std.debug.assert(self.redirect == null); + self.request.requestCompleted(self.reader.response); + self.deinit(); + return; + }, .done => { const redirect = 
self.redirect orelse { + var handler = self.handler; self.request.requestCompleted(self.reader.response); self.deinit(); + + // Emit the done chunk. We expect the caller to do + // processing once the full request is completed. By + // emiting this AFTER we've relreased the connection, + // we free the connection and its state for re-use. + // If we don't do this this way, we can end up with + // _a lot_ of pending request/states. + // DO NOT USE `self` here, it's no longer valid. + handler.onHttpResponse(.{ + .data = null, + .done = true, + .first = false, + .header = .{}, + }) catch {}; return; }; + self.request.redirectAsync(redirect, self.loop, self.handler) catch |err| { self.handleError("Setup async redirect", err); return; @@ -984,7 +1116,7 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { fn maybeShutdown(self: *Self) void { std.debug.assert(self.shutdown); - if (self.pending_write or self.pending_receive) { + if (self.pending_write or self.pending_receive or self.pending_connect) { return; } @@ -1113,25 +1245,26 @@ fn AsyncHandler(comptime H: type, comptime L: type) type { self.handleError("decompression error", err); return .done; }; + self.handler.onHttpResponse(.{ .data = chunk, .first = first, - .done = next == null, + .done = false, .header = reader.response, - }) catch return .done; + }) catch return .handler_error; first = false; } } - } else if (result.data != null or done or would_be_first) { + } else if (result.data != null or would_be_first) { // If we have data. Or if the request is done. Or if this is the // first time we have a complete header. Emit the chunk. 
self.handler.onHttpResponse(.{ - .done = done, + .done = false, .data = result.data, .first = would_be_first, .header = reader.response, - }) catch return .done; + }) catch return .handler_error; } if (done == true) { @@ -2322,7 +2455,7 @@ const State = struct { } fn reset(self: *State) void { - _ = self.arena.reset(.{ .retain_with_limit = 1024 * 1024 }); + _ = self.arena.reset(.{ .retain_with_limit = 64 * 1024 }); } fn deinit(self: *State) void { @@ -2375,10 +2508,11 @@ const StatePool = struct { allocator.free(self.states); } - pub fn acquire(self: *StatePool) *State { + pub fn acquireWait(self: *StatePool) *State { + const states = self.states; + self.mutex.lock(); while (true) { - const states = self.states; const available = self.available; if (available == 0) { self.cond.wait(&self.mutex); @@ -2392,13 +2526,33 @@ const StatePool = struct { } } - pub fn release(self: *StatePool, state: *State) void { + pub fn acquireOrNull(self: *StatePool) ?*State { + const states = self.states; + self.mutex.lock(); + defer self.mutex.unlock(); + + const available = self.available; + if (available == 0) { + return null; + } + + const index = available - 1; + const state = states[index]; + self.available = index; + return state; + } + + pub fn release(self: *StatePool, state: *State) void { + state.reset(); var states = self.states; + + self.mutex.lock(); const available = self.available; states[available] = state; self.available = available + 1; self.mutex.unlock(); + self.cond.signal(); } }; @@ -2799,11 +2953,19 @@ test "HttpClient: sync GET redirect" { } test "HttpClient: async connect error" { + defer testing.reset(); var loop = try Loop.init(testing.allocator); defer loop.deinit(); const Handler = struct { + loop: *Loop, reset: *Thread.ResetEvent, + + fn requestReady(ctx: *anyopaque, req: *Request) !void { + const self: *@This() = @alignCast(@ptrCast(ctx)); + try req.sendAsync(self.loop, self, .{}); + } + fn onHttpResponse(self: *@This(), res: anyerror!Progress) !void { _ 
= res catch |err| { if (err == error.ConnectionRefused) { @@ -2821,14 +2983,29 @@ test "HttpClient: async connect error" { var client = try testClient(); defer client.deinit(); + var handler = Handler{ + .loop = &loop, + .reset = &reset, + }; + const uri = try Uri.parse("HTTP://127.0.0.1:9920"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&loop, Handler{ .reset = &reset }, .{}); + try client.initAsync( + testing.arena_allocator, + .GET, + &uri, + &handler, + Handler.requestReady, + &loop, + .{}, + ); + try loop.io.run_for_ns(std.time.ns_per_ms); try reset.timedWait(std.time.ns_per_s); } test "HttpClient: async no body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2836,8 +3013,7 @@ test "HttpClient: async no body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/simple"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2847,6 +3023,8 @@ test "HttpClient: async no body" { } test "HttpClient: async with body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2854,8 +3032,7 @@ test "HttpClient: async with body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/echo"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2870,6 +3047,8 @@ test "HttpClient: async with body" { } test "HttpClient: async with gzip body" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2877,8 +3056,7 @@ test "HttpClient: async with gzip 
body" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/gzip"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2892,6 +3070,8 @@ test "HttpClient: async with gzip body" { } test "HttpClient: async redirect" { + defer testing.reset(); + var client = try testClient(); defer client.deinit(); @@ -2899,8 +3079,7 @@ test "HttpClient: async redirect" { defer handler.deinit(); const uri = try Uri.parse("HTTP://127.0.0.1:9582/http_client/redirect"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{}); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); // Called twice on purpose. The initial GET resutls in the # of pending // events to reach 0. This causes our `run_for_ns` to return. 
But we then @@ -2921,6 +3100,7 @@ test "HttpClient: async redirect" { } test "HttpClient: async tls no body" { + defer testing.reset(); var client = try testClient(); defer client.deinit(); for (0..5) |_| { @@ -2928,8 +3108,7 @@ test "HttpClient: async tls no body" { defer handler.deinit(); const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/simple"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2945,6 +3124,7 @@ test "HttpClient: async tls no body" { } test "HttpClient: async tls with body x" { + defer testing.reset(); for (0..5) |_| { var client = try testClient(); defer client.deinit(); @@ -2953,8 +3133,7 @@ test "HttpClient: async tls with body x" { defer handler.deinit(); const uri = try Uri.parse("HTTPs://127.0.0.1:9581/http_client/body"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -2969,6 +3148,7 @@ test "HttpClient: async tls with body x" { } test "HttpClient: async redirect from TLS to Plaintext" { + defer testing.reset(); for (0..1) |_| { var client = try testClient(); defer client.deinit(); @@ -2977,8 +3157,7 @@ test "HttpClient: async redirect from TLS to Plaintext" { defer handler.deinit(); const uri = try Uri.parse("https://127.0.0.1:9581/http_client/redirect/insecure"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try 
handler.waitUntilDone(); const res = handler.response; @@ -2994,6 +3173,7 @@ test "HttpClient: async redirect from TLS to Plaintext" { } test "HttpClient: async redirect plaintext to TLS" { + defer testing.reset(); for (0..5) |_| { var client = try testClient(); defer client.deinit(); @@ -3002,8 +3182,7 @@ test "HttpClient: async redirect plaintext to TLS" { defer handler.deinit(); const uri = try Uri.parse("http://127.0.0.1:9582/http_client/redirect/secure"); - var req = try client.request(.GET, &uri); - try req.sendAsync(&handler.loop, &handler, .{ .tls_verify_host = false }); + try client.initAsync(testing.arena_allocator, .GET, &uri, &handler, CaptureHandler.requestReady, &handler.loop, .{}); try handler.waitUntilDone(); const res = handler.response; @@ -3125,6 +3304,11 @@ const CaptureHandler = struct { self.loop.deinit(); } + fn requestReady(ctx: *anyopaque, req: *Request) !void { + const self: *CaptureHandler = @alignCast(@ptrCast(ctx)); + try req.sendAsync(&self.loop, self, .{ .tls_verify_host = false }); + } + fn onHttpResponse(self: *CaptureHandler, progress_: anyerror!Progress) !void { self.process(progress_) catch |err| { std.debug.print("capture handler error: {}\n", .{err}); @@ -3135,7 +3319,8 @@ const CaptureHandler = struct { const progress = try progress_; const allocator = self.response.arena.allocator(); try self.response.body.appendSlice(allocator, progress.data orelse ""); - if (progress.done) { + if (progress.first) { + std.debug.assert(!progress.done); self.response.status = progress.header.status; try self.response.headers.ensureTotalCapacity(allocator, progress.header.headers.items.len); for (progress.header.headers.items) |header| { @@ -3144,6 +3329,9 @@ const CaptureHandler = struct { .value = try allocator.dupe(u8, header.value), }); } + } + + if (progress.done) { self.reset.set(); } } @@ -3202,5 +3390,5 @@ fn testReader(state: *State, res: *TestResponse, data: []const u8) !void { } fn testClient() !Client { - return try 
Client.init(testing.allocator, 1, .{}); + return try Client.init(testing.allocator, .{ .max_concurrent = 1 }); } diff --git a/src/runtime/loop.zig b/src/runtime/loop.zig index 0a954e189..a1af200ef 100644 --- a/src/runtime/loop.zig +++ b/src/runtime/loop.zig @@ -34,9 +34,11 @@ pub const Loop = struct { alloc: std.mem.Allocator, // TODO: unmanaged version ? io: IO, - // Used to track how many callbacks are to be called and wait until all - // event are finished. - events_nb: usize, + // number of pending network events we have + pending_network_count: usize, + + // number of pending timeout events we have + pending_timeout_count: usize, // Used to stop repeating timeouts when loop.run is called. stopping: bool, @@ -66,8 +68,9 @@ pub const Loop = struct { .alloc = alloc, .cancelled = .{}, .io = try IO.init(32, 0), - .events_nb = 0, .stopping = false, + .pending_network_count = 0, + .pending_timeout_count = 0, .timeout_pool = MemoryPool(ContextTimeout).init(alloc), .event_callback_pool = MemoryPool(EventCallbackContext).init(alloc), }; @@ -78,7 +81,7 @@ pub const Loop = struct { // run tail events. We do run the tail events to ensure all the // contexts are correcly free. - while (self.eventsNb() > 0) { + while (self.hasPendinEvents()) { self.io.run_for_ns(10 * std.time.ns_per_ms) catch |err| { log.err(.loop, "deinit", .{ .err = err }); break; @@ -93,6 +96,21 @@ pub const Loop = struct { self.cancelled.deinit(self.alloc); } + // We can shutdown once all the pending network IO is complete. + // In debug mode we also wait until all the pending timeouts are complete + // but we only do this so that the `timeoutCallback` can free all allocated + // memory and we won't report a leak.
+ fn hasPendinEvents(self: *const Self) bool { + if (self.pending_network_count > 0) { + return true; + } + + if (builtin.mode != .Debug) { + return false; + } + return self.pending_timeout_count > 0; + } + // Retrieve all registred I/O events completed by OS kernel, // and execute sequentially their callbacks. // Stops when there is no more I/O events registered on the loop. @@ -103,25 +121,12 @@ pub const Loop = struct { self.stopping = true; defer self.stopping = false; - while (self.eventsNb() > 0) { + while (self.pending_network_count > 0) { try self.io.run_for_ns(10 * std.time.ns_per_ms); // at each iteration we might have new events registred by previous callbacks } } - // Register events atomically - // - add 1 event and return previous value - fn addEvent(self: *Self) void { - _ = @atomicRmw(usize, &self.events_nb, .Add, 1, .acq_rel); - } - // - remove 1 event and return previous value - fn removeEvent(self: *Self) void { - _ = @atomicRmw(usize, &self.events_nb, .Sub, 1, .acq_rel); - } - // - get the number of current events - fn eventsNb(self: *Self) usize { - return @atomicLoad(usize, &self.events_nb, .seq_cst); - } // JS callbacks APIs // ----------------- @@ -152,7 +157,7 @@ pub const Loop = struct { const loop = ctx.loop; if (ctx.initial) { - loop.removeEvent(); + loop.pending_timeout_count -= 1; } defer { @@ -207,7 +212,7 @@ pub const Loop = struct { .callback_node = callback_node, }; - self.addEvent(); + self.pending_timeout_count += 1; self.scheduleTimeout(nanoseconds, ctx, completion); return @intFromPtr(completion); } @@ -244,17 +249,18 @@ pub const Loop = struct { ) !void { const onConnect = struct { fn onConnect(callback: *EventCallbackContext, completion_: *Completion, res: ConnectError!void) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onConnect; + const callback = try 
self.event_callback_pool.create(); errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - self.addEvent(); + self.pending_network_count += 1; self.io.connect(*EventCallbackContext, callback, onConnect, completion, socket, address); } @@ -271,8 +277,8 @@ pub const Loop = struct { ) !void { const onSend = struct { fn onSend(callback: *EventCallbackContext, completion_: *Completion, res: SendError!usize) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onSend; @@ -281,7 +287,7 @@ pub const Loop = struct { errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - self.addEvent(); + self.pending_network_count += 1; self.io.send(*EventCallbackContext, callback, onSend, completion, socket, buf); } @@ -298,8 +304,8 @@ pub const Loop = struct { ) !void { const onRecv = struct { fn onRecv(callback: *EventCallbackContext, completion_: *Completion, res: RecvError!usize) void { + callback.loop.pending_network_count -= 1; defer callback.loop.event_callback_pool.destroy(callback); - callback.loop.removeEvent(); cbk(@alignCast(@ptrCast(callback.ctx)), completion_, res); } }.onRecv; @@ -307,8 +313,7 @@ pub const Loop = struct { const callback = try self.event_callback_pool.create(); errdefer self.event_callback_pool.destroy(callback); callback.* = .{ .loop = self, .ctx = ctx }; - - self.addEvent(); + self.pending_network_count += 1; self.io.recv(*EventCallbackContext, callback, onRecv, completion, socket, buf); } };