Outputs without inputs #19

Open
rhoot opened this issue Oct 13, 2023 · 5 comments

Comments

@rhoot

rhoot commented Oct 13, 2023

I was hoping to use this as a simple way to handle some simple communication, but the problem is I need to be able to generate outputs without just responding to input (but still accept inputs). Unless I've missed something, that seems to not be possible with the current API?

It would be nice if I could somehow either hook into the Conn's readLoop, or write my own loop that periodically calls into the Conn to receive incoming data.

@karlseguin
Owner

We're talking about the client API? You can use client.write outside of the readLoop. So you call client.write whenever you have to (outputs), and you answer messages in your handle (inputs).

@rhoot
Author

rhoot commented Oct 13, 2023

No, I've so far only looked at the server.

@rhoot
Author

rhoot commented Oct 13, 2023

To give a better idea, the specific problem I'm trying to solve is one where an input can take a long time to produce a full response, but the server should progressively return outputs as they become available. Ideally, when there's no remaining output, it can go back to blocking reads.

Essentially I'd want something like this in pseudo-code:

while (true) {
    const use_blocking_read = !hasRemainingWork();
    enqueueInputs(use_blocking_read);
    if (hasRemainingWork()) {
        processOutput();
    }
}

But I could also imagine wanting an entirely pub/sub sort of service that parses the URL to determine what output you're interested in and then only produces output with no input. It would be useful for live metrics.

@karlseguin
Owner

karlseguin commented Oct 13, 2023

One option would be to let the app handler optionally define its own readLoop and use that if it's defined. But there are currently details in readLoop that most implementations probably wouldn't care to replicate (e.g. properly handling a close message).

The handle callback does block the readLoop, so you can hold up the loop by not returning from handle until the response is done:

pub fn handle(self: *Handler, message: Message) !void {
    // readLoop reads nothing further until this call returns
    buildSlowResponse(message);
}

Could get fancier and use a condition variable to stop handle from returning until the full response is sent (maybe from another thread, if that's how it's working).
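The condition-variable idea could be sketched roughly as follows. This is a minimal illustration, not the library's API: `Handler`, its fields, `produce`, and the `total_chunks` argument (a stand-in for the real `Message`) are all hypothetical names, and the worker only counts chunks where a real handler would write to the connection.

```zig
const std = @import("std");

// Hypothetical handler: `handle` does not return until a worker thread
// reports that the whole response has been produced.
const Handler = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    done: bool = false,
    chunks_sent: usize = 0,

    // Stand-in for the library's handle callback (the real one takes a Message).
    pub fn handle(self: *Handler, total_chunks: usize) !void {
        self.mutex.lock();
        self.done = false;
        self.chunks_sent = 0;
        self.mutex.unlock();

        const worker = try std.Thread.spawn(.{}, produce, .{ self, total_chunks });
        defer worker.join();

        // Wait until the worker flags completion. The loop guards against
        // spurious wakeups and against the worker finishing before we wait.
        self.mutex.lock();
        defer self.mutex.unlock();
        while (!self.done) {
            self.cond.wait(&self.mutex);
        }
    }

    fn produce(self: *Handler, total_chunks: usize) void {
        var i: usize = 0;
        while (i < total_chunks) : (i += 1) {
            // A real handler would write one chunk of output here.
            self.mutex.lock();
            self.chunks_sent += 1;
            self.mutex.unlock();
        }
        self.mutex.lock();
        self.done = true;
        self.mutex.unlock();
        self.cond.signal();
    }
};

pub fn main() !void {
    var handler = Handler{};
    try handler.handle(3);
    std.debug.print("chunks sent: {d}\n", .{handler.chunks_sent});
}
```

Since handle blocks the readLoop, no further incoming message is read while `handle` is waiting on the condition.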

@rhoot
Author

rhoot commented Oct 14, 2023

That would also prevent incoming messages though, including messages to abort the previous one.

I was able to get things working without re-implementing the message handling entirely by doing this (you can ignore the whitespace differences; I ran diff with -w to reduce the diff size).

diff --git a/src/server.zig b/src/server.zig
index 28c5231..dce90ba 100644
--- a/src/server.zig
+++ b/src/server.zig
@@ -130,7 +130,12 @@ fn clientLoop(comptime H: type, context: anytype, net_conn: NetConn, config: *co
 		return;
 	};
 	defer reader.deinit();
-	conn.readLoop(H, handler, &reader) catch {};
+
+	if (comptime std.meta.trait.hasFn("readLoop")(H)) {
+		handler.readLoop(&conn, &reader) catch {};
+	} else {
+		conn.readLoop(H, &handler, &reader) catch {};
+	}
 }
 
 const EMPTY_PONG = ([2]u8{ @intFromEnum(OpCode.pong), 0 })[0..];
@@ -216,30 +221,42 @@ pub const Conn = struct {
 		self.closed = true;
 	}
 
-	fn readLoop(self: *Conn, comptime H: type, handler: H, reader: *Reader) !void {
+	pub fn handleNextMessage(self: *Conn, comptime H: type, handler: *H, reader: *Reader, us_timeout: i32) !void {
 		var h = handler;
 		const stream = self.stream;
 		const handle_ping = self._handle_ping;
 		const handle_pong = self._handle_pong;
 		const handle_close = self._handle_close;
 
-		while (true) {
+		if (us_timeout >= 0) {
+			var fds = [1]os.pollfd{.{ .fd = stream.handle, .events = os.POLL.IN, .revents = undefined }};
+			if (try os.poll(&fds, us_timeout) == 0) {
+				return;
+			}
+
+			if (fds[0].revents & (os.POLL.ERR | os.POLL.HUP | os.POLL.NVAL) != 0) {
+				return error.Closed;
+			}
+
+			if (fds[0].revents & os.POLL.IN == 0) {
+				return;
+			}
+		}
+
 		const message = reader.readMessage(stream) catch |err| {
 			switch (err) {
 				error.LargeControl => try stream.writeAll(CLOSE_PROTOCOL_ERROR),
 				error.ReservedFlags => try stream.writeAll(CLOSE_PROTOCOL_ERROR),
 				else => {},
 			}
-				return;
+			return err;
 		};
 
 		switch (message.type) {
 			.text, .binary => {
 				try h.handle(message);
 				reader.handled();
-					if (self.closed) {
-						return;
-					}
 			},
 			.pong => {
 				if (handle_pong) {
@@ -260,40 +277,52 @@ pub const Conn = struct {
 			},
 			.close => {
 				if (handle_close) {
-						return h.handle(message);
+					try h.handle(message);
+					return error.Closed;
 				}
 
 				const data = message.data;
 				const l = data.len;
 
 				if (l == 0) {
-						return self.writeClose();
+					try self.writeClose();
+					return error.Closed;
 				}
 
 				if (l == 1) {
 					// close with a payload always has to have at least a 2-byte payload,
 					// since a 2-byte code is required
-						return stream.writeAll(CLOSE_PROTOCOL_ERROR);
+					try stream.writeAll(CLOSE_PROTOCOL_ERROR);
+					return error.Closed;
 				}
 
 				const code = @as(u16, @intCast(data[1])) | (@as(u16, @intCast(data[0])) << 8);
 				if (code < 1000 or code == 1004 or code == 1005 or code == 1006 or (code > 1013 and code < 3000)) {
-						return stream.writeAll(CLOSE_PROTOCOL_ERROR);
+					try stream.writeAll(CLOSE_PROTOCOL_ERROR);
+					return error.Closed;
 				}
 
 				if (l == 2) {
-						return try stream.writeAll(CLOSE_NORMAL);
+					try stream.writeAll(CLOSE_NORMAL);
+					return error.Closed;
 				}
 
 				const payload = data[2..];
 				if (!std.unicode.utf8ValidateSlice(payload)) {
 					// if we have a payload, it must be UTF8 (why?!)
-						return try stream.writeAll(CLOSE_PROTOCOL_ERROR);
+					try stream.writeAll(CLOSE_PROTOCOL_ERROR);
+					return error.Closed;
 				}
-					return self.writeClose();
+				try self.writeClose();
+				return error.Closed;
 			},
 		}
 	}
+
+	fn readLoop(self: *Conn, comptime H: type, handler: *H, reader: *Reader) !void {
+		while (!self.closed) {
+			try self.handleNextMessage(H, handler, reader, -1);
+		}
 	}
 };
 
diff --git a/src/websocket.zig b/src/websocket.zig
index e70c7e1..2481373 100644
--- a/src/websocket.zig
+++ b/src/websocket.zig
@@ -14,6 +14,7 @@ pub const Message = lib.Message;
 pub const Handshake = lib.Handshake;
 pub const OpCode = lib.framing.OpCode;
 pub const Client = client.Client(client.Stream);
+pub const Reader = lib.Reader;
 
 pub const Config = struct{
 	pub const Server = server.Config;

So if a timeout was provided, it polls before calling readMessage to see whether there's any data to be read. handleNextMessage allows implementing readLoop without reimplementing all of the message handling. So now I can do this:

    pub fn readLoop(self: *Handler, conn: *websocket.Conn, reader: *websocket.Reader) !void {
        while (!conn.closed) {
            self.got_input = false;

            const us_timeout: i32 = if (self.has_pending_output) 0 else -1;
            try conn.handleNextMessage(Handler, self, reader, us_timeout);

            if (self.got_input) {
                continue;
            }

            // process stuff
        }
    }

    pub fn handle(self: *Handler, message: websocket.Message) !void {
        self.got_input = true;
        // handle the message
    }
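The polling step from the diff can be tried in isolation with a pipe instead of a socket. This is a sketch under a few assumptions: it targets Zig 0.12 or later, where the POSIX wrappers live in `std.posix` (the diff uses `std.os`, as in 0.11), and `hasPendingInput` is a hypothetical helper name, not part of the library. One detail worth keeping in mind is that `poll` takes its timeout in milliseconds, so the `us_timeout` name reads as microseconds but is passed through as milliseconds; with only 0 and -1 in use, as above, the unit makes no difference.

```zig
const std = @import("std");
const posix = std.posix;

// Hypothetical helper mirroring the check added to handleNextMessage.
// timeout_ms: 0 returns immediately, -1 blocks until the descriptor is ready.
fn hasPendingInput(fd: posix.fd_t, timeout_ms: i32) !bool {
    var fds = [1]posix.pollfd{.{ .fd = fd, .events = posix.POLL.IN, .revents = 0 }};

    // poll returns the number of descriptors with events; 0 means timeout.
    if (try posix.poll(&fds, timeout_ms) == 0) return false;

    if (fds[0].revents & (posix.POLL.ERR | posix.POLL.HUP | posix.POLL.NVAL) != 0) {
        return error.Closed;
    }
    return fds[0].revents & posix.POLL.IN != 0;
}

pub fn main() !void {
    // A pipe stands in for the connection's stream handle.
    const pipe_fds = try posix.pipe();
    defer posix.close(pipe_fds[0]);
    defer posix.close(pipe_fds[1]);

    const before = try hasPendingInput(pipe_fds[0], 0);
    _ = try posix.write(pipe_fds[1], "x");
    const after = try hasPendingInput(pipe_fds[0], 0);

    std.debug.print("before write: {}, after write: {}\n", .{ before, after });
}
```

A zero timeout gives the non-blocking check used while output is pending, and -1 gives the blocking read used when there is nothing left to send.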

I could make a PR if you think it'd be worth getting into the library. I'm not sure whether there's some edge case it doesn't handle though, in particular after adding the errors (which cause the try in readLoop to fail and break out of the loop). The tests are passing at least.
