Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce the lock contention in task spawn. #6001

Merged
merged 104 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
104 commits
Select commit Hold shift + click to select a range
895958f
reduce the lock contention in task spawn.
wathenjiang Sep 11, 2023
ae64dfe
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 12, 2023
5ffbf01
rm extra criterion in dependencies
wathenjiang Sep 12, 2023
8e2c0b2
restart a ci
wathenjiang Sep 12, 2023
33ab489
fix for each
wathenjiang Sep 12, 2023
ab2452c
fix for_each
wathenjiang Sep 12, 2023
a500a79
reduce the size of header
wathenjiang Sep 12, 2023
632a8d3
code refactor in list.rs
wathenjiang Sep 13, 2023
6453017
change ordering of atomic
wathenjiang Sep 13, 2023
9bfb4f1
fix iterate in close_and_shutdown
wathenjiang Sep 13, 2023
41ee62a
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 13, 2023
b4ac885
fix iterate in close_and_shutdown
wathenjiang Sep 13, 2023
4b386c5
fix atomic method
wathenjiang Sep 13, 2023
9ded74b
rm type CountedOwnedTasksInner
wathenjiang Sep 20, 2023
191edf6
refactor for_each
wathenjiang Sep 20, 2023
06a675c
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 20, 2023
2be70c4
use atomic type in loom instead
wathenjiang Sep 20, 2023
17b0be9
fix: put read closed flag after got the lock for avoid concurrency pr…
wathenjiang Sep 20, 2023
3bb484e
introduce random make shutdown faster
wathenjiang Sep 20, 2023
e325825
use grain instead
wathenjiang Sep 20, 2023
a11f80c
fix: dead lock
wathenjiang Sep 20, 2023
259582e
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 20, 2023
cab1f58
fix: use list_inner to offer lock of list
wathenjiang Sep 21, 2023
389e6b9
fix: use segment_size instead of grain
wathenjiang Sep 21, 2023
833377c
clippy
wathenjiang Sep 21, 2023
5422895
feat: let spawn_concurrency_level configurable
wathenjiang Sep 21, 2023
b90101a
feat: update benchmark
wathenjiang Sep 21, 2023
68af71a
feat: add benchmarks for spawn_concurrency_level
wathenjiang Sep 21, 2023
8e4716a
change spawn_concurrency_level to be 4 times the number of worker thr…
wathenjiang Sep 22, 2023
4a3ff7a
Merge branch 'master' into reduce-lock-contention
wathenjiang Sep 22, 2023
7bbc2e4
change benchmark tests name from shutdown_parallel_multi_thread to sh…
wathenjiang Sep 22, 2023
c15c0bd
fix the comments on spawn_concurrency_level
wathenjiang Sep 22, 2023
6706a35
add comments for parameter in OwnedTasks.close_and_shutdown_all
wathenjiang Sep 22, 2023
d2c7668
fix comments
wathenjiang Sep 22, 2023
5105610
style nit: simplify code, apply suggestions from hawkw
wathenjiang Sep 24, 2023
a30df11
make spawn_concurrency_level is constant 4 in loom test
wathenjiang Sep 25, 2023
df4ab61
use Header::get_id to get task_id
wathenjiang Sep 28, 2023
63a7679
change owned_id to u64 back
wathenjiang Sep 28, 2023
b2010d7
refactor: use local_staic in loom
wathenjiang Sep 28, 2023
052e141
fix: OwnedTasks get all locks first
wathenjiang Sep 29, 2023
65670eb
fix: rm segment_size field of OwnedTasks, use method to return it ins…
wathenjiang Sep 29, 2023
08d7d0c
feat: make spawn_concurrency_level to be a unstable api
wathenjiang Oct 1, 2023
26621d4
feat: rm shutdown flag
wathenjiang Oct 1, 2023
5b010bc
rm benches/spawn_concurrent.rs because it is unstable now in tokio
wathenjiang Oct 1, 2023
66fa190
use get_unchecked to get segment lock
wathenjiang Oct 3, 2023
2ac0b96
feat: drop lock promptly and explicitly
wathenjiang Oct 15, 2023
47820b3
feat: move the atomic operations of count into the lock
wathenjiang Oct 15, 2023
d0acd70
Merge branch 'master' into reduce-lock-contention
wathenjiang Oct 17, 2023
7ef0265
first commit
wathenjiang Oct 23, 2023
01da1ed
add Safety
wathenjiang Oct 23, 2023
6f5eaa2
mutable ref to immutable ref
wathenjiang Oct 23, 2023
3257cb7
use std AtomicUsize
wathenjiang Oct 23, 2023
7b101ee
fix: count sub in pop_back
wathenjiang Oct 23, 2023
608d2c6
refactor doube check closed flag in bind_inner
wathenjiang Oct 23, 2023
1c214e0
cast task_id to usize
wathenjiang Oct 23, 2023
e24db6d
use ShardGuard
wathenjiang Oct 23, 2023
3c90918
update comments
wathenjiang Oct 23, 2023
cd5fb20
update comments
wathenjiang Oct 23, 2023
38c9eba
fix: remove needless reference
wathenjiang Oct 23, 2023
7968b51
fix: release lock as far as possible
wathenjiang Oct 23, 2023
5d3da9e
Update tokio/src/runtime/task/list.rs
wathenjiang Nov 6, 2023
1d1a7a3
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
13c4b93
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
ee53e23
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
01afd7b
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
36c2355
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
d6606b8
Update tokio/src/util/sharded_list.rs
wathenjiang Nov 6, 2023
c9f32d2
Update tokio/src/runtime/task/list.rs
wathenjiang Nov 6, 2023
bbefb70
fix: accept ownedship of closue in method for_each of OwnedTasks
wathenjiang Nov 6, 2023
f97748c
Merge branch 'master' into reduce-lock-contention
wathenjiang Nov 6, 2023
8baf79e
Apply suggestions from code review
wathenjiang Nov 7, 2023
87e70c3
Apply suggestions from code review
wathenjiang Nov 7, 2023
af92f20
Apply suggestions from code review
wathenjiang Nov 7, 2023
60104b8
Apply suggestions from code review
wathenjiang Nov 7, 2023
bb4458b
rm unused push method
wathenjiang Nov 7, 2023
1adad70
rename get_sharded_id to get_shard_id
wathenjiang Nov 7, 2023
ef8d2b7
Apply suggestions from code review
wathenjiang Nov 7, 2023
2d4fbf6
rename get_sharded_id to get_shard_id
wathenjiang Nov 7, 2023
777d97e
add sentence in comments
wathenjiang Nov 7, 2023
5406a7e
rm dead_code attr
wathenjiang Nov 7, 2023
c9b05ee
Merge branch 'master' into reduce-lock-contention
wathenjiang Nov 7, 2023
680848e
move spawn_concurrent_level size from shardedList to builder
wathenjiang Nov 7, 2023
6fb70b1
update comments
wathenjiang Nov 7, 2023
0ba87db
update comments
wathenjiang Nov 7, 2023
8bb106f
update comments
wathenjiang Nov 7, 2023
e0fb9e2
rm loop in ShardedList::new
wathenjiang Nov 7, 2023
57133a5
fix rustfmt
wathenjiang Nov 7, 2023
71e8983
fix
wathenjiang Nov 7, 2023
d1ce613
fix spawn_concurrency_level
wathenjiang Nov 7, 2023
e6b8db1
fix spawn_concurrency_level
wathenjiang Nov 7, 2023
592f432
rm get_spawn_concurrency_level to cfg_rt_multi_thread
wathenjiang Nov 7, 2023
f083757
add allow(dead_code))]
wathenjiang Nov 7, 2023
3105af7
rm dead_code attr
wathenjiang Nov 8, 2023
8e189cf
make spawn_concurrency_level unconfigurable
wathenjiang Nov 22, 2023
75d3081
Apply suggestions from code review
wathenjiang Nov 24, 2023
ed27f70
apply suggestions from core review
wathenjiang Nov 24, 2023
db58076
move get_spawn_concurrency_level from builder to task/list.rs
wathenjiang Nov 24, 2023
06656b9
feat: add comments and rm loom cfg
wathenjiang Nov 24, 2023
caa74c9
feat: update comments for gen_shared_list_size
wathenjiang Nov 24, 2023
865de08
feat: update comments for gen_shared_list_size
wathenjiang Nov 24, 2023
7a76ad5
Apply suggestions from code review
wathenjiang Nov 26, 2023
78d1dea
fix: fmt and typo fix
wathenjiang Nov 26, 2023
3844bd3
Update tokio/src/util/sharded_list.rs
Darksonn Dec 7, 2023
038650f
Merge branch 'master' into reduce-lock-contention
Darksonn Dec 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ tokio = { version = "1.5.0", path = "../tokio", features = ["full"] }
criterion = "0.5.1"
rand = "0.8"
rand_chacha = "0.3"
num_cpus = "1.16.0"

[dev-dependencies]
tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] }
Expand Down
71 changes: 71 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ pub struct Builder {
/// Only used when not using the current-thread executor.
worker_threads: Option<usize>,

/// Configures the global OwnedTasks's concurrency level
///
/// Only used when not using the current-thread executor.
pub(super) spawn_concurrency_level: Option<usize>,

/// Cap on thread usage.
max_blocking_threads: usize,

Expand Down Expand Up @@ -278,6 +283,9 @@ impl Builder {
// Default to lazy auto-detection (one thread per CPU core)
worker_threads: None,

// Default to lazy auto-detection (4 times the number of worker threads)
spawn_concurrency_level: None,

max_blocking_threads: 512,

// Default thread name
Expand Down Expand Up @@ -401,6 +409,53 @@ impl Builder {
self
}

/// Sets the spawn concurrency level the `Runtime` will use.
///
/// This can be any number greater than 0 and less than or equal to 65536,
/// if the parameter is larger than this value, concurrency level will actually select 65536 internally.
///
/// When the value of this is small compared to the number of concurrent threads, increasing it
/// will help improve the performanc of concurrently spawn tasks. However, when the value is
/// already large enough, further increasing it will not continue to improve performance.
/// Instead, it may result in longer time of the Runtime creation.
///
/// # Default
///
/// The default value for this is 4 times the number of worker threads.
///
/// When using the `current_thread` runtime this method has no effect.
///
/// # Examples
///
/// ## Multi threaded runtime with spawn_concurrency_level 8
///
/// ```
/// use tokio::runtime;
///
/// // This will spawn a work-stealing runtime with 4 worker threads.
/// let rt = runtime::Builder::new_multi_thread()
/// .spawn_concurrency_level(8)
/// .build()
/// .unwrap();
///
/// rt.spawn(async move {});
/// ```
///
/// # Panics
///
/// This will panic if `val` is not larger than `0`.
#[track_caller]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn spawn_concurrency_level(&mut self, mut val: usize) -> &mut Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me that we want a configuration option for this. At least, it should be unstable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is as I said above:

The new added method spawn_concurrency_level() in Builder to set the number of segment lock in OwnedTasks struct is to allow us to customize this value. Its default value is 4 times the number of workers thread. An alternative approach is to set it as a constant, like 64, regardless of the number of CPU cores on different platforms. However, considering the usage of Tokio on micro-embedded devices, making the default value dependent on the number of worker threads can effectively minimize the time required for runtime creation on micro-embedded devices.

I thinke make it unstable is a good decision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have made it unstable, and it seems that unstable features can not be tested in benches crate, so I remove benches/spawn_current.rs temporarily.

The benchmark test of parameter spawn_concurrency_level under different values is in github hidden items now: #6001 (comment)

assert!(val > 0, "spawn concurrency level cannot be set to 0");
if val > 1 << 16 {
val = 1 << 16;
}
self.spawn_concurrency_level = Some(val);
self
}

/// Specifies the limit for additional threads spawned by the Runtime.
///
/// These threads are used for blocking operations like tasks spawned
Expand Down Expand Up @@ -1231,6 +1286,12 @@ cfg_rt_multi_thread! {
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);
// Shrink the size of spawn_concurrency_level when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasoable a mount of time
#[cfg(loom)]
let spawn_concurrency_level = 4;
#[cfg(not(loom))]
let spawn_concurrency_level = self.spawn_concurrency_level.unwrap_or(core_threads * 4);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

Expand All @@ -1249,6 +1310,7 @@ cfg_rt_multi_thread! {
driver_handle,
blocking_spawner,
seed_generator_2,
spawn_concurrency_level,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand Down Expand Up @@ -1280,6 +1342,13 @@ cfg_rt_multi_thread! {

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

// Shrink the size of spawn_concurrency_level when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasoable a mount of time
#[cfg(loom)]
let spawn_concurrency_level = 4;
#[cfg(not(loom))]
let spawn_concurrency_level = self.spawn_concurrency_level.unwrap_or(core_threads * 4);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
Expand All @@ -1297,6 +1366,7 @@ cfg_rt_multi_thread! {
driver_handle,
blocking_spawner,
seed_generator_2,
spawn_concurrency_level,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand All @@ -1321,6 +1391,7 @@ impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("worker_threads", &self.worker_threads)
.field("spawn_concurrency_level", &self.spawn_concurrency_level)
.field("max_blocking_threads", &self.max_blocking_threads)
.field(
"thread_name",
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/runtime/id.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fmt;
use std::num::NonZeroU64;
use std::num::{NonZeroU32, NonZeroU64};

/// An opaque ID that uniquely identifies a runtime relative to all other currently
/// running runtimes.
Expand Down Expand Up @@ -39,6 +39,12 @@ impl From<NonZeroU64> for Id {
}
}

impl From<NonZeroU32> for Id {
fn from(value: NonZeroU32) -> Self {
Id(value.into())
}
}

impl fmt::Display for Id {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl CurrentThread {
let handle = Arc::new(Handle {
shared: Shared {
inject: Inject::new(),
owned: OwnedTasks::new(),
owned: OwnedTasks::new(1),
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down Expand Up @@ -248,7 +248,7 @@ fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
handle.shared.owned.close_and_shutdown_all();
handle.shared.owned.close_and_shutdown_all(0);

// Drain local queue
// We already shut down every task, so we just need to drop the task.
Expand Down Expand Up @@ -614,7 +614,7 @@ impl Schedule for Arc<Handle> {
// If `None`, the runtime is shutting down, so there is no need to signal shutdown
if let Some(core) = core.as_mut() {
core.unhandled_panic = true;
self.shared.owned.close_and_shutdown_all();
self.shared.owned.close_and_shutdown_all(0);
}
}
_ => unreachable!("runtime core not set in CURRENT thread-local"),
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl MultiThread {
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (MultiThread, Arc<Handle>, Launch) {
let parker = Parker::new(driver);
Expand All @@ -69,6 +70,7 @@ impl MultiThread {
driver_handle,
blocking_spawner,
seed_generator,
spawn_concurrency_level,
config,
);

Expand Down
14 changes: 11 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ pub(super) fn create(
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (Arc<Handle>, Launch) {
let mut cores = Vec::with_capacity(size);
Expand Down Expand Up @@ -287,7 +288,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(spawn_concurrency_level as u32),
synced: Mutex::new(Synced {
idle: idle_synced,
inject: inject_synced,
Expand Down Expand Up @@ -547,7 +548,6 @@ impl Context {
}

core.pre_shutdown(&self.worker);

// Signal shutdown
self.worker.handle.shutdown_core(core);
Err(())
Expand Down Expand Up @@ -954,8 +954,16 @@ impl Core {
/// Signals all tasks to shut down, and waits for them to complete. Must run
/// before we enter the single-threaded phase of shutdown processing.
fn pre_shutdown(&mut self, worker: &Worker) {
// Start from a random inner list
let start = self
.rand
.fastrand_n(worker.handle.shared.owned.get_shard_size() as u32);
// Signal to all tasks to shut down.
worker.handle.shared.owned.close_and_shutdown_all();
worker
.handle
.shared
.owned
.close_and_shutdown_all(start as usize);

self.stats
.submit(&worker.handle.shared.worker_metrics[worker.index]);
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread_alt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl MultiThread {
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> (MultiThread, runtime::Handle) {
let handle = worker::create(
Expand All @@ -57,6 +58,7 @@ impl MultiThread {
driver_handle,
blocking_spawner,
seed_generator,
spawn_concurrency_level,
config,
);

Expand Down
7 changes: 5 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ pub(super) fn create(
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
spawn_concurrency_level: usize,
config: Config,
) -> runtime::Handle {
let mut num_workers = num_cores;
Expand Down Expand Up @@ -307,7 +308,7 @@ pub(super) fn create(
remotes: remotes.into_boxed_slice(),
inject,
idle,
owned: OwnedTasks::new(),
owned: OwnedTasks::new(spawn_concurrency_level as u32),
synced: Mutex::new(Synced {
assigned_cores: (0..num_workers).map(|_| None).collect(),
shutdown_cores: Vec::with_capacity(num_cores),
Expand Down Expand Up @@ -1460,7 +1461,9 @@ impl Shared {
}

pub(super) fn shutdown_core(&self, handle: &Handle, mut core: Box<Core>) {
self.owned.close_and_shutdown_all();
// Start from a random inner list
let start = core.rand.fastrand_n(self.owned.get_shard_size() as u32);
self.owned.close_and_shutdown_all(start as usize);

core.stats.submit(&self.worker_metrics[core.index]);

Expand Down
19 changes: 15 additions & 4 deletions tokio/src/runtime/task/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::fmt;
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
pub struct Id(u64);
pub struct Id(pub(crate) u64);

/// Returns the [`Id`] of the currently running task.
///
Expand Down Expand Up @@ -74,11 +74,22 @@ impl fmt::Display for Id {

impl Id {
pub(crate) fn next() -> Self {
use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::atomic::StaticAtomicU64;

static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
#[cfg(all(test, loom))]
{
crate::loom::lazy_static! {
static ref NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
}
Self(NEXT_ID.fetch_add(1, Relaxed))
}

Self(NEXT_ID.fetch_add(1, Relaxed))
#[cfg(not(all(test, loom)))]
{
static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
Self(NEXT_ID.fetch_add(1, Relaxed))
}
}

pub(crate) fn as_u64(&self) -> u64 {
Expand Down
Loading
Loading