Skip to content

Commit

Permalink
Make SendStream::finish synchronous
Browse files Browse the repository at this point in the history
`await`ing on this was error-prone and not very useful, since it gave
little insight into application state, and was redundant to `stopped`.
  • Loading branch information
Ralith committed May 3, 2024
1 parent f28cb3d commit b42e21e
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 161 deletions.
4 changes: 3 additions & 1 deletion bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: u6
.context("failed sending data")?;
}

stream.finish().await.context("failed finishing stream")?;
stream.finish().unwrap();
// Wait for stream to close
_ = stream.stopped().await;

Ok(())
}
Expand Down
6 changes: 4 additions & 2 deletions perf/src/bin/perf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ async fn request(
let upload_start = Instant::now();
send.write_all(&download.to_be_bytes()).await?;
if upload == 0 {
send.finish().await?;
send.finish().unwrap();
return Ok(());
}

Expand All @@ -317,7 +317,9 @@ async fn request(
send_stream_stats.on_bytes(chunk_len as usize);
upload -= chunk_len;
}
send.finish().await?;
send.finish().unwrap();
// Wait for stream to close
_ = send.stopped().await;
send_stream_stats.finish(upload_start.elapsed());

debug!("upload finished on {}", send.id());
Expand Down
4 changes: 3 additions & 1 deletion quinn/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ fn send_data(bench: &mut Bencher, data: &'static [u8], concurrent_streams: usize
handles.push(runtime.spawn(async move {
let mut stream = client.open_uni().await.unwrap();
stream.write_all(data).await.unwrap();
stream.finish().await.unwrap();
stream.finish().unwrap();
// Wait for stream to close
_ = stream.stopped().await;
}));
}

Expand Down
4 changes: 1 addition & 3 deletions quinn/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,7 @@ async fn run(options: Opt) -> Result<()> {
send.write_all(request.as_bytes())
.await
.map_err(|e| anyhow!("failed to send request: {}", e))?;
send.finish()
.await
.map_err(|e| anyhow!("failed to shutdown stream: {}", e))?;
send.finish().unwrap();
let response_start = Instant::now();
eprintln!("request sent at {:?}", response_start - start);
let resp = recv
Expand Down
4 changes: 1 addition & 3 deletions quinn/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,7 @@ async fn handle_request(
.await
.map_err(|e| anyhow!("failed to send response: {}", e))?;
// Gracefully terminate the stream
send.finish()
.await
.map_err(|e| anyhow!("failed to shutdown stream: {}", e))?;
send.finish().unwrap();
info!("complete");
Ok(())
}
Expand Down
28 changes: 14 additions & 14 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
mutex::Mutex,
recv_stream::RecvStream,
runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
send_stream::{SendStream, WriteError},
send_stream::SendStream,
udp_transmit, ConnectionEvent, VarInt,
};
use proto::{
Expand Down Expand Up @@ -856,7 +856,6 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
finishing: FxHashMap::default(),
stopped: FxHashMap::default(),
error: None,
ref_count: 0,
Expand Down Expand Up @@ -936,7 +935,6 @@ pub(crate) struct State {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
pub(crate) finishing: FxHashMap<StreamId, oneshot::Sender<Option<WriteError>>>,
pub(crate) stopped: FxHashMap<StreamId, Waker>,
/// Always set to Some before the connection becomes drained
pub(crate) error: Option<ConnectionError>,
Expand Down Expand Up @@ -1102,21 +1100,14 @@ impl State {
shared.stream_budget_available[dir as usize].notify_waiters();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
// If the finishing stream was already dropped, there's nothing more to do.
let _ = finishing.send(None);
}
if let Some(stopped) = self.stopped.remove(&id) {
stopped.wake();
}
}
Stream(StreamEvent::Stopped { id, error_code }) => {
Stream(StreamEvent::Stopped { id, .. }) => {
if let Some(stopped) = self.stopped.remove(&id) {
stopped.wake();
}
if let Some(finishing) = self.finishing.remove(&id) {
let _ = finishing.send(Some(WriteError::Stopped(error_code)));
}
if let Some(writer) = self.blocked_writers.remove(&id) {
writer.wake();
}
Expand Down Expand Up @@ -1200,9 +1191,6 @@ impl State {
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
shared.datagram_received.notify_waiters();
shared.datagrams_unblocked.notify_waiters();
for (_, x) in self.finishing.drain() {
let _ = x.send(Some(WriteError::ConnectionLost(reason.clone())));
}
if let Some(x) = self.on_connected.take() {
let _ = x.send(false);
}
Expand Down Expand Up @@ -1285,8 +1273,20 @@ pub struct UnknownStream {
_private: (),
}

impl UnknownStream {
pub(crate) fn new() -> Self {
Self { _private: () }
}
}

impl From<proto::UnknownStream> for UnknownStream {
fn from(_: proto::UnknownStream) -> Self {
Self { _private: () }
}
}

impl From<UnknownStream> for io::Error {
fn from(x: UnknownStream) -> Self {
Self::new(io::ErrorKind::NotConnected, x)
}
}
44 changes: 0 additions & 44 deletions quinn/src/recv_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,50 +45,6 @@ use crate::{
/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
/// will be bidirectional stream 0.
///
/// ## Unexpected [`WriteError::Stopped`] in sender
///
/// When a stream is expected to be closed gracefully the sender should call
/// [`SendStream::finish`]. However there is no guarantee the connected [`RecvStream`] will
/// receive the "finished" notification in the same QUIC frame as the last frame which
/// carried data.
///
/// Even if the application layer logic already knows it read all the data because it does
/// its own framing, it should still read until it reaches the end of the [`RecvStream`].
/// Otherwise it risks inadvertently calling [`RecvStream::stop`] if it drops the stream.
/// And calling [`RecvStream::stop`] could result in the connected [`SendStream::finish`]
/// call failing with a [`WriteError::Stopped`] error.
///
/// For example if exactly 10 bytes are to be read, you still need to explicitly read the
/// end of the stream:
///
/// ```no_run
/// # use quinn::{SendStream, RecvStream};
/// # async fn func(
/// # mut send_stream: SendStream,
/// # mut recv_stream: RecvStream,
/// # ) -> anyhow::Result<()>
/// # {
/// // In the sending task
/// send_stream.write(&b"0123456789"[..]).await?;
/// send_stream.finish().await?;
///
/// // In the receiving task
/// let mut buf = [0u8; 10];
/// let data = recv_stream.read_exact(&mut buf).await?;
/// if recv_stream.read_to_end(0).await.is_err() {
/// // Discard unexpected data and notify the peer to stop sending it
/// let _ = recv_stream.stop(0u8.into());
/// }
/// # Ok(())
/// # }
/// ```
///
/// An alternative approach, used in HTTP/3, is to specify a particular error code used with `stop`
/// that indicates graceful receiver-initiated stream shutdown, rather than a true error condition.
///
/// [`RecvStream::read_chunk`] could be used instead which does not take ownership and
/// allows using an explicit call to [`RecvStream::stop`] with a custom error code.
///
/// [`ReadError`]: crate::ReadError
/// [`stop()`]: RecvStream::stop
/// [`SendStream::finish`]: crate::SendStream::finish
Expand Down
Loading

0 comments on commit b42e21e

Please sign in to comment.