Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: initial implementation of new threaded runtime #5823

Merged
merged 33 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2f4fc13
rt: initial implementation of new threaded runtime
carllerche Jun 26, 2023
cf1bac3
loom PR was merged
carllerche Jun 26, 2023
f6a5668
fmt
carllerche Jun 26, 2023
911a29b
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 27, 2023
3dc1886
fixes for CI
carllerche Jun 27, 2023
8baf117
update loom
carllerche Jun 27, 2023
74fc03d
fix test
carllerche Jun 27, 2023
91b7e80
restructure loom tests
carllerche Jun 27, 2023
b1a3055
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 27, 2023
cbe55c7
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 29, 2023
7524f02
fix leak
carllerche Jun 30, 2023
90910c0
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 30, 2023
0abf746
try to make clippy happy
carllerche Jun 30, 2023
3c7973e
see if this passes
carllerche Jul 2, 2023
3e3aa98
try with more stack
carllerche Jul 2, 2023
15dac69
remove release
carllerche Jul 2, 2023
55e7da4
actually remove release
carllerche Jul 2, 2023
5bf1f0d
try no capture
carllerche Jul 2, 2023
6de9bf6
release
carllerche Jul 3, 2023
a00d890
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jul 17, 2023
9f93079
try to make CI happy... again
carllerche Jul 18, 2023
1b4fb39
try again
carllerche Jul 18, 2023
9b62ba3
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jul 18, 2023
f37e054
fix build
carllerche Jul 18, 2023
7dff6be
increase global queue interval for loom tests
carllerche Jul 20, 2023
4888908
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jul 20, 2023
80f037d
fix build
carllerche Jul 20, 2023
43bc3ae
try removing idle map
carllerche Jul 20, 2023
2542796
try fixing the build
carllerche Jul 20, 2023
51b342c
again
carllerche Jul 20, 2023
43331f1
fmt
carllerche Jul 20, 2023
fad2ce2
try again
carllerche Jul 21, 2023
ca7d282
use released loom
carllerche Jul 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 25 additions & 5 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@

R-loom:
R-loom-sync:
- tokio/src/sync/*
- tokio/src/sync/**/*
- tokio-util/src/sync/*
- tokio-util/src/sync/**/*
- tokio/src/runtime/*
- tokio/src/runtime/**/*

R-loom-time-driver:
- tokio/src/runtime/time/*
- tokio/src/runtime/time/**/*

R-loom-current-thread:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/current_thread/*
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**

R-loom-multi-thread:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/multi_thread/*
- tokio/src/runtime/scheduler/multi_thread/**
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**

R-loom-multi-thread-alt:
- tokio/src/runtime/scheduler/*
- tokio/src/runtime/scheduler/multi_thread_alt/*
- tokio/src/runtime/scheduler/multi_thread_alt/**
- tokio/src/runtime/task/*
- tokio/src/runtime/task/**
106 changes: 86 additions & 20 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ on:
name: Loom

env:
RUSTFLAGS: -Dwarnings
RUSTFLAGS: -Dwarnings --cfg loom --cfg tokio_unstable -C debug_assertions
LOOM_MAX_PREEMPTIONS: 2
LOOM_MAX_BRANCHES: 10000
RUST_BACKTRACE: 1
# Change to specific Rust release to pin
rust_stable: stable
Expand All @@ -17,26 +19,91 @@ permissions:
contents: read

jobs:
loom:
name: loom
loom-sync:
name: loom tokio::sync
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom') || (github.base_ref == null))
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-sync') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture sync::tests
working-directory: tokio

loom-time-driver:
name: loom time driver
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-time-driver') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture runtime::time::tests
working-directory: tokio

loom-current-thread:
name: loom current-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-current-thread') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture loom_current_thread
working-directory: tokio

loom-multi-thread:
name: loom multi-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-multi-thread') || (github.base_ref == null))
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scope: loom_multi_thread::group_a
- scope: loom_multi_thread::group_b
- scope: loom_multi_thread::group_c
- scope: loom_multi_thread::group_d
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: loom ${{ matrix.scope }}
run: cargo test --lib --release --features full -- $SCOPE
working-directory: tokio
env:
SCOPE: ${{ matrix.scope }}

loom-multi-thread-alt:
name: loom ALT multi-thread scheduler
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-multi-thread-alt') || (github.base_ref == null))
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scope: --skip loom_pool
max_preemptions: 2
- scope: loom_pool::group_a
max_preemptions: 2
- scope: loom_pool::group_b
max_preemptions: 2
- scope: loom_pool::group_c
max_preemptions: 2
- scope: loom_pool::group_d
max_preemptions: 2
- scope: time::driver
max_preemptions: 2
- scope: loom_multi_thread_alt::group_a
- scope: loom_multi_thread_alt::group_b
- scope: loom_multi_thread_alt::group_c
- scope: loom_multi_thread_alt::group_d
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
Expand All @@ -45,10 +112,9 @@ jobs:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: loom ${{ matrix.scope }}
run: cargo test --lib --release --features full -- --nocapture $SCOPE
run: cargo test --lib --release --features full -- $SCOPE
working-directory: tokio
env:
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
LOOM_MAX_BRANCHES: 10000
SCOPE: ${{ matrix.scope }}
# TODO: remove this before stabilizing
LOOM_MAX_PREEMPTIONS: 1
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ wasm-bindgen-test = "0.3.0"
mio-aio = { version = "0.7.0", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5.2", features = ["futures", "checkpoint"] }
loom = { version = "0.6", features = ["futures", "checkpoint"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/loom/std/unsafe_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ impl<T> UnsafeCell<T> {
UnsafeCell(std::cell::UnsafeCell::new(data))
}

#[inline(always)]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}

#[inline(always)]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ impl BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
BlockingSchedule {
Expand All @@ -45,6 +47,8 @@ impl task::Schedule for BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
None
Expand Down
68 changes: 68 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt,
}

impl Builder {
Expand Down Expand Up @@ -230,6 +232,26 @@ impl Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61)
}

cfg_unstable! {
/// Returns a new builder with the alternate multi thread scheduler
/// selected.
///
/// The alternate multi threaded scheduler is an in-progress
/// candidate to replace the existing multi threaded scheduler. It
/// currently does not scale as well to 16+ processors.
///
/// This runtime flavor is currently **not considered production
/// ready**.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread_alt() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThreadAlt, 61)
}
}
}

/// Returns a new runtime builder initialized with default configuration
Expand Down Expand Up @@ -656,6 +678,8 @@ impl Builder {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}

Expand All @@ -665,6 +689,8 @@ impl Builder {
Kind::CurrentThread => true,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => false,
},
enable_io: self.enable_io,
enable_time: self.enable_time,
Expand Down Expand Up @@ -1214,6 +1240,48 @@ cfg_rt_multi_thread! {

Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}

cfg_unstable! {
fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, handle) = MultiThreadAlt::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
}
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ impl Handle {
scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
}
}

Expand Down Expand Up @@ -385,6 +387,8 @@ impl Handle {
scheduler::Handle::CurrentThread(handle) => handle.owned_id(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(handle) => handle.owned_id(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(handle) => handle.owned_id(),
};
owned_id.into()
}
Expand Down Expand Up @@ -535,6 +539,8 @@ cfg_taskdump! {
handle.dump().await
}).await
},
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => panic!("task dump not implemented for this runtime flavor"),
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use std::time::Duration;
cfg_rt_multi_thread! {
use crate::runtime::Builder;
use crate::runtime::scheduler::MultiThread;

cfg_unstable! {
use crate::runtime::scheduler::MultiThreadAlt;
}
}

/// The Tokio runtime.
Expand Down Expand Up @@ -109,6 +113,9 @@ pub enum RuntimeFlavor {
CurrentThread,
/// The flavor that executes tasks across multiple threads.
MultiThread,
/// The flavor that executes tasks across multiple threads.
#[cfg(tokio_unstable)]
MultiThreadAlt,
}

/// The runtime scheduler is either a multi-thread or a current-thread executor.
Expand All @@ -120,6 +127,10 @@ pub(super) enum Scheduler {
/// Execute tasks across multiple threads.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread(MultiThread),

/// Execute tasks across multiple threads.
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt(MultiThreadAlt),
}

impl Runtime {
Expand Down Expand Up @@ -336,6 +347,8 @@ impl Runtime {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
}
}

Expand Down Expand Up @@ -456,6 +469,12 @@ impl Drop for Runtime {
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(multi_thread) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
}
}
}
Expand Down