-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Server uses ReadStream and WriteStream #136 #137
Conversation
const hasInit = 'init' in procedure; | ||
|
||
if (!isFirstMessage || !hasInit) { | ||
// Init message is consumed during stream instantiation | ||
if (Value.Check(procedure.input, message.payload)) { | ||
procStream.incoming.pushValue(message.payload as PayloadType); | ||
} else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { | ||
log?.error( | ||
`procedure ${serviceName}.${procedureName} received invalid payload`, | ||
{ clientId: this.transport.clientId, fullTransportMessage: message }, | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this approach (or any of the changes i made to this file for that matter.) There's going to have to be some refactors for us to implement invalid requests, half-close properly, and close requests, so I'm gonna stick to keeping changes to here as minimal as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this is pretty hard to parse in some points, it feels inconsistent how and when we check init vs payload at least within the diff
const hasInit = 'init' in procedure; | ||
|
||
if (!isFirstMessage || !hasInit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in theory this shouldn't work for rpc
, but it does 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just end up pushing extraneous values to the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do we end up pushing 🤔
if (!stream.incoming.isClosed()) { | ||
stream.incoming.triggerClose(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should probably call drain
here. Also don't need to be checking isClosed
, but it's ok for now
82d0a80
to
cb18527
Compare
cb18527
to
9f7d8a9
Compare
@@ -326,12 +302,24 @@ class RiverServer<Services extends AnyServiceSchemaMap> { | |||
}, | |||
activeContext, | |||
async (span: Span) => { | |||
if (!Value.Check(procedure.input, message.payload)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't the first (and only message) on a subscription to the server be init? im a little confused, is this + #136 rewriting to also do the thing we talked about where all streams are mandatory stream init message + potentially more body messages instead of the current state of the world where it is optional special init message + maybe more body messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the RPC handler btw, first message don't go into the stream.
Yeah if we have mandatory inits the code will be much cleaner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
subscriptions and rpc currently are not init
. I'll update the protocol doc then readapt the changes
const hasInit = 'init' in procedure; | ||
|
||
if (!isFirstMessage || !hasInit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do we end up pushing 🤔
const hasInit = 'init' in procedure; | ||
|
||
if (!isFirstMessage || !hasInit) { | ||
// Init message is consumed during stream instantiation | ||
if (Value.Check(procedure.input, message.payload)) { | ||
procStream.incoming.pushValue(message.payload as PayloadType); | ||
} else if (!Value.Check(ControlMessagePayloadSchema, message.payload)) { | ||
log?.error( | ||
`procedure ${serviceName}.${procedureName} received invalid payload`, | ||
{ clientId: this.transport.clientId, fullTransportMessage: message }, | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this is pretty hard to parse in some points, it feels inconsistent how and when we check init vs payload at least within the diff
const inputPipe = createPipe<Static<I>>(); | ||
const outputPipe = createPipe<ProcedureResult<O, E>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ooo this is nice
@@ -448,10 +444,22 @@ class RiverServer<Services extends AnyServiceSchemaMap> { | |||
}, | |||
activeContext, | |||
async (span: Span) => { | |||
if (!Value.Check(procedure.input, message.payload)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah this should check init :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or wait, subscriptions don't have init
, only input
.
All the changes are documented in `Protocol.md` but here's a summary: - Handle invalid client requests by sending a close with an error back - This was the main motivation for the change. While we could sort-of implement this error response without the other changes, things are setup in such a way where it is very hard to implement correctly without deeper changes in how we handle closing. - Add more robust closing mechanics - Half-close states - Close signals from read end of the pipes - Abort full-closure (for errors and cancellation) - Switch from `Pushable` and `AsyncIterator` APIs to a `ReadStream` and `WriteStream` - All procedures have `init` and some have `input` While the changes are not strictly backwards compatible, hence the major protocol bump, the system can still operate across versions to some extent. See PRs linked below for more information on the above # TODOs - [x] Define protocol and update doc #111 - [x] Design stream abstractions #118 - [x] Redsigned in #249 - [x] Implement stream abstractions - [x] ReadStream #130 - [x] WriteStream #132 - [x] All streams have init, some have input. - [x] Protocol change documented in #153 - [x] Implementation change #159 - [x] Use stream abstractions & implement protocol closing semantics - [x] Protocol: Implement close requests from readers #165 - [x] Protocol: Implement half-close - [x] Client #162 - [x] Server #163 - [x] Simple s/Pushable/Stream replacement - [x] Client #136 - [x] Server #137 - [x] Make `Input` iterator on the server use `Result` so we can signal stream closes, client disconnects, and aborts #172 - [x] Add Abort mechanism - [x] Docs update #175 - [x] Implement abort - [x] Client #193 - [x] Server #200 - [x] Add `INVALID_REQUEST` to schema #107 - [x] Handle/send back `INVALID_REQUEST` errors with an abort bit #203 - [x] Handle/send back `INTERNAL_RIVER_ERROR` with an abort bit #203 - [x] Send abort bit with `UNCAUGHT_ERROR` #201 - [x] Abort tombstones #204 - [ ] Try to find uncovered areas to test - [ ] `undefined` value for `init`, `input`, & `output`. - [ ] Update docs - [ ] Changelog --------- Co-authored-by: Jacky Zhao <j.zhao2k19@gmail.com>
Exactly the same as #136 but for the server