diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index 8bdc7a18242..0e31cebbb09 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -126,22 +126,22 @@ impl Configuration { } fn build(&self) -> Result { - let flavor = self.flavor.unwrap_or(self.default_flavor); - use RuntimeFlavor::*; + use RuntimeFlavor as F; + let flavor = self.flavor.unwrap_or(self.default_flavor); let worker_threads = match (flavor, self.worker_threads) { - (CurrentThread, Some((_, worker_threads_span))) => { + (F::CurrentThread, Some((_, worker_threads_span))) => { let msg = format!( "The `worker_threads` option requires the `multi_thread` runtime flavor. Use `#[{}(flavor = \"multi_thread\")]`", self.macro_name(), ); return Err(syn::Error::new(worker_threads_span, msg)); } - (CurrentThread, None) => None, - (Threaded, worker_threads) if self.rt_multi_thread_available => { + (F::CurrentThread, None) => None, + (F::Threaded, worker_threads) if self.rt_multi_thread_available => { worker_threads.map(|(val, _span)| val) } - (Threaded, _) => { + (F::Threaded, _) => { let msg = if self.flavor.is_none() { "The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled." } else { @@ -152,14 +152,14 @@ impl Configuration { }; let start_paused = match (flavor, self.start_paused) { - (Threaded, Some((_, start_paused_span))) => { + (F::Threaded, Some((_, start_paused_span))) => { let msg = format!( "The `start_paused` option requires the `current_thread` runtime flavor. Use `#[{}(flavor = \"current_thread\")]`", self.macro_name(), ); return Err(syn::Error::new(start_paused_span, msg)); } - (CurrentThread, Some((start_paused, _))) => Some(start_paused), + (F::CurrentThread, Some((start_paused, _))) => Some(start_paused), (_, None) => None, }; diff --git a/tokio-stream/src/stream_ext/merge.rs b/tokio-stream/src/stream_ext/merge.rs index 9d5123c85a3..4f7022b1f97 100644 --- a/tokio-stream/src/stream_ext/merge.rs +++ b/tokio-stream/src/stream_ext/merge.rs @@ -66,25 +66,23 @@ where T: Stream, U: Stream, { - use Poll::*; - let mut done = true; match first.poll_next(cx) { - Ready(Some(val)) => return Ready(Some(val)), - Ready(None) => {} - Pending => done = false, + Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), + Poll::Ready(None) => {} + Poll::Pending => done = false, } match second.poll_next(cx) { - Ready(Some(val)) => return Ready(Some(val)), - Ready(None) => {} - Pending => done = false, + Poll::Ready(Some(val)) => return Poll::Ready(Some(val)), + Poll::Ready(None) => {} + Poll::Pending => done = false, } if done { - Ready(None) + Poll::Ready(None) } else { - Pending + Poll::Pending } } diff --git a/tokio-stream/src/stream_map.rs b/tokio-stream/src/stream_map.rs index 0c11bf1d543..da021c795f6 100644 --- a/tokio-stream/src/stream_map.rs +++ b/tokio-stream/src/stream_map.rs @@ -518,8 +518,6 @@ where { /// Polls the next value, includes the vec entry index fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll> { - use Poll::*; - let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; let mut idx = start; @@ -527,8 +525,8 @@ where let (_, stream) = &mut self.entries[idx]; match Pin::new(stream).poll_next(cx) { - Ready(Some(val)) => return Ready(Some((idx, val))), - Ready(None) => { + Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))), + Poll::Ready(None) => { // Remove the entry self.entries.swap_remove(idx); @@ -542,7 +540,7 @@ where idx = idx.wrapping_add(1) % self.entries.len(); } } - Pending => { + Poll::Pending => { idx = idx.wrapping_add(1) % self.entries.len(); } } @@ -550,9 +548,9 @@ where // If the map is empty, then the stream is complete. if self.entries.is_empty() { - Ready(None) + Poll::Ready(None) } else { - Pending + Poll::Pending } } } diff --git a/tokio-test/src/macros.rs b/tokio-test/src/macros.rs index 4c029c9e781..693ed9edd03 100644 --- a/tokio-test/src/macros.rs +++ b/tokio-test/src/macros.rs @@ -22,17 +22,17 @@ #[macro_export] macro_rules! assert_ready { ($e:expr) => {{ - use core::task::Poll::*; + use core::task::Poll; match $e { - Ready(v) => v, - Pending => panic!("pending"), + Poll::Ready(v) => v, + Poll::Pending => panic!("pending"), } }}; ($e:expr, $($msg:tt)+) => {{ - use core::task::Poll::*; + use core::task::Poll; match $e { - Ready(v) => v, - Pending => { + Poll::Ready(v) => v, + Poll::Pending => { panic!("pending; {}", format_args!($($msg)+)) } } @@ -127,17 +127,17 @@ macro_rules! assert_ready_err { #[macro_export] macro_rules! assert_pending { ($e:expr) => {{ - use core::task::Poll::*; + use core::task::Poll; match $e { - Pending => {} - Ready(v) => panic!("ready; value = {:?}", v), + Poll::Pending => {} + Poll::Ready(v) => panic!("ready; value = {:?}", v), } }}; ($e:expr, $($msg:tt)+) => {{ - use core::task::Poll::*; + use core::task::Poll; match $e { - Pending => {} - Ready(v) => { + Poll::Pending => {} + Poll::Ready(v) => { panic!("ready; value = {:?}; {}", v, format_args!($($msg)+)) } } diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index db60574653f..f1cfeaf3645 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,31 @@ +# 0.7.9 (September 20th, 2023) + +### Added + +- io: add passthrough `AsyncRead`/`AsyncWrite` to `InspectWriter`/`InspectReader` ([#5739]) +- task: add spawn blocking methods to `JoinMap` ([#5797]) +- io: pass through traits for `StreamReader` and `SinkWriter` ([#5941]) +- io: add `SyncIoBridge::into_inner` ([#5971]) + +### Fixed + +- sync: handle possibly dangling reference safely ([#5812]) +- util: fix broken intra-doc link ([#5849]) +- compat: fix clippy warnings ([#5891]) + +### Documented + +- codec: Specify the line ending of `LinesCodec` ([#5982]) + +[#5739]: https://github.com/tokio-rs/tokio/pull/5739 +[#5797]: https://github.com/tokio-rs/tokio/pull/5797 +[#5941]: https://github.com/tokio-rs/tokio/pull/5941 +[#5971]: https://github.com/tokio-rs/tokio/pull/5971 +[#5812]: https://github.com/tokio-rs/tokio/pull/5812 +[#5849]: https://github.com/tokio-rs/tokio/pull/5849 +[#5891]: https://github.com/tokio-rs/tokio/pull/5891 +[#5982]: https://github.com/tokio-rs/tokio/pull/5982 + # 0.7.8 (April 25th, 2023) This release bumps the MSRV of tokio-util to 1.56. diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 3e05a1c6239..524fba45f82 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -4,7 +4,7 @@ name = "tokio-util" # - Remove path dependencies # - Update CHANGELOG.md. # - Create "tokio-util-0.7.x" git tag. -version = "0.7.8" +version = "0.7.9" edition = "2021" rust-version = "1.56" authors = ["Tokio Contributors "] diff --git a/tokio-util/tests/length_delimited.rs b/tokio-util/tests/length_delimited.rs index 126e41b5cd3..ed5590f9644 100644 --- a/tokio-util/tests/length_delimited.rs +++ b/tokio-util/tests/length_delimited.rs @@ -12,7 +12,6 @@ use futures::{pin_mut, Sink, Stream}; use std::collections::VecDeque; use std::io; use std::pin::Pin; -use std::task::Poll::*; use std::task::{Context, Poll}; macro_rules! mock { @@ -39,10 +38,10 @@ macro_rules! assert_next_eq { macro_rules! assert_next_pending { ($io:ident) => {{ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { - Ready(Some(Ok(v))) => panic!("value = {:?}", v), - Ready(Some(Err(e))) => panic!("error = {:?}", e), - Ready(None) => panic!("done"), - Pending => {} + Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v), + Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e), + Poll::Ready(None) => panic!("done"), + Poll::Pending => {} }); }}; } @@ -50,10 +49,10 @@ macro_rules! assert_next_pending { macro_rules! assert_next_err { ($io:ident) => {{ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { - Ready(Some(Ok(v))) => panic!("value = {:?}", v), - Ready(Some(Err(_))) => {} - Ready(None) => panic!("done"), - Pending => panic!("pending"), + Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v), + Poll::Ready(Some(Err(_))) => {} + Poll::Ready(None) => panic!("done"), + Poll::Pending => panic!("pending"), }); }}; } @@ -186,11 +185,11 @@ fn read_single_frame_multi_packet_wait() { let io = FramedRead::new( mock! { data(b"\x00\x00"), - Pending, + Poll::Pending, data(b"\x00\x09abc"), - Pending, + Poll::Pending, data(b"defghi"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -208,15 +207,15 @@ fn read_multi_frame_multi_packet_wait() { let io = FramedRead::new( mock! { data(b"\x00\x00"), - Pending, + Poll::Pending, data(b"\x00\x09abc"), - Pending, + Poll::Pending, data(b"defghi"), - Pending, + Poll::Pending, data(b"\x00\x00\x00\x0312"), - Pending, + Poll::Pending, data(b"3\x00\x00\x00\x0bhello world"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -250,9 +249,9 @@ fn read_incomplete_head() { fn read_incomplete_head_multi() { let io = FramedRead::new( mock! { - Pending, + Poll::Pending, data(b"\x00"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -268,9 +267,9 @@ fn read_incomplete_payload() { let io = FramedRead::new( mock! { data(b"\x00\x00\x00\x09ab"), - Pending, + Poll::Pending, data(b"cd"), - Pending, + Poll::Pending, }, LengthDelimitedCodec::new(), ); @@ -310,7 +309,7 @@ fn read_update_max_frame_len_at_rest() { fn read_update_max_frame_len_in_flight() { let io = length_delimited::Builder::new().new_read(mock! { data(b"\x00\x00\x00\x09abcd"), - Pending, + Poll::Pending, data(b"efghi"), data(b"\x00\x00\x00\x09abcdefghi"), }); @@ -533,9 +532,9 @@ fn write_single_multi_frame_multi_packet() { fn write_single_frame_would_block() { let io = FramedWrite::new( mock! { - Pending, + Poll::Pending, data(b"\x00\x00"), - Pending, + Poll::Pending, data(b"\x00\x09"), data(b"abcdefghi"), flush(), @@ -640,7 +639,7 @@ fn write_update_max_frame_len_in_flight() { let io = length_delimited::Builder::new().new_write(mock! { data(b"\x00\x00\x00\x06"), data(b"ab"), - Pending, + Poll::Pending, data(b"cdef"), flush(), }); @@ -701,8 +700,6 @@ enum Op { Flush, } -use self::Op::*; - impl AsyncRead for Mock { fn poll_read( mut self: Pin<&mut Self>, @@ -710,15 +707,15 @@ impl AsyncRead for Mock { dst: &mut ReadBuf<'_>, ) -> Poll> { match self.calls.pop_front() { - Some(Ready(Ok(Op::Data(data)))) => { + Some(Poll::Ready(Ok(Op::Data(data)))) => { debug_assert!(dst.remaining() >= data.len()); dst.put_slice(&data); - Ready(Ok(())) + Poll::Ready(Ok(())) } - Some(Ready(Ok(_))) => panic!(), - Some(Ready(Err(e))) => Ready(Err(e)), - Some(Pending) => Pending, - None => Ready(Ok(())), + Some(Poll::Ready(Ok(_))) => panic!(), + Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(Ok(())), } } } @@ -730,31 +727,31 @@ impl AsyncWrite for Mock { src: &[u8], ) -> Poll> { match self.calls.pop_front() { - Some(Ready(Ok(Op::Data(data)))) => { + Some(Poll::Ready(Ok(Op::Data(data)))) => { let len = data.len(); assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); assert_eq!(&data[..], &src[..len]); - Ready(Ok(len)) + Poll::Ready(Ok(len)) } - Some(Ready(Ok(_))) => panic!(), - Some(Ready(Err(e))) => Ready(Err(e)), - Some(Pending) => Pending, - None => Ready(Ok(0)), + Some(Poll::Ready(Ok(_))) => panic!(), + Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(Ok(0)), } } fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { match self.calls.pop_front() { - Some(Ready(Ok(Op::Flush))) => Ready(Ok(())), - Some(Ready(Ok(_))) => panic!(), - Some(Ready(Err(e))) => Ready(Err(e)), - Some(Pending) => Pending, - None => Ready(Ok(())), + Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())), + Some(Poll::Ready(Ok(_))) => panic!(), + Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)), + Some(Poll::Pending) => Poll::Pending, + None => Poll::Ready(Ok(())), } } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Ready(Ok(())) + Poll::Ready(Ok(())) } } @@ -771,9 +768,9 @@ impl From> for Op { } fn data(bytes: &[u8]) -> Poll> { - Ready(Ok(bytes.into())) + Poll::Ready(Ok(bytes.into())) } fn flush() -> Poll> { - Ready(Ok(Flush)) + Poll::Ready(Ok(Op::Flush)) } diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index eb40bc10100..0c821596462 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -360,6 +360,16 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559]) [#5513]: https://github.com/tokio-rs/tokio/pull/5513 [#5517]: https://github.com/tokio-rs/tokio/pull/5517 +# 1.25.2 (September 22, 2023) + +Forward ports 1.20.6 changes. + +### Changed + +- io: use `memchr` from `libc` ([#5960]) + +[#5960]: https://github.com/tokio-rs/tokio/pull/5960 + # 1.25.1 (May 28, 2023) Forward ports 1.18.6 changes. @@ -706,6 +716,16 @@ wasm32-wasi target is given unstable support for the `net` feature. [#4956]: https://github.com/tokio-rs/tokio/pull/4956 [#4959]: https://github.com/tokio-rs/tokio/pull/4959 +# 1.20.6 (September 22, 2023) + +This is a backport of a change from 1.27.0. + +### Changed + +- io: use `memchr` from `libc` ([#5960]) + +[#5960]: https://github.com/tokio-rs/tokio/pull/5960 + # 1.20.5 (May 28, 2023) Forward ports 1.18.6 changes. diff --git a/tokio/src/fs/file.rs b/tokio/src/fs/file.rs index cd140c6cc97..2590d305c46 100644 --- a/tokio/src/fs/file.rs +++ b/tokio/src/fs/file.rs @@ -2,7 +2,6 @@ //! //! [`File`]: File -use self::State::*; use crate::fs::{asyncify, OpenOptions}; use crate::io::blocking::Buf; use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; @@ -17,7 +16,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use std::task::Poll::*; #[cfg(test)] use super::mocks::JoinHandle; @@ -351,7 +349,7 @@ impl File { inner.complete_inflight().await; let mut buf = match inner.state { - Idle(ref mut buf_cell) => buf_cell.take().unwrap(), + State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(), _ => unreachable!(), }; @@ -363,7 +361,7 @@ impl File { let std = self.std.clone(); - inner.state = Busy(spawn_blocking(move || { + inner.state = State::Busy(spawn_blocking(move || { let res = if let Some(seek) = seek { (&*std).seek(seek).and_then(|_| std.set_len(size)) } else { @@ -376,11 +374,11 @@ impl File { })); let (op, buf) = match inner.state { - Idle(_) => unreachable!(), - Busy(ref mut rx) => rx.await?, + State::Idle(_) => unreachable!(), + State::Busy(ref mut rx) => rx.await?, }; - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); match op { Operation::Seek(res) => res.map(|pos| { @@ -532,51 +530,51 @@ impl AsyncRead for File { loop { match inner.state { - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); if !buf.is_empty() { buf.copy_to(dst); *buf_cell = Some(buf); - return Ready(Ok(())); + return Poll::Ready(Ok(())); } buf.ensure_capacity_for(dst); let std = me.std.clone(); - inner.state = Busy(spawn_blocking(move || { + inner.state = State::Busy(spawn_blocking(move || { let res = buf.read_from(&mut &*std); (Operation::Read(res), buf) })); } - Busy(ref mut rx) => { + State::Busy(ref mut rx) => { let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?; match op { Operation::Read(Ok(_)) => { buf.copy_to(dst); - inner.state = Idle(Some(buf)); - return Ready(Ok(())); + inner.state = State::Idle(Some(buf)); + return Poll::Ready(Ok(())); } Operation::Read(Err(e)) => { assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); - return Ready(Err(e)); + inner.state = State::Idle(Some(buf)); + return Poll::Ready(Err(e)); } Operation::Write(Ok(_)) => { assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); continue; } Operation::Write(Err(e)) => { assert!(inner.last_write_err.is_none()); inner.last_write_err = Some(e.kind()); - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); } Operation::Seek(result) => { assert!(buf.is_empty()); - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); if let Ok(pos) = result { inner.pos = pos; } @@ -595,11 +593,11 @@ impl AsyncSeek for File { let inner = me.inner.get_mut(); match inner.state { - Busy(_) => Err(io::Error::new( + State::Busy(_) => Err(io::Error::new( io::ErrorKind::Other, "other file operation is pending, call poll_complete before start_seek", )), - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); // Factor in any unread data from the buf @@ -613,7 +611,7 @@ impl AsyncSeek for File { let std = me.std.clone(); - inner.state = Busy(spawn_blocking(move || { + inner.state = State::Busy(spawn_blocking(move || { let res = (&*std).seek(pos); (Operation::Seek(res), buf) })); @@ -628,10 +626,10 @@ impl AsyncSeek for File { loop { match inner.state { - Idle(_) => return Poll::Ready(Ok(inner.pos)), - Busy(ref mut rx) => { + State::Idle(_) => return Poll::Ready(Ok(inner.pos)), + State::Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); match op { Operation::Read(_) => {} @@ -644,7 +642,7 @@ impl AsyncSeek for File { if let Ok(pos) = res { inner.pos = pos; } - return Ready(res); + return Poll::Ready(res); } } } @@ -664,12 +662,12 @@ impl AsyncWrite for File { let inner = me.inner.get_mut(); if let Some(e) = inner.last_write_err.take() { - return Ready(Err(e.into())); + return Poll::Ready(Err(e.into())); } loop { match inner.state { - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); let seek = if !buf.is_empty() { @@ -694,13 +692,13 @@ impl AsyncWrite for File { io::Error::new(io::ErrorKind::Other, "background task failed") })?; - inner.state = Busy(blocking_task_join_handle); + inner.state = State::Busy(blocking_task_join_handle); - return Ready(Ok(n)); + return Poll::Ready(Ok(n)); } - Busy(ref mut rx) => { + State::Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); match op { Operation::Read(_) => { @@ -735,12 +733,12 @@ impl AsyncWrite for File { let inner = me.inner.get_mut(); if let Some(e) = inner.last_write_err.take() { - return Ready(Err(e.into())); + return Poll::Ready(Err(e.into())); } loop { match inner.state { - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); let seek = if !buf.is_empty() { @@ -765,13 +763,13 @@ impl AsyncWrite for File { io::Error::new(io::ErrorKind::Other, "background task failed") })?; - inner.state = Busy(blocking_task_join_handle); + inner.state = State::Busy(blocking_task_join_handle); - return Ready(Ok(n)); + return Poll::Ready(Ok(n)); } - Busy(ref mut rx) => { + State::Busy(ref mut rx) => { let (op, buf) = ready!(Pin::new(rx).poll(cx))?; - inner.state = Idle(Some(buf)); + inner.state = State::Idle(Some(buf)); match op { Operation::Read(_) => { @@ -896,21 +894,21 @@ impl Inner { fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll> { if let Some(e) = self.last_write_err.take() { - return Ready(Err(e.into())); + return Poll::Ready(Err(e.into())); } let (op, buf) = match self.state { - Idle(_) => return Ready(Ok(())), - Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, + State::Idle(_) => return Poll::Ready(Ok(())), + State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?, }; // The buffer is not used here - self.state = Idle(Some(buf)); + self.state = State::Idle(Some(buf)); match op { - Operation::Read(_) => Ready(Ok(())), - Operation::Write(res) => Ready(res), - Operation::Seek(_) => Ready(Ok(())), + Operation::Read(_) => Poll::Ready(Ok(())), + Operation::Write(res) => Poll::Ready(res), + Operation::Seek(_) => Poll::Ready(Ok(())), } } } diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index aa01e24711e..b718ed54f95 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -124,12 +124,12 @@ impl Future for JoinHandle { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use std::task::Poll::*; + use std::task::Poll; match Pin::new(&mut self.rx).poll(cx) { - Ready(Ok(v)) => Ready(Ok(v)), - Ready(Err(e)) => panic!("error = {:?}", e), - Pending => Pending, + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Ready(Err(e)) => panic!("error = {:?}", e), + Poll::Pending => Poll::Pending, } } } diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index 084ddc571f5..7c883eb3c34 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -20,6 +20,7 @@ cfg_sync! { cfg_trace! { mod trace; + #[allow(unused_imports)] pub(crate) use trace::InstrumentedFuture as Future; } diff --git a/tokio/src/io/blocking.rs b/tokio/src/io/blocking.rs index 27d08b1fcc7..b988ec7180c 100644 --- a/tokio/src/io/blocking.rs +++ b/tokio/src/io/blocking.rs @@ -6,11 +6,8 @@ use std::future::Future; use std::io; use std::io::prelude::*; use std::pin::Pin; -use std::task::Poll::*; use std::task::{Context, Poll}; -use self::State::*; - /// `T` should not implement _both_ Read and Write. #[derive(Debug)] pub(crate) struct Blocking { @@ -58,38 +55,38 @@ where ) -> Poll> { loop { match self.state { - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); if !buf.is_empty() { buf.copy_to(dst); *buf_cell = Some(buf); - return Ready(Ok(())); + return Poll::Ready(Ok(())); } buf.ensure_capacity_for(dst); let mut inner = self.inner.take().unwrap(); - self.state = Busy(sys::run(move || { + self.state = State::Busy(sys::run(move || { let res = buf.read_from(&mut inner); (res, buf, inner) })); } - Busy(ref mut rx) => { + State::Busy(ref mut rx) => { let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?; self.inner = Some(inner); match res { Ok(_) => { buf.copy_to(dst); - self.state = Idle(Some(buf)); - return Ready(Ok(())); + self.state = State::Idle(Some(buf)); + return Poll::Ready(Ok(())); } Err(e) => { assert!(buf.is_empty()); - self.state = Idle(Some(buf)); - return Ready(Err(e)); + self.state = State::Idle(Some(buf)); + return Poll::Ready(Err(e)); } } } @@ -109,7 +106,7 @@ where ) -> Poll> { loop { match self.state { - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { let mut buf = buf_cell.take().unwrap(); assert!(buf.is_empty()); @@ -117,7 +114,7 @@ where let n = buf.copy_from(src); let mut inner = self.inner.take().unwrap(); - self.state = Busy(sys::run(move || { + self.state = State::Busy(sys::run(move || { let n = buf.len(); let res = buf.write_to(&mut inner).map(|_| n); @@ -125,11 +122,11 @@ where })); self.need_flush = true; - return Ready(Ok(n)); + return Poll::Ready(Ok(n)); } - Busy(ref mut rx) => { + State::Busy(ref mut rx) => { let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); + self.state = State::Idle(Some(buf)); self.inner = Some(inner); // If error, return @@ -144,24 +141,24 @@ where let need_flush = self.need_flush; match self.state { // The buffer is not used here - Idle(ref mut buf_cell) => { + State::Idle(ref mut buf_cell) => { if need_flush { let buf = buf_cell.take().unwrap(); let mut inner = self.inner.take().unwrap(); - self.state = Busy(sys::run(move || { + self.state = State::Busy(sys::run(move || { let res = inner.flush().map(|_| 0); (res, buf, inner) })); self.need_flush = false; } else { - return Ready(Ok(())); + return Poll::Ready(Ok(())); } } - Busy(ref mut rx) => { + State::Busy(ref mut rx) => { let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?; - self.state = Idle(Some(buf)); + self.state = State::Idle(Some(buf)); self.inner = Some(inner); // If error, return diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index ebc7bbaff63..0c6fd0d2f2c 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -604,20 +604,19 @@ enum TryCurrentErrorKind { impl fmt::Debug for TryCurrentErrorKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use TryCurrentErrorKind::*; match self { - NoContext => f.write_str("NoContext"), - ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), + TryCurrentErrorKind::NoContext => f.write_str("NoContext"), + TryCurrentErrorKind::ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), } } } impl fmt::Display for TryCurrentError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use TryCurrentErrorKind::*; + use TryCurrentErrorKind as E; match self.kind { - NoContext => f.write_str(CONTEXT_MISSING_ERROR), - ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), + E::NoContext => f.write_str(CONTEXT_MISSING_ERROR), + E::ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), } } } diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index d848b803656..b6187b86667 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -28,7 +28,6 @@ use std::marker::PhantomPinned; use std::pin::Pin; use std::ptr::NonNull; use std::sync::atomic::Ordering::*; -use std::task::Poll::*; use std::task::{Context, Poll, Waker}; use std::{cmp, fmt}; @@ -391,7 +390,7 @@ impl Semaphore { let mut waiters = loop { // Has the semaphore closed? if curr & Self::CLOSED > 0 { - return Ready(Err(AcquireError::closed())); + return Poll::Ready(Err(AcquireError::closed())); } let mut remaining = 0; @@ -436,7 +435,7 @@ impl Semaphore { ) }); - return Ready(Ok(())); + return Poll::Ready(Ok(())); } else if lock.is_none() { break self.waiters.lock(); } @@ -448,7 +447,7 @@ impl Semaphore { }; if waiters.closed { - return Ready(Err(AcquireError::closed())); + return Poll::Ready(Err(AcquireError::closed())); } #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -462,7 +461,7 @@ impl Semaphore { if node.assign_permits(&mut acquired) { self.add_permits_locked(acquired, waiters); - return Ready(Ok(())); + return Poll::Ready(Ok(())); } assert_eq!(acquired, 0); @@ -494,7 +493,7 @@ impl Semaphore { drop(waiters); drop(old_waker); - Pending + Poll::Pending } } @@ -572,15 +571,15 @@ impl Future for Acquire<'_> { let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let result = match semaphore.poll_acquire(cx, needed, node, *queued) { - Pending => { + Poll::Pending => { *queued = true; - Pending + Poll::Pending } - Ready(r) => { + Poll::Ready(r) => { coop.made_progress(); r?; *queued = false; - Ready(Ok(())) + Poll::Ready(Ok(())) } }; diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f24bb03fb4f..c7c0caf6c0e 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -241,7 +241,7 @@ impl Rx { /// Receive the next value pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { - use super::block::Read::*; + use super::block::Read; ready!(crate::trace::trace_leaf(cx)); @@ -254,12 +254,12 @@ impl Rx { macro_rules! try_recv { () => { match rx_fields.list.pop(&self.inner.tx) { - Some(Value(value)) => { + Some(Read::Value(value)) => { self.inner.semaphore.add_permit(); coop.made_progress(); return Ready(Some(value)); } - Some(Closed) => { + Some(Read::Closed) => { // TODO: This check may not be required as it most // likely can only return `true` at this point. A // channel is closed when all tx handles are diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index be8093163e6..56b54a7786a 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -103,7 +103,7 @@ use std::{fmt, mem, ptr}; /// threads. /// 2. Each spawned task obtains a lock and releases it on every iteration. /// 3. Mutation of the data protected by the Mutex is done by de-referencing -/// the obtained lock as seen on lines 12 and 19. +/// the obtained lock as seen on lines 13 and 20. /// /// Tokio's Mutex works in a simple FIFO (first in, first out) style where all /// calls to [`lock`] complete in the order they were performed. In that way the diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index e1cf5603c2e..c94c2bc0fff 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -888,13 +888,11 @@ impl Notified<'_> { } fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> { - use State::*; - let (notify, state, notify_waiters_calls, waiter) = self.project(); 'outer_loop: loop { match *state { - Init => { + State::Init => { let curr = notify.state.load(SeqCst); // Optimistically try acquiring a pending notification @@ -907,7 +905,7 @@ impl Notified<'_> { if res.is_ok() { // Acquired the notification - *state = Done; + *state = State::Done; continue 'outer_loop; } @@ -925,7 +923,7 @@ impl Notified<'_> { // if notify_waiters has been called after the future // was created, then we are done if get_num_notify_waiters_calls(curr) != *notify_waiters_calls { - *state = Done; + *state = State::Done; continue 'outer_loop; } @@ -961,7 +959,7 @@ impl Notified<'_> { match res { Ok(_) => { // Acquired the notification - *state = Done; + *state = State::Done; continue 'outer_loop; } Err(actual) => { @@ -989,14 +987,14 @@ impl Notified<'_> { // Insert the waiter into the linked list waiters.push_front(NonNull::from(waiter)); - *state = Waiting; + *state = State::Waiting; drop(waiters); drop(old_waker); return Poll::Pending; } - Waiting => { + State::Waiting => { #[cfg(tokio_taskdump)] if let Some(waker) = waker { let mut ctx = Context::from_waker(waker); @@ -1009,7 +1007,7 @@ impl Notified<'_> { drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) }); waiter.notification.clear(); - *state = Done; + *state = State::Done; return Poll::Ready(()); } @@ -1034,7 +1032,7 @@ impl Notified<'_> { drop(waiters); drop(old_waker); - *state = Done; + *state = State::Done; return Poll::Ready(()); } @@ -1056,7 +1054,7 @@ impl Notified<'_> { // The list is used in `notify_waiters`, so it must be guarded. unsafe { waiters.remove(NonNull::from(waiter)) }; - *state = Done; + *state = State::Done; } else { // Safety: we hold the lock, so we can modify the waker. unsafe { @@ -1090,7 +1088,7 @@ impl Notified<'_> { // Drop the old waker after releasing the lock. drop(old_waker); } - Done => { + State::Done => { #[cfg(tokio_taskdump)] if let Some(waker) = waker { let mut ctx = Context::from_waker(waker); @@ -1113,15 +1111,13 @@ impl Future for Notified<'_> { impl Drop for Notified<'_> { fn drop(&mut self) { - use State::*; - // Safety: The type only transitions to a "Waiting" state when pinned. let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() }; // This is where we ensure safety. The `Notified` value is being // dropped, which means we must ensure that the waiter entry is no // longer stored in the linked list. - if matches!(*state, Waiting) { + if matches!(*state, State::Waiting) { let mut waiters = notify.waiters.lock(); let mut notify_state = notify.state.load(SeqCst); diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 3f7db8a3cca..08a86f4b9f7 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -47,7 +47,7 @@ use std::sync::Arc; /// } /// ``` /// -/// Limit the number of simultaneously opened files in your program. +/// ## Limit the number of simultaneously opened files in your program. /// /// Most operating systems have limits on the number of open file /// handles. Even in systems without explicit limits, resource constraints @@ -76,7 +76,7 @@ use std::sync::Arc; /// } /// ``` /// -/// Limit the number of incoming requests being handled at the same time. +/// ## Limit the number of incoming requests being handled at the same time. /// /// Similar to limiting the number of simultaneously opened files, network handles /// are a limited resource. Allowing an unbounded amount of requests to be processed @@ -123,6 +123,93 @@ use std::sync::Arc; /// # } /// ``` /// +/// ## Rate limiting using a token bucket +/// +/// Many applications and systems have constraints on the rate at which certain +/// operations should occur. Exceeding this rate can result in suboptimal +/// performance or even errors. +/// +/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate +/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that +/// arrive at the same time. +/// +/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a +/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are +/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to +/// wait for new tokens to be added. +/// +/// Unlike the example that limits how many requests can be handled at the same time, we do not add +/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task. +/// +/// Note that this implementation is suboptimal when the duration is small, because it consumes a +/// lot of cpu constantly looping and sleeping. +/// +/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket +/// ``` +/// use std::sync::Arc; +/// use tokio::sync::Semaphore; +/// use tokio::time::{interval, Duration}; +/// +/// struct TokenBucket { +/// sem: Arc, +/// jh: tokio::task::JoinHandle<()>, +/// } +/// +/// impl TokenBucket { +/// fn new(duration: Duration, capacity: usize) -> Self { +/// let sem = Arc::new(Semaphore::new(capacity)); +/// +/// // refills the tokens at the end of each interval +/// let jh = tokio::spawn({ +/// let sem = sem.clone(); +/// let mut interval = interval(duration); +/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); +/// +/// async move { +/// loop { +/// interval.tick().await; +/// +/// if sem.available_permits() < capacity { +/// sem.add_permits(1); +/// } +/// } +/// } +/// }); +/// +/// Self { jh, sem } +/// } +/// +/// async fn acquire(&self) { +/// // This can return an error if the semaphore is closed, but we +/// // never close it, so just ignore errors. +/// let _ = self.sem.acquire().await; +/// } +/// } +/// +/// impl Drop for TokenBucket { +/// fn drop(&mut self) { +/// // Kill the background task so it stops taking up resources when we +/// // don't need it anymore. +/// self.jh.abort(); +/// } +/// } +/// +/// #[tokio::main] +/// # async fn _hidden() {} +/// # #[tokio::main(flavor = "current_thread", start_paused = true)] +/// async fn main() { +/// let capacity = 5; +/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32); +/// let bucket = TokenBucket::new(update_interval, capacity); +/// +/// for _ in 0..5 { +/// bucket.acquire().await; +/// +/// // do the operation +/// } +/// } +/// ``` +/// /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned #[derive(Debug)] diff --git a/tokio/src/sync/tests/loom_list.rs b/tokio/src/sync/tests/loom_list.rs index 4067f865ce4..9015835e095 100644 --- a/tokio/src/sync/tests/loom_list.rs +++ b/tokio/src/sync/tests/loom_list.rs @@ -5,7 +5,7 @@ use std::sync::Arc; #[test] fn smoke() { - use crate::sync::mpsc::block::Read::*; + use crate::sync::mpsc::block::Read; const NUM_TX: usize = 2; const NUM_MSG: usize = 2; @@ -28,7 +28,7 @@ fn smoke() { loop { match rx.pop(&tx) { - Some(Value((th, v))) => { + Some(Read::Value((th, v))) => { assert_eq!(v, next[th]); next[th] += 1; @@ -36,7 +36,7 @@ fn smoke() { break; } } - Some(Closed) => { + Some(Read::Closed) => { panic!(); } None => { diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 67b6bdbd4c8..14b22cbd5ee 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -715,7 +715,7 @@ impl Receiver { changed_impl(&self.shared, &mut self.version).await } - /// Waits for a value that satisifes the provided condition. + /// Waits for a value that satisfies the provided condition. /// /// This method will call the provided closure whenever something is sent on /// the channel. Once the closure returns `true`, this method will return a @@ -788,8 +788,23 @@ impl Receiver { let has_changed = self.version != new_version; self.version = new_version; - if (!closed || has_changed) && f(&inner) { - return Ok(Ref { inner, has_changed }); + if !closed || has_changed { + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner))); + match result { + Ok(true) => { + return Ok(Ref { inner, has_changed }); + } + Ok(false) => { + // Skip the value. + } + Err(panicked) => { + // Drop the read-lock to avoid poisoning it. + drop(inner); + // Forward the panic to the caller. + panic::resume_unwind(panicked); + // Unreachable + } + }; } } @@ -840,7 +855,7 @@ fn maybe_changed( } if state.is_closed() { - // All receivers have dropped. + // The sender has been dropped. return Some(Err(error::RecvError(()))); } diff --git a/tokio/src/time/error.rs b/tokio/src/time/error.rs index 71344d43487..3d6025f5f29 100644 --- a/tokio/src/time/error.rs +++ b/tokio/src/time/error.rs @@ -1,6 +1,5 @@ //! Time error types. -use self::Kind::*; use std::error; use std::fmt; @@ -57,7 +56,7 @@ pub(crate) enum InsertError { impl Error { /// Creates an error representing a shutdown timer. pub fn shutdown() -> Error { - Error(Shutdown) + Error(Kind::Shutdown) } /// Returns `true` if the error was caused by the timer being shutdown. @@ -67,7 +66,7 @@ impl Error { /// Creates an error representing a timer at capacity. pub fn at_capacity() -> Error { - Error(AtCapacity) + Error(Kind::AtCapacity) } /// Returns `true` if the error was caused by the timer being at capacity. @@ -77,7 +76,7 @@ impl Error { /// Creates an error representing a misconfigured timer. pub fn invalid() -> Error { - Error(Invalid) + Error(Kind::Invalid) } /// Returns `true` if the error was caused by the timer being misconfigured. @@ -90,11 +89,12 @@ impl error::Error for Error {} impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - use self::Kind::*; let descr = match self.0 { - Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime", - AtCapacity => "timer is at capacity and cannot create a new entry", - Invalid => "timer duration exceeds maximum duration", + Kind::Shutdown => { + "the timer is shutdown, must be called from the context of Tokio runtime" + } + Kind::AtCapacity => "timer is at capacity and cannot create a new entry", + Kind::Invalid => "timer duration exceeds maximum duration", }; write!(fmt, "{}", descr) }