Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: fix infinite loop when dropping a LocalSet #1892

Merged
merged 10 commits into from
Dec 4, 2019
180 changes: 60 additions & 120 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::park::{Park, Unpark};
use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};
use crate::task::{self, queue::MpscQueues, JoinHandle, Schedule, ScheduleSendOnly, Task};

use std::cell::{Cell, UnsafeCell};
use std::collections::VecDeque;
use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

Expand All @@ -31,31 +30,7 @@ pub(crate) struct Spawner {

/// The scheduler component.
pub(super) struct SchedulerPriv {
/// List of all active tasks spawned onto this executor.
///
/// # Safety
///
/// Must only be accessed from the primary thread
owned_tasks: UnsafeCell<task::OwnedList<Self>>,

/// Local run queue.
///
/// Tasks notified from the current thread are pushed into this queue.
///
/// # Safety
///
/// References should not be handed out. Only call `push` / `pop` functions.
/// Only call from the owning thread.
local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>,

/// Remote run queue.
///
/// Tasks notified from another thread are pushed into this queue.
remote_queue: Mutex<RemoteQueue>,

/// Tasks pending drop
pending_drop: task::TransferStack<Self>,

queues: MpscQueues<Self>,
/// Unpark the blocked thread
unpark: Box<dyn Unpark>,
}
Expand All @@ -73,21 +48,9 @@ struct LocalState<P> {
park: P,
}

#[derive(Debug)]
struct RemoteQueue {
/// FIFO list of tasks
queue: VecDeque<Task<SchedulerPriv>>,

/// `true` when a task can be pushed into the queue, false otherwise.
open: bool,
}

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first
const CHECK_REMOTE_INTERVAL: u8 = 13;

thread_local! {
static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null())
}
Expand All @@ -101,13 +64,7 @@ where

BasicScheduler {
scheduler: Arc::new(SchedulerPriv {
owned_tasks: UnsafeCell::new(task::OwnedList::new()),
local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
remote_queue: Mutex::new(RemoteQueue {
queue: VecDeque::with_capacity(64),
open: true,
}),
pending_drop: task::TransferStack::new(),
queues: MpscQueues::new(),
unpark: Box::new(unpark),
}),
local: LocalState { tick: 0, park },
Expand Down Expand Up @@ -155,9 +112,7 @@ where

// Track the current scheduler
let _guard = ACTIVE.with(|cell| {
let guard = Guard {
old: cell.get(),
};
let guard = Guard { old: cell.get() };

cell.set(scheduler as *const SchedulerPriv);

Expand Down Expand Up @@ -188,7 +143,11 @@ where
scheduler.tick(local);

// Maintenance work
scheduler.drain_pending_drop();
unsafe {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
// safety: this function is safe to call only from the
// thread the basic scheduler is running on (which we are).
scheduler.queues.drain_pending_drop();
}
}
})
}
Expand Down Expand Up @@ -216,6 +175,8 @@ impl Spawner {
}
}

// === impl SchedulerPriv ===

impl SchedulerPriv {
fn tick(&self, local: &mut LocalState<impl Park>) {
for _ in 0..MAX_TASKS_PER_TICK {
Expand All @@ -224,8 +185,14 @@ impl SchedulerPriv {

// Increment the tick
local.tick = tick.wrapping_add(1);
let next = unsafe {
// safety: this function is safe to call only from the
// thread the basic scheduler is running on. The `LocalState`
// parameter to this method implies that we are on that thread.
self.queues.next_task(tick)
};

let task = match self.next_task(tick) {
let task = match next {
Some(task) => task,
None => {
local.park.park().ok().expect("failed to park");
Expand All @@ -235,7 +202,10 @@ impl SchedulerPriv {

if let Some(task) = task.run(&mut || Some(self.into())) {
unsafe {
self.schedule_local(task);
// safety: this function is safe to call only from the
// thread the basic scheduler is running on. The `LocalState`
// parameter to this method implies that we are on that thread.
self.queues.push_local(task);
}
}
}
Expand All @@ -247,15 +217,6 @@ impl SchedulerPriv {
.expect("failed to park");
}

fn drain_pending_drop(&self) {
for task in self.pending_drop.drain() {
unsafe {
(*self.owned_tasks.get()).remove(&task);
}
drop(task);
}
}

/// # Safety
///
/// Must be called from the same thread that holds the `BasicScheduler`
Expand All @@ -266,63 +227,51 @@ impl SchedulerPriv {
F::Output: Send + 'static,
{
let (task, handle) = task::joinable(future);
self.schedule_local(task);
self.queues.push_local(task);
handle
}

unsafe fn schedule_local(&self, task: Task<Self>) {
(*self.local_queue.get()).push_back(task);
}

fn next_task(&self, tick: u8) -> Option<Task<Self>> {
if 0 == tick % CHECK_REMOTE_INTERVAL {
self.next_remote_task().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| self.next_remote_task())
}
}

fn next_local_task(&self) -> Option<Task<Self>> {
unsafe { (*self.local_queue.get()).pop_front() }
}

fn next_remote_task(&self) -> Option<Task<Self>> {
self.remote_queue.lock().unwrap().queue.pop_front()
}
}

impl Schedule for SchedulerPriv {
fn bind(&self, task: &Task<Self>) {
unsafe {
(*self.owned_tasks.get()).insert(task);
// safety: `Queues::add_task` is only safe to call from the thread
// that owns the queues (the thread the scheduler is running on).
// `Scheduler::bind` is called when polling a task that
// doesn't have a scheduler set. We will only poll new tasks from
// the thread that the scheduler is running on. Therefore, this is
// safe to call.
self.queues.add_task(task);
hawkw marked this conversation as resolved.
Show resolved Hide resolved
}
}

fn release(&self, task: Task<Self>) {
self.pending_drop.push(task);
self.queues.release_remote(task);
}

fn release_local(&self, task: &Task<Self>) {
unsafe {
(*self.owned_tasks.get()).remove(task);
// safety: `Scheduler::release_local` is only called from the
// thread that the scheduler is running on. The `Schedule` trait's
// contract is that releasing a task from another thread should call
// `release` rather than `release_local`.
self.queues.release_local(task);
}
}

fn schedule(&self, task: Task<Self>) {
let is_current = ACTIVE.with(|cell| {
cell.get() == self as *const SchedulerPriv
});
let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ACTIVE thing was hacked in to fix a bug. Nothing to do now, but we probably should try cleaning it up sometime...


if is_current {
unsafe { self.schedule_local(task) };
unsafe {
// safety: this function is safe to call only from the
// thread the basic scheduler is running on. If `is_current` is
// then we are on that thread.
self.queues.push_local(task)
};
} else {
let mut lock = self.remote_queue.lock().unwrap();

if lock.open {
lock.queue.push_back(task);
} else {
task.shutdown();
}
let mut lock = self.queues.remote();
lock.schedule(task);

// while locked, call unpark
self.unpark.unpark();
Expand All @@ -339,39 +288,30 @@ where
P: Park,
{
fn drop(&mut self) {
// Close the remote queue
let mut lock = self.scheduler.remote_queue.lock().unwrap();
lock.open = false;

while let Some(task) = lock.queue.pop_front() {
task.shutdown();
}

drop(lock);

// Drain all local tasks
while let Some(task) = self.scheduler.next_local_task() {
task.shutdown();
}

// Release owned tasks
unsafe {
(*self.scheduler.owned_tasks.get()).shutdown();
}
// safety: the `Drop` impl owns the scheduler's queues. these fields
// will only be accessed when running the scheduler, and it can no
// longer be run, since we are in the process of dropping it.

self.scheduler.drain_pending_drop();
// Shut down the task queues.
self.scheduler.queues.shutdown();
}

// Wait until all tasks have been released.
while unsafe { !(*self.scheduler.owned_tasks.get()).is_empty() } {
while unsafe { self.scheduler.queues.has_tasks_remaining() } {
self.local.park.park().ok().expect("park failed");
self.scheduler.drain_pending_drop();
unsafe {
self.scheduler.queues.drain_pending_drop();
}
}
}
}

impl fmt::Debug for SchedulerPriv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scheduler").finish()
fmt.debug_struct("Scheduler")
.field("queues", &self.queues)
.finish()
}
}

Expand Down
Loading