
[protocol v2] RFC: custom streams #118

Merged
merged 1 commit into from May 18, 2024
Conversation

@masad-frost (Member) commented May 11, 2024

Why

Pushable from it-pushable is a convenient stream-looking abstraction to use, but it doesn't have the right affordances for consumers or the constraints we want in order to achieve good close semantics as proposed in #111. Web streams would have been nice to use, but they have a similar problem.

What changed

Built some stream interfaces to be used by consumers in server handlers or on the client side. Things are documented inside. The e2e tests and related code are updated to give an idea of how things will look.

Decided to leave implementation out of this PR just to get comments on the idea.

@masad-frost requested a review from a team as a code owner May 11, 2024 01:46
@masad-frost requested review from bradymadden97 and removed request for a team May 11, 2024 01:46
@masad-frost marked this pull request as draft May 11, 2024 07:22
@masad-frost (Member, Author)

Still some things to uncover

@masad-frost marked this pull request as ready for review May 11, 2024 17:46
router/streams.ts (review thread outdated and resolved)
@masad-frost force-pushed the fm-streams branch 3 times, most recently from 3a8aa8c to 7b9e499, May 11, 2024 18:03
@masad-frost (Member, Author)

This feels a bit over-engineered and adds yet another layer to river 😢, but I can't think of a better way to provide ergonomic streams that force devs into the pit of success.

@bradymadden97 (Contributor)

Maybe it would help to lay out a specific scenario where the pushable API breaks down, then we can figure out how to solve it? That part isn't clear to me, so it's hard to see how a new API helps.

@masad-frost (Member, Author)

> Maybe it would help to lay out a specific scenario where the pushable API breaks down, then we can figure out how to solve it? That part isn't clear to me, so it's hard to see how a new API helps.

The pushable API exposes two methods (`push` and `end`), an async iterator, and no properties. Specific scenarios are already expressed in the protocol PR.

For the writer, exposing an async iterator makes no sense, and it's confusing for it to be part of the API. There's also no way to get a signal that the reader is interested in closing. You can call `end` multiple times, and you can start pushing again at any point; there's no way to tell whether the stream has been closed. We'll also want to add abort semantics at some point. Prior to #111, we didn't even need a pushable, just a simple write function.

On the reader side, it's a lot worse. First, there's no locking mechanism, so you could have two iterations fighting for the reads and you wouldn't know. The `push` method is exposed, and it makes no sense for a reader to have it. Getting a signal that the stream ended requires you to iterate over the whole pushable. You can call `end` on the pushable, I guess, to signal that you're no longer interested, but the API isn't self-explanatory, and `end` has unintended side effects: it ends your iterator even though the stream didn't really end (it's just a request to close), so you lose the ability to know when the stream actually closes. Prior to #111, we just need an `AsyncIterable`, not a pushable.

*
* Consuming a locked stream will throw an error.
*/
sink(): void;
Member Author

probably drain

Comment on lines 164 to 177
async handler(_ctx, { reader, writer }) {
  for await (const { msg, throwError, throwResult } of reader.iter()) {
    if (throwError) {
      // this force closes the stream
      throw new Error('some message');
    } else if (throwResult) {
      writer.write(
        Err({
          code: STREAM_ERROR,
          message: 'field throwResult was set to true',
        }),
      );
    } else {
      writer.write(Ok({ response: msg }));
Contributor

I feel like using "write" terminology for the output stream is good, but "read" for the input stream feels less intuitive. I feel like I want to use the word "observe" here more, and in fact I think a true observable (so it's allowed to complete, or emit errors) fits really well here and has a nice, intuitive feel. It may also be more flexible.

In the end this doesn't really matter and using "read" is intuitive if you're used to e.g. buffers from Node, so it's fine. I also think it's better than what came before, I feel like this API is more naturally explanatory.

@Monkatraz (Contributor) commented May 15, 2024

After reading:

> No they're completely independent. Each end (server or client) has its own reader for writes coming from the other side, and its own writer to write to the other side's reader.

I feel more comfortable with this, although I feel like when I read this code I want to ask "reading from what?" and "writing to what?". If you are viewing streams as iterators, maybe you'd get confused that you're reading from the stream you're writing to, or you're writing to the stream you read from, which isn't the case, as these are independent streams.

It's verbose, but I feel like for readability I'm wanting to name these something like fromClientReader and toClientWriter. I'm not actually suggesting that, mainly wondering if there are better names here.

Member Author

Maybe on the server `inputReader` and `outputWriter`, and on the client `outputReader` and `inputWriter`, since input and output are part of the API nomenclature.

Comment on lines 148 to 153
export interface ReadWriteStream<TRead, TWrite> {
/**
* The reader side of the stream.
*/
reader: CloseableReadStream<TRead>;
/**
* The writer side of the stream.
*/
writer: WriteStream<TWrite>;
}
Contributor

There are invariants between reader and writer here, right? Like, if the writer closes the stream, reader is going to be closed, right? It feels awkward to have to go and do something like stream.reader.isClosed() to check, because it feels like you should also have to check stream.writer.isClosed(), even if you don't. So maybe for ReadWriteStream it should lift those methods up?

Member Author

No they're completely independent. Each end (server or client) has its own reader for writes coming from the other side, and its own writer to write to the other side's reader.

Member Author

Let me update the protocol doc to make the reader and writer semantics clearer; I'll ping you when that's done. This is basically modeled after how most bidirectional streams work, e.g. HTTP/2.

Contributor

Okay, that makes sense. Also, maybe this should just be called BidirectionalStream?

Member Author

Yeah, that works.


Comment on lines 46 to 54
/**
* `tee` splits the stream into two {@link ReadStream} instances that
* can be consumed independently. The original stream will be locked forever.
*/
tee(): [ReadStream<T>, ReadStream<T>];
Contributor

What is this for?

@masad-frost (Member, Author) commented May 15, 2024

If you want to read in 2 different places, it allows you to create separate readers.
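As a rough illustration of those semantics, here is a self-contained `tee` sketch over a plain `AsyncIterable` (this is not river's implementation; the buffering strategy is an assumption for the example). The source is consumed once, and each item is held for a branch until that branch reads it:

```typescript
// Illustrative tee: split one async source into two independent readers.
function tee<T>(source: AsyncIterable<T>): [AsyncGenerator<T>, AsyncGenerator<T>] {
  const iterator = source[Symbol.asyncIterator]();
  const buffers: [T[], T[]] = [[], []];
  let done = false;
  let pending: Promise<void> | null = null;

  // Serialize pulls so concurrent branches don't both advance the source.
  function pull(): Promise<void> {
    if (!pending) {
      pending = iterator.next().then((result) => {
        pending = null;
        if (result.done) {
          done = true;
        } else {
          buffers[0].push(result.value);
          buffers[1].push(result.value);
        }
      });
    }
    return pending;
  }

  async function* branch(i: 0 | 1): AsyncGenerator<T> {
    while (true) {
      if (buffers[i].length > 0) yield buffers[i].shift()!;
      else if (done) return;
      else await pull();
    }
  }

  return [branch(0), branch(1)];
}
```

One consequence worth noting: if one branch reads far ahead of the other, the slower branch's buffer grows unboundedly, which is also how Web Streams' `tee` behaves.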

Comment on lines 49 to 51
void writer.waitForCloseRequest().then(() => {
writer.close();
});
Contributor

I feel like this might be a common enough pattern that you could make a method for it or something - like writer.setCloseWhenRequested(true)

Contributor

Although elsewhere you do more than just close the stream when this promise resolves...

I wonder if adding teardowns would be good here, like:

// called when the stream is closed
// maybe should just be onClose
writer.addTeardown(dispose)

Although I suppose you could just do this:

writer.waitForClose().then(dispose)

Member Author

Yeah I wasn't sure which is more common. Wrapping callbacks in a promise tends to be messier than calling .then on a promise

@masad-frost mentioned this pull request May 16, 2024
@masad-frost changed the base branch from main to protocolv2 May 16, 2024 03:49
@masad-frost merged commit 19e1db3 into protocolv2 May 18, 2024
3 checks passed
@masad-frost deleted the fm-streams branch May 18, 2024 00:06
This was referenced May 19, 2024
masad-frost added a commit that referenced this pull request May 21, 2024
masad-frost added a commit that referenced this pull request May 21, 2024
*
* Consuming a locked stream will throw an error.
*/
drain(): undefined;
Member

Hm, my Rust brain's thought is that `asArray` should do what `drain` does here: https://doc.rust-lang.org/std/vec/struct.Drain.html

Member Author

Interesting idea. I imagine in most cases where you want this you can iterate and call a callback when you have a buffer of some length.
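That "iterate and call a callback when you have a buffer of some length" pattern can be sketched in a few lines; this is a hypothetical helper for illustration (the name `forEachBatch` is not part of river):

```typescript
// Illustrative batching helper: collect items from an async source and
// flush them to a callback whenever the buffer reaches `size`, plus a
// final flush for any remainder.
async function forEachBatch<T>(
  source: AsyncIterable<T>,
  size: number,
  callback: (batch: T[]) => void,
): Promise<void> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length >= size) {
      callback(batch);
      batch = [];
    }
  }
  if (batch.length > 0) callback(batch); // flush the remainder
}
```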

* it is unsafe is that the consumer will not be notified that the stream was interrupted, but
* they can manually check by calling {@link isBroken}.
*/
breakConsumerUnsafe(): undefined;
Member

When would you need this?

Member Author

My thought was that you might want to interrupt an iterator/consumer without causing them to throw for some reason. I ended up not incorporating this in the final design, drain interrupts now.

Comment on lines +84 to +92
/**
* `requestClose` sends a request to the writer to close the stream, and resolves when the stream
* is fully closed. The stream can still receive more data after the request is sent.
*/
requestClose(): undefined;
/**
* `isCloseRequested` checks if the reader has requested to close the stream.
*/
isCloseRequested(): boolean;
Member

feels like these two should only be present on the bidi pair not on just the pure ends

Member

but i guess in the actual usage you only ever have access to just one of the ends

Member Author

Yeah exactly, if you have a reader then there's a writer on the other end that might be waiting for your close request.

Member Author

In other words, a reader will always have a writer, it's not related to bidi streams
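The `requestClose`/`isCloseRequested` handshake described in the thread can be sketched as follows (a minimal sketch under the semantics quoted above, not river's implementation; `WriterEnd` and its fields are illustrative names). The reader asks the writer to close and gets a promise; the writer observes the request and performs the actual close, which resolves that promise:

```typescript
// Illustrative close-request handshake between a reader and its writer.
class WriterEnd {
  private closeRequested = false;
  private closed = false;
  private resolveClosed!: () => void;
  // Resolves once the writer has actually closed the stream.
  readonly whenClosed = new Promise<void>((resolve) => {
    this.resolveClosed = resolve;
  });

  // Called from the reader side: signal interest in closing, and get a
  // promise for the eventual full close. Data may still arrive meanwhile.
  requestClose(): Promise<void> {
    this.closeRequested = true;
    return this.whenClosed;
  }

  // Polled (or awaited) by the writer side to honor the request.
  isCloseRequested(): boolean {
    return this.closeRequested;
  }

  // The writer decides when to actually close; this resolves the
  // reader's requestClose promise.
  close(): void {
    if (this.closed) return;
    this.closed = true;
    this.resolveClosed();
  }
}
```

The key property the sketch captures is that the close request is advisory: only the writer performs the close, and the reader merely learns when it has happened.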

masad-frost added a commit that referenced this pull request May 29, 2024
masad-frost added a commit that referenced this pull request May 31, 2024
masad-frost added a commit that referenced this pull request Jun 6, 2024
masad-frost added a commit that referenced this pull request Jun 11, 2024
masad-frost added a commit that referenced this pull request Jun 13, 2024
masad-frost added a commit that referenced this pull request Jun 20, 2024
masad-frost added a commit that referenced this pull request Jun 24, 2024
masad-frost added a commit that referenced this pull request Aug 16, 2024
All the changes are documented in `Protocol.md` but here's a summary:
- Handle invalid client requests by sending a close with an error back
  - This was the main motivation for the change. While we could sort of implement this error response without the other changes, things are set up in such a way that it is very hard to implement correctly without deeper changes in how we handle closing.
- Add more robust closing mechanics
  - Half-close states
  - Close signals from read end of the pipes
  - Abort full-closure (for errors and cancellation)
- Switch from `Pushable` and `AsyncIterator` APIs to a `ReadStream` and `WriteStream`
- All procedures have `init` and some have `input`

While the changes are not strictly backwards compatible, hence the major
protocol bump, the system can still operate across versions to some
extent.

See PRs linked below for more information on the above

# TODOs
- [x] Define protocol and update doc #111 
- [x] Design stream abstractions #118 
  - [x] Redesigned in #249
- [x] Implement stream abstractions
  - [x] ReadStream #130
  - [x] WriteStream #132
- [x] All streams have init, some have input.
  - [x] Protocol change documented in #153
  - [x] Implementation change #159  
- [x] Use stream abstractions & implement protocol closing semantics
  - [x] Protocol: Implement close requests from readers #165 
  - [x] Protocol: Implement half-close
    - [x] Client #162 
    - [x] Server #163 
  - [x] Simple s/Pushable/Stream replacement
    - [x] Client #136 
    - [x] Server #137 
- [x] Make `Input` iterator on the server use `Result` so we can signal stream closes, client disconnects, and aborts #172
- [x] Add Abort mechanism
  - [x] Docs update #175
  - [x] Implement abort
    - [x] Client #193
    - [x] Server #200
  - [x] Add `INVALID_REQUEST` to schema #107
- [x] Handle/send back `INVALID_REQUEST` errors with an abort bit #203
  - [x] Handle/send back `INTERNAL_RIVER_ERROR` with an abort bit #203 
  - [x] Send abort bit with `UNCAUGHT_ERROR` #201 
  - [x] Abort tombstones #204

- [ ] Try to find uncovered areas to test
   - [ ] `undefined` value for `init`, `input`, & `output`. 
- [ ] Update docs
- [ ] Changelog

---------

Co-authored-by: Jacky Zhao <j.zhao2k19@gmail.com>