Skip to content

Commit

Permalink
Make the semantics of Stream more reasonable
Browse files Browse the repository at this point in the history
Closes #200
  • Loading branch information
daniel-abramov committed Mar 22, 2022
1 parent 1d69273 commit 5a1f60c
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ where
pub struct WebSocketStream<S> {
inner: WebSocket<AllowStd<S>>,
closing: bool,
ended: bool,
}

impl<S> WebSocketStream<S> {
Expand Down Expand Up @@ -216,7 +217,7 @@ impl<S> WebSocketStream<S> {
}

pub(crate) fn new(ws: WebSocket<AllowStd<S>>) -> Self {
WebSocketStream { inner: ws, closing: false }
WebSocketStream { inner: ws, closing: false, ended: false }
}

fn with_context<F, R>(&mut self, ctx: Option<(ContextWaker, &mut Context<'_>)>, f: F) -> R
Expand Down Expand Up @@ -271,13 +272,27 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
trace!("{}:{} Stream.poll_next", file!(), line!());

// The connection has been closed or the critical error has occurred.
// We have already returned the error to the user, the `Stream` is unusable,
// so we assume that the stream has been "fused".
if self.ended {
return Poll::Ready(None);
}

match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| {
trace!("{}:{} Stream.with_context poll_next -> read_message()", file!(), line!());
cvt(s.read_message())
})) {
Ok(v) => Poll::Ready(Some(Ok(v))),
Err(WsError::AlreadyClosed) | Err(WsError::ConnectionClosed) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
Err(e) => {
self.ended = true;
if matches!(e, WsError::AlreadyClosed | WsError::ConnectionClosed) {
Poll::Ready(None)
} else {
Poll::Ready(Some(Err(e)))
}
}
}
}
}
Expand Down

0 comments on commit 5a1f60c

Please sign in to comment.