Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a configuration option to skip the lifo_slot optimization #4051

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ pub struct Builder {
/// Only used when not using the current-thread executor.
worker_threads: Option<usize>,

/// Whether or not to enable to the lifo_slot optimization
lifo_slot_optimization: bool,

/// Cap on thread usage.
max_blocking_threads: usize,

Expand Down Expand Up @@ -126,6 +129,9 @@ impl Builder {

max_blocking_threads: 512,

// Default to using the optimization
lifo_slot_optimization: true,

// Default thread name
thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()),

Expand Down Expand Up @@ -220,6 +226,43 @@ impl Builder {
self
}

/// Configures whether or not the `Runtime` will use an optimization that
/// causes recently spawned tasks to be scheduled first. This defaults to `true`,
/// as this optimization improves latencies in many patterns, but may be undesired
/// behavior
///
/// # Default
///
/// The default value is `true`
///
/// # Panic
///
/// When using the `current_thread` runtime this method will panic, since
/// those variants do not allow setting worker thread counts.
///
///
/// # Examples
///
/// ## Multi threaded runtime with 4 threads
///
/// ```
/// use tokio::runtime;
///
/// // This will spawn a work-stealing runtime with the optimization turned off
/// let rt = runtime::Builder::new_multi_thread()
/// .lifo_slot_optimization(true)
/// .build()
/// .unwrap();
///
/// rt.spawn(async move {});
/// ```
pub fn lifo_slot_optimization(&mut self, yes: bool) -> &mut Self {
// Doesn't actually panic as described in the docs, to match the current
// behavior of `worker_threads`
self.lifo_slot_optimization = yes;
self
}

/// Specifies the limit for additional threads spawned by the Runtime.
///
/// These threads are used for blocking operations like tasks spawned
Expand Down Expand Up @@ -546,7 +589,7 @@ cfg_rt_multi_thread! {

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let (scheduler, launch) = ThreadPool::new(core_threads, self.lifo_slot_optimization, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the blocking pool
Expand Down Expand Up @@ -580,6 +623,7 @@ impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("worker_threads", &self.worker_threads)
.field("lifo_slot_optimization", &self.lifo_slot_optimization)
.field("max_blocking_threads", &self.max_blocking_threads)
.field(
"thread_name",
Expand Down
8 changes: 6 additions & 2 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ pub(crate) struct Spawner {
// ===== impl ThreadPool =====

impl ThreadPool {
pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker);
pub(crate) fn new(
size: usize,
lifo_slot_optimization: bool,
parker: Parker,
) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, lifo_slot_optimization, parker);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

Expand Down
38 changes: 28 additions & 10 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ struct Core {
/// queue. This effectively results in the **last** scheduled task to be run
/// next (LIFO). This is an optimization for message passing patterns and
/// helps to reduce latency.
lifo_slot: Option<Notified>,
///
/// The first layer of `Option` is whether or not the slot is configured for use or not
lifo_slot: Option<Option<Notified>>,

/// The worker-local run queue.
run_queue: queue::Local<Arc<Shared>>,
Expand Down Expand Up @@ -174,7 +176,11 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
pub(super) fn create(
size: usize,
lifo_slot_optimization: bool,
park: Parker,
) -> (Arc<Shared>, Launch) {
let mut cores = vec![];
let mut remotes = vec![];

Expand All @@ -187,7 +193,11 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {

cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
lifo_slot: if lifo_slot_optimization {
Some(None)
} else {
None
},
run_queue,
is_searching: false,
is_shutdown: false,
Expand Down Expand Up @@ -408,8 +418,11 @@ impl Context {
};

// Check for a task in the LIFO slot
let task = match core.lifo_slot.take() {
Some(task) => task,
let task = match &mut core.lifo_slot {
Some(lifo_slot) => match lifo_slot.take() {
Some(task) => task,
None => return Ok(core),
},
None => return Ok(core),
};

Expand Down Expand Up @@ -503,7 +516,10 @@ impl Core {
}

fn next_local_task(&mut self) -> Option<Notified> {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
self.lifo_slot
.as_mut()
.and_then(|lifo_slot| lifo_slot.take())
.or_else(|| self.run_queue.pop())
}

fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
Expand Down Expand Up @@ -573,7 +589,7 @@ impl Core {
fn transition_from_parked(&mut self, worker: &Worker) -> bool {
// If a task is in the lifo slot, then we must unpark regardless of
// being notified
if self.lifo_slot.is_some() {
if matches!(self.lifo_slot, Some(Some(_))) {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
return true;
Expand Down Expand Up @@ -675,19 +691,21 @@ impl Shared {
// task must always be pushed to the back of the queue, enabling other
// tasks to be executed. If **not** a yield, then there is more
// flexibility and the task may go to the front of the queue.
let should_notify = if is_yield {
let should_notify = if is_yield || core.lifo_slot.is_none() {
core.run_queue.push_back(task, &self.inject);
true
} else {
// Push to the LIFO slot
let prev = core.lifo_slot.take();

// We checked `is_none` above
let prev = core.lifo_slot.as_mut().unwrap().take();
let ret = prev.is_some();

if let Some(prev) = prev {
core.run_queue.push_back(prev, &self.inject);
}

core.lifo_slot = Some(task);
core.lifo_slot = Some(Some(task));

ret
};
Expand Down
37 changes: 36 additions & 1 deletion tokio/tests/rt_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::{self, Runtime};
use tokio::runtime::{self, Builder, Runtime};
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};

Expand Down Expand Up @@ -54,6 +54,41 @@ fn many_oneshot_futures() {
drop(rt);
}
}

#[test]
fn no_lifo_slot_complex() {
// used for notifying the main thread
const NUM: usize = 1_000;

for _ in 0..5 {
let (tx, rx) = mpsc::channel();

let rt = Builder::new_multi_thread()
.lifo_slot_optimization(false)
.build()
.unwrap();
let cnt = Arc::new(AtomicUsize::new(0));

for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();

rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;

if num == NUM {
tx.send(()).unwrap();
}
});
}

rx.recv().unwrap();

// Wait for the pool to shutdown
drop(rt);
}
}

#[test]
fn many_multishot_futures() {
const CHAIN: usize = 200;
Expand Down