Skip to content

Commit

Permalink
Merge branch 'master' into socketaddr_abstract_namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Nov 25, 2023
2 parents 6c1d36c + a8e8fa6 commit 43e312b
Show file tree
Hide file tree
Showing 18 changed files with 382 additions and 13 deletions.
39 changes: 37 additions & 2 deletions tokio-util/src/time/delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,39 @@ impl<T> DelayQueue<T> {
}
}

/// Returns the deadline of the item associated with `key`.
///
/// Since the queue operates at millisecond granularity, the returned
/// deadline may not exactly match the value that was given when initially
/// inserting the item into the queue.
///
/// # Panics
///
/// This function panics if `key` is not contained by the queue.
///
/// # Examples
///
/// Basic usage
///
/// ```rust
/// use tokio_util::time::DelayQueue;
/// use std::time::Duration;
///
/// # #[tokio::main]
/// # async fn main() {
/// let mut delay_queue = DelayQueue::new();
///
/// let key1 = delay_queue.insert("foo", Duration::from_secs(5));
/// let key2 = delay_queue.insert("bar", Duration::from_secs(10));
///
/// assert!(delay_queue.deadline(&key1) < delay_queue.deadline(&key2));
/// # }
/// ```
#[track_caller]
pub fn deadline(&self, key: &Key) -> Instant {
self.start + Duration::from_millis(self.slab[*key].when)
}

/// Removes the key from the expired queue or the timer wheel
/// depending on its expiration status.
///
Expand Down Expand Up @@ -909,8 +942,10 @@ impl<T> DelayQueue<T> {
self.expired.peek().or_else(|| self.wheel.peek())
}

/// Returns the next time to poll as determined by the wheel
fn next_deadline(&mut self) -> Option<Instant> {
/// Returns the next time to poll as determined by the wheel.
///
/// Note that this does not include deadlines in the `expired` queue.
fn next_deadline(&self) -> Option<Instant> {
self.wheel
.poll_at()
.map(|poll_at| self.start + Duration::from_millis(poll_at))
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/doc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#[derive(Debug)]
pub enum NotDefinedHere {}

#[cfg(feature = "net")]
impl mio::event::Source for NotDefinedHere {
fn register(
&mut self,
Expand All @@ -42,4 +43,5 @@ impl mio::event::Source for NotDefinedHere {
}
}

#[cfg(feature = "net")]
pub mod os;
5 changes: 5 additions & 0 deletions tokio/src/io/util/async_write_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ cfg_io_util! {
///
/// // Writes some prefix of the byte string, not necessarily all of it.
/// file.write(b"some bytes").await?;
/// file.flush().await?;
/// Ok(())
/// }
/// ```
Expand Down Expand Up @@ -162,6 +163,7 @@ cfg_io_util! {
/// ];
///
/// file.write_vectored(&bufs).await?;
/// file.flush().await?;
///
/// Ok(())
/// }
Expand Down Expand Up @@ -244,6 +246,7 @@ cfg_io_util! {
/// // all of it.
/// file.write_buf(&mut buffer).await?;
/// }
/// file.flush().await?;
///
/// Ok(())
/// }
Expand Down Expand Up @@ -307,6 +310,7 @@ cfg_io_util! {
/// let mut buffer = Cursor::new(b"data to write");
///
/// file.write_all_buf(&mut buffer).await?;
/// file.flush().await?;
/// Ok(())
/// }
/// ```
Expand Down Expand Up @@ -356,6 +360,7 @@ cfg_io_util! {
/// let mut file = File::create("foo.txt").await?;
///
/// file.write_all(b"some bytes").await?;
/// file.flush().await?;
/// Ok(())
/// }
/// ```
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ cfg_rt! {

#[track_caller]
pub(super) fn with_scheduler<R>(f: impl FnOnce(Option<&scheduler::Context>) -> R) -> R {
CONTEXT.with(|c| c.scheduler.with(f))
let mut f = Some(f);
CONTEXT.try_with(|c| c.scheduler.with(f.take().unwrap()))
.unwrap_or_else(|_| (f.take().unwrap())(None))
}

cfg_taskdump! {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ cfg_taskdump! {
/// `.cargo/config.toml`:
/// ```text
/// [build]
/// rustflags = ["--cfg tokio_unstable", "--cfg tokio_taskdump"]
/// rustflags = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]
/// ```
///
/// [cargo-config]:
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ impl Core {
);

// Take at least one task since the first task is returned directly
// and nto pushed onto the local queue.
// and not pushed onto the local queue.
let n = usize::max(1, n);

let mut synced = worker.handle.shared.synced.lock();
Expand Down
1 change: 1 addition & 0 deletions tokio/src/sync/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl Barrier {
let resource_span = {
let location = std::panic::Location::caller();
let resource_span = tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "Barrier",
kind = "Sync",
Expand Down
1 change: 1 addition & 0 deletions tokio/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl<T: ?Sized> Mutex<T> {
let location = std::panic::Location::caller();

tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "Mutex",
kind = "Sync",
Expand Down
1 change: 1 addition & 0 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let location = std::panic::Location::caller();

let resource_span = tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "Sender|Receiver",
kind = "Sync",
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl<T: ?Sized> RwLock<T> {
let resource_span = {
let location = std::panic::Location::caller();
let resource_span = tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "RwLock",
kind = "Sync",
Expand Down Expand Up @@ -282,6 +283,7 @@ impl<T: ?Sized> RwLock<T> {
let location = std::panic::Location::caller();

let resource_span = tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "RwLock",
kind = "Sync",
Expand Down
1 change: 1 addition & 0 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ impl Semaphore {
let location = std::panic::Location::caller();

tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "Semaphore",
kind = "Sync",
Expand Down
6 changes: 5 additions & 1 deletion tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ impl LocalSet {
/// will remain on the local set, and will be driven on subsequent calls to
/// `run_until` or when [awaiting the local set] itself.
///
/// # Cancel safety
///
/// This method is cancel safe when `future` is cancel safe.
///
/// # Examples
///
/// ```rust
Expand Down Expand Up @@ -1165,7 +1169,7 @@ mod tests {
// Does a `LocalSet` running on a current-thread runtime...basically work?
//
// This duplicates a test in `tests/task_local_set.rs`, but because this is
// a lib test, it wil run under Miri, so this is necessary to catch stacked
// a lib test, it will run under Miri, so this is necessary to catch stacked
// borrows violations in the `LocalSet` implementation.
#[test]
fn local_current_thread_scheduler() {
Expand Down
1 change: 1 addition & 0 deletions tokio/src/time/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ fn internal_interval_at(
let location = location.expect("should have location if tracing");

tracing::trace_span!(
parent: None,
"runtime.resource",
concrete_type = "Interval",
kind = "timer",
Expand Down
1 change: 1 addition & 0 deletions tokio/src/util/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ cfg_trace! {
let location = std::panic::Location::caller();
tracing::trace_span!(
target: "tokio::task",
parent: None,
"runtime.spawn",
%kind,
task.name = %name.unwrap_or_default(),
Expand Down
62 changes: 62 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,4 +1366,66 @@ rt_test! {
th.join().unwrap();
}
}

#[test]
#[cfg_attr(target_family = "wasm", ignore)]
fn wake_by_ref_from_thread_local() {
wake_from_thread_local(true);
}

#[test]
#[cfg_attr(target_family = "wasm", ignore)]
fn wake_by_val_from_thread_local() {
wake_from_thread_local(false);
}

fn wake_from_thread_local(by_ref: bool) {
use std::cell::RefCell;
use std::sync::mpsc::{channel, Sender};
use std::task::Waker;

struct TLData {
by_ref: bool,
waker: Option<Waker>,
done: Sender<bool>,
}

impl Drop for TLData {
fn drop(&mut self) {
if self.by_ref {
self.waker.take().unwrap().wake_by_ref();
} else {
self.waker.take().unwrap().wake();
}
let _ = self.done.send(true);
}
}

std::thread_local! {
static TL_DATA: RefCell<Option<TLData>> = RefCell::new(None);
};

let (send, recv) = channel();

std::thread::spawn(move || {
let rt = rt();
rt.block_on(rt.spawn(poll_fn(move |cx| {
let waker = cx.waker().clone();
let send = send.clone();
TL_DATA.with(|tl| {
tl.replace(Some(TLData {
by_ref,
waker: Some(waker),
done: send,
}));
});
Poll::Ready(())
})))
.unwrap();
})
.join()
.unwrap();

assert!(recv.recv().unwrap());
}
}
2 changes: 1 addition & 1 deletion tokio/tests/rt_handle_block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

// All io tests that deal with shutdown is currently ignored because there are known bugs in with
// shutting down the io driver while concurrently registering new resources. See
// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 fo more details.
// https://github.com/tokio-rs/tokio/pull/3569#pullrequestreview-612703467 for more details.
//
// When this has been fixed we want to re-enable these tests.

Expand Down
15 changes: 9 additions & 6 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,17 @@ fn worker_overflow_count() {

// First, we need to block the other worker until all tasks have
// been spawned.
tokio::spawn(async move {
tx1.send(()).unwrap();
rx2.recv().unwrap();
//
// We spawn from outside the runtime to ensure that the other worker
// will pick it up:
// <https://github.com/tokio-rs/tokio/issues/4730>
tokio::task::spawn_blocking(|| {
tokio::spawn(async move {
tx1.send(()).unwrap();
rx2.recv().unwrap();
});
});

// Bump the next-run spawn
tokio::spawn(async {});

rx1.recv().unwrap();

// Spawn many tasks
Expand Down
Loading

0 comments on commit 43e312b

Please sign in to comment.