Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 'tokio-1.25.3' into 'tokio-1.32.x' #6227

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559])
[#5513]: https://github.com/tokio-rs/tokio/pull/5513
[#5517]: https://github.com/tokio-rs/tokio/pull/5517

# 1.25.3 (December 17th, 2023)

### Fixed
- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221])

[#6221]: https://github.com/tokio-rs/tokio/pull/6221

# 1.25.2 (September 22, 2023)

Forward ports 1.20.6 changes.
Expand Down
16 changes: 8 additions & 8 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ tokio_thread_local! {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),

/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
// Tracks the current runtime handle to use when spawning,
// accessing drivers, etc...
#[cfg(feature = "rt")]
current: current::HandleCell::new(),

/// Tracks the current scheduler internal context
// Tracks the current scheduler internal context
#[cfg(feature = "rt")]
scheduler: Scoped::new(),

#[cfg(feature = "rt")]
current_task_id: Cell::new(None),

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
/// handle may not reference the runtime currently executing. This
/// is because other runtime handles may be set to current from
/// within a runtime.
// Tracks if the current thread is currently driving a runtime.
// Note, that if this is set to "entered", the current scheduler
// handle may not reference the runtime currently executing. This
// is because other runtime handles may be set to current from
// within a runtime.
#[cfg(feature = "rt")]
runtime: Cell::new(EnterRuntime::NotEntered),

Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,16 @@ impl Registration {
loop {
let event = self.readiness(interest).await?;

let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
x => {
coop.made_progress();
return x;
}
}
}
}
Expand Down
77 changes: 77 additions & 0 deletions tokio/tests/coop_budger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;

/// Ensure that UDP sockets have functional budgeting
///
/// # Design
/// Two sockets communicate by spamming packets from one to the other.
///
/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
/// send system call because we are using the loopback interface.
/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
/// entirety of the lifecycle of a packet within the kernel network stack.
///
/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
/// is through budgeting.
///
/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
/// and there are two budget events per packet, a send and a recv.
#[tokio::test]
async fn coop_budget_udp_send_recv() {
const BUDGET: usize = 128;
const N_ITERATIONS: usize = 1024;

const PACKET: &[u8] = b"Hello, world";
const PACKET_LEN: usize = 12;

assert_eq!(
PACKET_LEN,
PACKET.len(),
"Defect in test, programmer can't do math"
);

// bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();

tx.connect(rx.local_addr().unwrap()).await.unwrap();
rx.connect(tx.local_addr().unwrap()).await.unwrap();

let tracker = Arc::new(AtomicUsize::default());

let tracker_clone = Arc::clone(&tracker);

tokio::task::yield_now().await;

tokio::spawn(async move {
loop {
tracker_clone.fetch_add(1, Ordering::SeqCst);

tokio::task::yield_now().await;
}
});

for _ in 0..N_ITERATIONS {
tx.send(PACKET).await.unwrap();

let mut tmp = [0; PACKET_LEN];

// ensure that we aren't somehow accumulating other
assert_eq!(
PACKET_LEN,
rx.recv(&mut tmp).await.unwrap(),
"Defect in test case, received unexpected result from socket"
);
assert_eq!(
PACKET, &tmp,
"Defect in test case, received unexpected result from socket"
);
}

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}
Loading