Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RecvStream::received_reset #1873

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,33 @@ impl<'a> RecvStream<'a> {

Ok(())
}

/// Check whether this stream has been reset by the peer, returning the reset error code if so
///
/// After returning `Ok(Some(_))` once, stream state will be discarded and all future calls will
/// return `Err(ClosedStream)`.
pub fn received_reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
return Err(ClosedStream { _private: () });
};
let Some(s) = entry.get().as_ref() else {
return Ok(None);
};
if s.stopped {
return Err(ClosedStream { _private: () });
}
let Some(code) = s.reset_code() else {
return Ok(None);
};

// Clean up state after application observes the reset, since there's no reason for the
// application to attempt to read or stop the stream once it knows it's reset
entry.remove_entry();
self.state.stream_freed(self.id, StreamHalf::Recv);
self.state.queue_max_stream_id(self.pending);

Ok(Some(code))
}
}

/// Access to streams
Expand Down
7 changes: 7 additions & 0 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ impl Recv {
Ok(true)
}

pub(super) fn reset_code(&self) -> Option<VarInt> {
match self.state {
RecvState::ResetRecvd { error_code, .. } => Some(error_code),
_ => None,
}
}

/// Compute the amount of flow control credit consumed, or return an error if more was consumed
/// than issued
fn credit_consumed_by(
Expand Down
81 changes: 79 additions & 2 deletions quinn/src/recv_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
future::Future,
future::{poll_fn, Future},
io,
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -281,6 +281,46 @@ impl RecvStream {
self.stream
}

/// Completes when the stream has been reset by the peer or otherwise closed
///
/// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
/// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
/// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
/// which it is no longer meaningful for the stream to be reset.
///
/// This operation is cancel-safe.
pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
poll_fn(|cx| {
let mut conn = self.conn.state.lock("RecvStream::reset");
if self.is_0rtt && conn.check_0rtt().is_err() {
return Poll::Ready(Err(ResetError::ZeroRttRejected));
}

if let Some(code) = self.reset {
return Poll::Ready(Ok(Some(code)));
}
Ralith marked this conversation as resolved.
Show resolved Hide resolved

match conn.inner.recv_stream(self.stream).received_reset() {
Err(_) => Poll::Ready(Ok(None)),
Ok(Some(error_code)) => {
// Stream state has just now been freed, so the connection may need to issue new
// stream ID flow control credit
conn.wake();
Poll::Ready(Ok(Some(error_code)))
}
Ok(None) => {
// Resets always notify readers, since a reset is an immediate read error. We
// could introduce a dedicated channel to reduce the risk of spurious wakeups,
// but that increased complexity is probably not justified, as an application
// that is expecting a reset is not likely to receive large amounts of data.
conn.blocked_readers.insert(self.stream, cx.waker().clone());
Poll::Pending
}
}
})
.await
}

/// Handle common logic related to reading out of a receive stream
///
/// This takes an `FnMut` closure that takes care of the actual reading process, matching
Expand Down Expand Up @@ -308,7 +348,7 @@ impl RecvStream {

// If we stored an error during a previous call, return it now. This can happen if a
// `read_fn` both wants to return data and also returns an error in its final stream status.
let status = match self.reset.take() {
let status = match self.reset {
Some(code) => ReadStatus::Failed(None, Reset(code)),
None => {
let mut recv = conn.inner.recv_stream(self.stream);
Expand Down Expand Up @@ -340,6 +380,7 @@ impl RecvStream {
ReadStatus::Failed(read, Reset(error_code)) => match read {
None => {
self.all_data_read = true;
self.reset = Some(error_code);
Poll::Ready(Err(ReadError::Reset(error_code)))
}
done => {
Expand Down Expand Up @@ -502,6 +543,15 @@ impl From<ReadableError> for ReadError {
}
}

impl From<ResetError> for ReadError {
Ralith marked this conversation as resolved.
Show resolved Hide resolved
fn from(e: ResetError) -> Self {
match e {
ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
ResetError::ZeroRttRejected => Self::ZeroRttRejected,
}
}
}

impl From<ReadError> for io::Error {
fn from(x: ReadError) -> Self {
use self::ReadError::*;
Expand All @@ -514,6 +564,33 @@ impl From<ReadError> for io::Error {
}
}

/// Errors that arise while waiting for a stream to be reset
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ResetError {
/// The connection was lost
#[error("connection lost")]
ConnectionLost(#[from] ConnectionError),
/// This was a 0-RTT stream and the server rejected it
///
/// Can only occur on clients for 0-RTT streams, which can be opened using
/// [`Connecting::into_0rtt()`].
///
/// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
#[error("0-RTT rejected")]
ZeroRttRejected,
}

impl From<ResetError> for io::Error {
fn from(x: ResetError) -> Self {
use ResetError::*;
let kind = match x {
ZeroRttRejected => io::ErrorKind::ConnectionReset,
ConnectionLost(_) => io::ErrorKind::NotConnected,
};
Self::new(kind, x)
}
}

/// Future produced by [`RecvStream::read()`].
///
/// [`RecvStream::read()`]: crate::RecvStream::read
Expand Down