diff --git a/src/header.rs b/src/header.rs
index ee84035..cf74471 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -1,6 +1,6 @@
 use core::cell::UnsafeCell;
 use core::fmt;
-use core::task::Waker;
+use core::task::{RawWaker, Waker};
 
 #[cfg(not(feature = "portable-atomic"))]
 use core::sync::atomic::AtomicUsize;
@@ -10,12 +10,20 @@ use portable_atomic::AtomicUsize;
 use crate::raw::TaskVTable;
 use crate::state::*;
+use crate::utils::abort;
 use crate::utils::abort_on_panic;
 
-/// The header of a task.
-///
-/// This header is stored in memory at the beginning of the heap-allocated task.
-pub(crate) struct Header<M> {
+/// Actions to take upon calling [`Header::drop_waker`].
+pub(crate) enum DropWakerAction {
+    /// Re-schedule the task.
+    Schedule,
+    /// Destroy the task.
+    Destroy,
+    /// Do nothing.
+    None,
+}
+
+pub(crate) struct Header {
     /// Current state of the task.
     ///
     /// Contains flags representing the current state and the reference count.
@@ -32,17 +40,12 @@ pub(crate) struct Header<M> {
     /// methods necessary for bookkeeping the heap-allocated task.
     pub(crate) vtable: &'static TaskVTable,
 
-    /// Metadata associated with the task.
-    ///
-    /// This metadata may be provided to the user.
-    pub(crate) metadata: M,
-
     /// Whether or not a panic that occurs in the task should be propagated.
     #[cfg(feature = "std")]
     pub(crate) propagate_panic: bool,
 }
 
-impl<M> Header<M> {
+impl Header {
     /// Notifies the awaiter blocked on this task.
     ///
    /// If the awaiter is the same as the current waker, it will not be notified.
@@ -157,11 +160,69 @@ impl<M> Header<M> {
             abort_on_panic(|| w.wake());
         }
     }
+
+    /// Clones a waker.
+    pub(crate) unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+        let header = ptr as *const Header;
+
+        // Increment the reference count. With any kind of reference-counted data structure,
+        // relaxed ordering is appropriate when incrementing the counter.
+        let state = (*header).state.fetch_add(REFERENCE, Ordering::Relaxed);
+
+        // If the reference count overflowed, abort.
+        if state > isize::MAX as usize {
+            abort();
+        }
+
+        RawWaker::new(ptr, (*header).vtable.raw_waker_vtable)
+    }
+
+    #[inline(never)]
+    pub(crate) unsafe fn drop_waker(ptr: *const ()) -> DropWakerAction {
+        let header = ptr as *const Header;
+
+        // Decrement the reference count.
+        let new = (*header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+        // If this was the last reference to the task and the `Task` has been dropped too,
+        // then we need to decide how to destroy the task.
+        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
+            if new & (COMPLETED | CLOSED) == 0 {
+                // If the task was not completed nor closed, close it and schedule one more time so
+                // that its future gets dropped by the executor.
+                (*header)
+                    .state
+                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
+                DropWakerAction::Schedule
+            } else {
+                // Otherwise, destroy the task right away.
+                DropWakerAction::Destroy
+            }
+        } else {
+            DropWakerAction::None
+        }
+    }
+}
+
+// SAFETY: repr(C) is explicitly used here so that casts between `Header` and `HeaderWithMetadata`
+// can be done safely without additional offsets.
+//
+/// The header of a task.
+///
+/// This header is stored in memory at the beginning of the heap-allocated task.
+#[repr(C)]
+pub(crate) struct HeaderWithMetadata<M> {
+    pub(crate) header: Header,
+
+    /// Metadata associated with the task.
+    ///
+    /// This metadata may be provided to the user.
+ pub(crate) metadata: M, } -impl fmt::Debug for Header { +impl fmt::Debug for HeaderWithMetadata { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let state = self.state.load(Ordering::SeqCst); + let state = self.header.state.load(Ordering::SeqCst); f.debug_struct("Header") .field("scheduled", &(state & SCHEDULED != 0)) diff --git a/src/lib.rs b/src/lib.rs index 58b2211..2f97ae1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,7 @@ #![no_std] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc(test(attr(deny(rust_2018_idioms, warnings))))] -#![doc(test(attr(allow(unused_extern_crates, unused_variables))))] +#![doc(test(attr(allow(unused_variables))))] #![doc( html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] diff --git a/src/raw.rs b/src/raw.rs index 366f0fd..2994332 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,5 +1,6 @@ use alloc::alloc::Layout as StdLayout; use core::future::Future; +use core::marker::PhantomData; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; @@ -7,7 +8,7 @@ use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use core::sync::atomic::Ordering; -use crate::header::Header; +use crate::header::{DropWakerAction, Header, HeaderWithMetadata}; use crate::runnable::{Schedule, ScheduleInfo}; use crate::state::*; use crate::utils::{abort, abort_on_panic, max, Layout}; @@ -21,17 +22,13 @@ pub(crate) type Panic = core::convert::Infallible; /// The vtable for a task. pub(crate) struct TaskVTable { + pub(crate) raw_waker_vtable: &'static RawWakerVTable, + /// Schedules the task. pub(crate) schedule: unsafe fn(*const (), ScheduleInfo), /// Drops the future inside the task. - pub(crate) drop_future: unsafe fn(*const ()), - - /// Returns a pointer to the output stored after completion. - pub(crate) get_output: unsafe fn(*const ()) -> *const (), - - /// Drops the task reference (`Runnable` or `Waker`). - pub(crate) drop_ref: unsafe fn(ptr: *const ()), + pub(crate) drop_future: unsafe fn(*const (), &TaskLayout), /// Destroys the task. pub(crate) destroy: unsafe fn(*const ()), @@ -39,16 +36,19 @@ pub(crate) struct TaskVTable { /// Runs the task. pub(crate) run: unsafe fn(*const ()) -> bool, - /// Creates a new waker associated with the task. - pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, - /// The memory layout of the task. This information enables /// debuggers to decode raw task memory blobs. Do not remove /// the field, even if it appears to be unused. - #[allow(unused)] pub(crate) layout_info: &'static TaskLayout, } +impl TaskVTable { + /// Returns a pointer to the output inside a task. + pub(crate) unsafe fn get_output(&self, ptr: *const ()) -> *const () { + ptr.add_byte(self.layout_info.offset_r) + } +} + /// Memory layout of a task. /// /// This struct contains the following information: @@ -73,7 +73,7 @@ pub(crate) struct TaskLayout { /// Raw pointers to the fields inside a task. pub(crate) struct RawTask { /// The task header. - pub(crate) header: *const Header, + pub(crate) header: *const HeaderWithMetadata, /// The schedule function. pub(crate) schedule: *const S, @@ -100,7 +100,7 @@ impl RawTask { #[inline] const fn eval_task_layout() -> TaskLayout { // Compute the layouts for `Header`, `S`, `F`, and `T`. 
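The vtable and layout code below leans on `HeaderWithMetadata<M>` starting with a plain `Header` at offset zero. A self-contained sketch of that `repr(C)` prefix-cast idiom, with hypothetical type names standing in for the crate's:

```rust
// Sketch of the repr(C) prefix-cast idiom behind Header/HeaderWithMetadata.
// `Inner` and `Outer` are hypothetical stand-ins, not types from this crate.
#[repr(C)]
struct Inner {
    state: usize,
}

#[repr(C)]
struct Outer<M> {
    inner: Inner, // the first field of a repr(C) struct sits at offset 0
    metadata: M,
}

fn main() {
    let outer = Outer { inner: Inner { state: 7 }, metadata: "meta" };
    // A pointer to the whole struct is also a valid pointer to its first
    // field, so non-generic code can work through `*const Inner` alone.
    let p = &outer as *const Outer<&str> as *const Inner;
    assert_eq!(unsafe { (*p).state }, 7);
}
```

This is what lets `clone_waker` and `drop_waker` live on the non-generic `Header` while still being handed raw pointers to the whole task allocation.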
- let layout_header = Layout::new::>(); + let layout_header = Layout::new::>(); let layout_s = Layout::new::(); let layout_f = Layout::new::(); let layout_r = Layout::new::>(); @@ -148,16 +148,18 @@ macro_rules! allocate_task { } = $builder; // Write the header as the first field of the task. - ($raw.header as *mut Header<$m>).write(Header { - #[cfg(not(feature = "portable-atomic"))] - state: core::sync::atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE), - #[cfg(feature = "portable-atomic")] - state: portable_atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE), - awaiter: core::cell::UnsafeCell::new(None), - vtable: &RawTask::<$f, <$f as Future>::Output, $s, $m>::TASK_VTABLE, + ($raw.header as *mut HeaderWithMetadata<$m>).write(HeaderWithMetadata { + header: Header { + #[cfg(not(feature = "portable-atomic"))] + state: core::sync::atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE), + #[cfg(feature = "portable-atomic")] + state: portable_atomic::AtomicUsize::new(SCHEDULED | TASK | REFERENCE), + awaiter: core::cell::UnsafeCell::new(None), + vtable: &RawTask::<$f, <$f as Future>::Output, $s, $m>::TASK_VTABLE, + #[cfg(feature = "std")] + propagate_panic, + }, metadata, - #[cfg(feature = "std")] - propagate_panic, }); // Write the schedule function as the third field of the task. @@ -185,324 +187,66 @@ where S: Schedule, { pub(crate) const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::clone_waker, - Self::wake, - Self::wake_by_ref, - Self::drop_waker, + Header::clone_waker, + wake::, + wake_by_ref::, + drop_waker, ); pub(crate) const TASK_VTABLE: TaskVTable = TaskVTable { - schedule: Self::schedule, - drop_future: Self::drop_future, - get_output: Self::get_output, - drop_ref: Self::drop_ref, - destroy: Self::destroy, + raw_waker_vtable: &Self::RAW_WAKER_VTABLE, + schedule: schedule::, + drop_future: drop_future::, + destroy: destroy::, run: Self::run, - clone_waker: Self::clone_waker, layout_info: &Self::TASK_LAYOUT, }; /// Creates a `RawTask` from a raw task pointer. #[inline] pub(crate) fn from_ptr(ptr: *const ()) -> Self { - let task_layout = Self::task_layout(); - let p = ptr as *const u8; - unsafe { Self { - header: p as *const Header, - schedule: p.add(task_layout.offset_s) as *const S, - future: p.add(task_layout.offset_f) as *mut F, - output: p.add(task_layout.offset_r) as *mut Result, - } - } - } - - /// Returns the layout of the task. - #[inline] - fn task_layout() -> TaskLayout { - Self::TASK_LAYOUT - } - /// Wakes a waker. - unsafe fn wake(ptr: *const ()) { - // This is just an optimization. If the schedule function has captured variables, then - // we'll do less reference counting if we wake the waker by reference and then drop it. - if mem::size_of::() > 0 { - Self::wake_by_ref(ptr); - Self::drop_waker(ptr); - return; - } - - let raw = Self::from_ptr(ptr); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task is completed or closed, it can't be woken up. - if state & (COMPLETED | CLOSED) != 0 { - // Drop the waker. - Self::drop_waker(ptr); - break; - } - - // If the task is already scheduled, we just need to synchronize with the thread that - // will run the task by "publishing" our current view of the memory. - if state & SCHEDULED != 0 { - // Update the state without actually modifying it. - match (*raw.header).state.compare_exchange_weak( - state, - state, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Drop the waker. 
- Self::drop_waker(ptr); - break; - } - Err(s) => state = s, - } - } else { - // Mark the task as scheduled. - match (*raw.header).state.compare_exchange_weak( - state, - state | SCHEDULED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not yet scheduled and isn't currently running, now is the - // time to schedule it. - if state & RUNNING == 0 { - // Schedule the task. - Self::schedule(ptr, ScheduleInfo::new(false)); - } else { - // Drop the waker. - Self::drop_waker(ptr); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Wakes a waker by reference. - unsafe fn wake_by_ref(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task is completed or closed, it can't be woken up. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // If the task is already scheduled, we just need to synchronize with the thread that - // will run the task by "publishing" our current view of the memory. - if state & SCHEDULED != 0 { - // Update the state without actually modifying it. - match (*raw.header).state.compare_exchange_weak( - state, - state, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(s) => state = s, - } - } else { - // If the task is not running, we can schedule right away. - let new = if state & RUNNING == 0 { - (state | SCHEDULED) + REFERENCE - } else { - state | SCHEDULED - }; - - // Mark the task as scheduled. - match (*raw.header).state.compare_exchange_weak( - state, - new, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // If the task is not running, now is the time to schedule. - if state & RUNNING == 0 { - // If the reference count overflowed, abort. - if state > isize::MAX as usize { - abort(); - } - - // Schedule the task. There is no need to call `Self::schedule(ptr)` - // because the schedule function cannot be destroyed while the waker is - // still alive. - let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); - (*raw.schedule).schedule(task, ScheduleInfo::new(false)); - } - - break; - } - Err(s) => state = s, - } - } - } - } - - /// Clones a waker. - unsafe fn clone_waker(ptr: *const ()) -> RawWaker { - let raw = Self::from_ptr(ptr); - - // Increment the reference count. With any kind of reference-counted data structure, - // relaxed ordering is appropriate when incrementing the counter. - let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); - - // If the reference count overflowed, abort. - if state > isize::MAX as usize { - abort(); - } - - RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) - } - - /// Drops a waker. - /// - /// This function will decrement the reference count. If it drops down to zero, the associated - /// `Task` has been dropped too, and the task has not been completed, then it will get - /// scheduled one more time so that its future gets dropped by the executor. - #[inline] - unsafe fn drop_waker(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // Decrement the reference count. - let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; - - // If this was the last reference to the task and the `Task` has been dropped too, - // then we need to decide how to destroy the task. - if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { - if new & (COMPLETED | CLOSED) == 0 { - // If the task was not completed nor closed, close it and schedule one more time so - // that its future gets dropped by the executor. 
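All of these wake paths share one shape: load the state word, compute a successor, and race it in with a weak compare-exchange. A stripped-down sketch of the pattern (the flag value is an assumption for illustration, not the crate's):

```rust
// Sketch of the compare-exchange retry loop used for every state transition.
use std::sync::atomic::{AtomicUsize, Ordering};

const SCHEDULED: usize = 1 << 0; // illustrative flag value

fn mark_scheduled(state: &AtomicUsize) -> bool {
    let mut s = state.load(Ordering::Acquire);
    loop {
        if s & SCHEDULED != 0 {
            return false; // someone else already scheduled it
        }
        // `_weak` may fail spuriously, which is fine inside a retry loop and
        // can compile to cheaper code on LL/SC architectures.
        match state.compare_exchange_weak(s, s | SCHEDULED, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return true,
            Err(actual) => s = actual, // retry with the freshly observed state
        }
    }
}

fn main() {
    let state = AtomicUsize::new(0);
    assert!(mark_scheduled(&state));
    assert!(!mark_scheduled(&state));
}
```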
- (*raw.header) - .state - .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); - Self::schedule(ptr, ScheduleInfo::new(false)); - } else { - // Otherwise, destroy the task right away. - Self::destroy(ptr); + header: ptr as *const HeaderWithMetadata, + schedule: ptr.add_byte(Self::TASK_LAYOUT.offset_s) as *const S, + future: ptr.add_byte(Self::TASK_LAYOUT.offset_f) as *mut F, + output: ptr.add_byte(Self::TASK_LAYOUT.offset_r) as *mut Result, } } } - /// Drops a task reference (`Runnable` or `Waker`). - /// - /// This function will decrement the reference count. If it drops down to zero and the - /// associated `Task` handle has been dropped too, then the task gets destroyed. - #[inline] - unsafe fn drop_ref(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // Decrement the reference count. - let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; - - // If this was the last reference to the task and the `Task` has been dropped too, - // then destroy the task. - if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { - Self::destroy(ptr); - } - } - - /// Schedules a task for running. - /// - /// This function doesn't modify the state of the task. It only passes the task reference to - /// its schedule function. - unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { - let raw = Self::from_ptr(ptr); - - // If the schedule function has captured variables, create a temporary waker that prevents - // the task from getting deallocated while the function is being invoked. - let _waker; - if mem::size_of::() > 0 { - _waker = Waker::from_raw(Self::clone_waker(ptr)); - } - - let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ())); - (*raw.schedule).schedule(task, info); - } - - /// Drops the future inside a task. - #[inline] - unsafe fn drop_future(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - - // We need a safeguard against panics because the destructor can panic. - abort_on_panic(|| { - raw.future.drop_in_place(); - }) - } - - /// Returns a pointer to the output inside a task. - unsafe fn get_output(ptr: *const ()) -> *const () { - let raw = Self::from_ptr(ptr); - raw.output as *const () - } - - /// Cleans up task's resources and deallocates it. - /// - /// The schedule function will be dropped, and the task will then get deallocated. - /// The task must be closed before this function is called. - #[inline] - unsafe fn destroy(ptr: *const ()) { - let raw = Self::from_ptr(ptr); - let task_layout = Self::task_layout(); - - // We need a safeguard against panics because destructors can panic. - abort_on_panic(|| { - // Drop the header along with the metadata. - (raw.header as *mut Header).drop_in_place(); - - // Drop the schedule function. - (raw.schedule as *mut S).drop_in_place(); - }); - - // Finally, deallocate the memory reserved by the task. - alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); - } - /// Runs a task. /// /// If polling its future panics, the task will be closed and the panic will be propagated into /// the caller. unsafe fn run(ptr: *const ()) -> bool { let raw = Self::from_ptr(ptr); + let header = ptr as *const Header; // Create a context from the raw task pointer and the vtable inside the its header. let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); let cx = &mut Context::from_waker(&waker); - let mut state = (*raw.header).state.load(Ordering::Acquire); + let mut state = (*header).state.load(Ordering::Acquire); // Update the task's state before polling its future. 
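The loops here walk a single packed state word. The real flag constants live in src/state.rs, which this diff does not touch, so the values in this sketch are assumptions; only the packing scheme matters:

```rust
// Plausible sketch of the state-word layout from src/state.rs (assumed values).
const SCHEDULED: usize = 1 << 0;
const RUNNING: usize = 1 << 1;
const COMPLETED: usize = 1 << 2;
const CLOSED: usize = 1 << 3;
const TASK: usize = 1 << 4;
const AWAITER: usize = 1 << 5;
// Everything above the flag bits is the reference count, so one reference is
// added or removed with `fetch_add(REFERENCE)` / `fetch_sub(REFERENCE)`.
const REFERENCE: usize = 1 << 6;

fn main() {
    let state = SCHEDULED | TASK | REFERENCE; // a freshly spawned task
    assert_eq!(state / REFERENCE, 1); // exactly one reference is held
    assert_eq!(state & (RUNNING | COMPLETED | CLOSED | AWAITER), 0);
    // `new & !(REFERENCE - 1) == 0` from `drop_waker` asks whether the whole
    // reference-count part of the word has reached zero.
    assert_eq!((state - REFERENCE) & !(REFERENCE - 1), 0);
}
```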
loop { // If the task has already been closed, drop the task reference and return. if state & CLOSED != 0 { // Drop the future. - Self::drop_future(ptr); + drop_future::(ptr, &Self::TASK_LAYOUT); // Mark the task as unscheduled. - let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); + awaiter = (*header).take(None); } // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -512,7 +256,7 @@ where } // Mark the task as unscheduled and running. - match (*raw.header).state.compare_exchange_weak( + match (*header).state.compare_exchange_weak( state, (state & !SCHEDULED) | RUNNING, Ordering::AcqRel, @@ -530,7 +274,7 @@ where // Poll the inner future, but surround it with a guard that closes the task in case polling // panics. // If available, we should also try to catch the panic so that it is propagated correctly. - let guard = Guard(raw); + let guard = Guard::(ptr, &Self::TASK_LAYOUT, PhantomData); // Panic propagation is not available for no_std. #[cfg(not(feature = "std"))] @@ -539,7 +283,7 @@ where #[cfg(feature = "std")] let poll = { // Check if we should propagate panics. - if (*raw.header).propagate_panic { + if (*header).propagate_panic { // Use catch_unwind to catch the panic. match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { ::poll(Pin::new_unchecked(&mut *raw.future), cx) @@ -558,7 +302,7 @@ where match poll { Poll::Ready(out) => { // Replace the future with its output. - Self::drop_future(ptr); + drop_future::(ptr, &Self::TASK_LAYOUT); raw.output.write(out); // The task is now completed. @@ -571,7 +315,7 @@ where }; // Mark the task as not running and completed. - match (*raw.header).state.compare_exchange_weak( + match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, @@ -588,11 +332,11 @@ where // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); + awaiter = (*header).take(None); } // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -620,12 +364,12 @@ where if state & CLOSED != 0 && !future_dropped { // The thread that closed the task didn't drop the future because it was // running so now it's our responsibility to do so. - Self::drop_future(ptr); + drop_future::(ptr, &Self::TASK_LAYOUT); future_dropped = true; } // Mark the task as not running. - match (*raw.header).state.compare_exchange_weak( + match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, @@ -639,11 +383,11 @@ where // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); + awaiter = (*header).take(None); } // Drop the task reference. - Self::drop_ref(ptr); + drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -652,11 +396,11 @@ where } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. - Self::schedule(ptr, ScheduleInfo::new(true)); + schedule::(ptr, ScheduleInfo::new(true)); return true; } else { // Drop the task reference. 
- Self::drop_ref(ptr); + drop_ref(ptr); } break; } @@ -666,86 +410,337 @@ where } } - return false; - - /// A guard that closes the task if polling its future panics. - struct Guard(RawTask) - where - F: Future, - S: Schedule; - - impl Drop for Guard - where - F: Future, - S: Schedule, - { - fn drop(&mut self) { - let raw = self.0; - let ptr = raw.header as *const (); - - unsafe { - let mut state = (*raw.header).state.load(Ordering::Acquire); - - loop { - // If the task was closed while running, then unschedule it, drop its - // future, and drop the task reference. - if state & CLOSED != 0 { - // The thread that closed the task didn't drop the future because it - // was running so now it's our responsibility to do so. - RawTask::::drop_future(ptr); - - // Mark the task as not running and not scheduled. - (*raw.header) - .state - .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + false + } +} - // Take the awaiter out. - let mut awaiter = None; - if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); - } +/// A guard that closes the task if polling its future panics. +struct Guard(*const (), &'static TaskLayout, PhantomData F>) +where + F: Future; - // Drop the task reference. - RawTask::::drop_ref(ptr); +impl Drop for Guard +where + F: Future, +{ + fn drop(&mut self) { + let ptr = self.0; + let task_layout = self.1; + let header = ptr as *const Header; - // Notify the awaiter that the future has been dropped. - if let Some(w) = awaiter { - abort_on_panic(|| w.wake()); - } - break; + unsafe { + let header = &*header; + let mut state = header.state.load(Ordering::Acquire); + + loop { + // If the task was closed while running, then unschedule it, drop its + // future, and drop the task reference. + if state & CLOSED != 0 { + // The thread that closed the task didn't drop the future because it + // was running so now it's our responsibility to do so. + drop_future::(ptr, task_layout); + + // Mark the task as not running and not scheduled. + header + .state + .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = header.take(None); + } + + // Drop the task reference. + drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + + // Mark the task as not running, not scheduled, and closed. + match header.state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the task is now closed. + drop_future::(ptr, task_layout); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = header.take(None); } - // Mark the task as not running, not scheduled, and closed. - match (*raw.header).state.compare_exchange_weak( - state, - (state & !RUNNING & !SCHEDULED) | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(state) => { - // Drop the future because the task is now closed. - RawTask::::drop_future(ptr); + // Drop the task reference. + drop_ref(ptr); - // Take the awaiter out. - let mut awaiter = None; - if state & AWAITER != 0 { - awaiter = (*raw.header).take(None); - } + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + Err(s) => state = s, + } + } + } + } +} - // Drop the task reference. - RawTask::::drop_ref(ptr); +/// Schedules a task for running. 
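The `Guard<F>` type above is the classic drop-guard idiom: the cleanup lives in `Drop`, so it still runs when polling unwinds, and the happy path defuses it with `mem::forget` just as `run` does. A freestanding sketch with hypothetical names:

```rust
// Minimal sketch of the drop-guard pattern used by Guard<F>.
struct CleanupGuard<'a> {
    closed: &'a mut bool,
}

impl Drop for CleanupGuard<'_> {
    fn drop(&mut self) {
        // Runs on both normal exit and unwinding; the real Guard closes the
        // task and drops its future here.
        *self.closed = true;
    }
}

fn poll_with_guard(mut closed: bool) -> bool {
    let guard = CleanupGuard { closed: &mut closed };
    // ... poll the future here; a panic would unwind through the guard ...
    core::mem::forget(guard); // on success, defuse the guard instead
    closed
}

fn main() {
    assert!(!poll_with_guard(false));
}
```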
+///
+/// This function doesn't modify the state of the task. It only passes the task reference to
+/// its schedule function.
+unsafe fn schedule<S: Schedule<M>, M>(ptr: *const (), info: ScheduleInfo) {
+    let header = ptr as *const Header;
+    let task_layout = (*header).vtable.layout_info;
+    let schedule = ptr.add_byte(task_layout.offset_s) as *mut S;
+
+    // If the schedule function has captured variables, create a temporary waker that prevents
+    // the task from getting deallocated while the function is being invoked.
+    let _waker;
+    if mem::size_of::<S>() > 0 {
+        _waker = Waker::from_raw(Header::clone_waker(ptr));
+    }
+
+    let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
+    (*schedule).schedule(task, info);
+}
+
+/// Drops a waker.
+///
+/// This function will decrement the reference count. If it drops down to zero, the associated
+/// `Task` has been dropped too, and the task has not been completed, then it will get
+/// scheduled one more time so that its future gets dropped by the executor.
+#[inline]
+unsafe fn drop_waker(ptr: *const ()) {
+    let header = ptr as *const Header;
+    match Header::drop_waker(ptr) {
+        DropWakerAction::Schedule => ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)),
+        DropWakerAction::Destroy => ((*header).vtable.destroy)(ptr),
+        DropWakerAction::None => {}
+    }
+}
+
+/// Drops the future inside a task.
+#[inline]
+unsafe fn drop_future<F>(ptr: *const (), task_layout: &TaskLayout) {
+    let future_ptr = ptr.add_byte(task_layout.offset_f) as *mut F;
+
+    // We need a safeguard against panics because the destructor can panic.
+    abort_on_panic(|| {
+        future_ptr.drop_in_place();
+    })
+}
+
+/// Wakes a waker.
+unsafe fn wake<S: Schedule<M>, M>(ptr: *const ()) {
+    // This is just an optimization. If the schedule function has captured variables, then
+    // we'll do less reference counting if we wake the waker by reference and then drop it.
+    if mem::size_of::<S>() > 0 {
+        wake_by_ref::<S, M>(ptr);
+        drop_waker(ptr);
+        return;
+    }
+
+    let header = ptr as *const Header;
+
+    let mut state = (*header).state.load(Ordering::Acquire);
+
+    loop {
+        // If the task is completed or closed, it can't be woken up.
+        if state & (COMPLETED | CLOSED) != 0 {
+            // Drop the waker.
+            drop_waker(ptr);
+            break;
+        }
+
+        // If the task is already scheduled, we just need to synchronize with the thread that
+        // will run the task by "publishing" our current view of the memory.
+        if state & SCHEDULED != 0 {
+            // Update the state without actually modifying it.
+            match (*header).state.compare_exchange_weak(
+                state,
+                state,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    // Drop the waker.
+                    drop_waker(ptr);
+                    break;
+                }
+                Err(s) => state = s,
+            }
+        } else {
+            // Mark the task as scheduled.
+            match (*header).state.compare_exchange_weak(
+                state,
+                state | SCHEDULED,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    // If the task is not yet scheduled and isn't currently running, now is the
+                    // time to schedule it.
+                    if state & RUNNING == 0 {
+                        // Schedule the task.
+                        schedule::<S, M>(ptr, ScheduleInfo::new(false));
+                    } else {
+                        // Drop the waker.
+                        drop_waker(ptr);
+                    }
+
+                    break;
+                }
+                Err(s) => state = s,
+            }
+        }
+    }
+}
+
+/// Wakes a waker by reference.
+unsafe fn wake_by_ref<S: Schedule<M>, M>(ptr: *const ()) {
+    let header = ptr as *const Header;
+    let header = &*header;
+    let task_layout = header.vtable.layout_info;
+
+    let mut state = header.state.load(Ordering::Acquire);
+
+    loop {
+        // If the task is completed or closed, it can't be woken up.
+        if state & (COMPLETED | CLOSED) != 0 {
+            break;
+        }
+
+        // If the task is already scheduled, we just need to synchronize with the thread that
+        // will run the task by "publishing" our current view of the memory.
+        if state & SCHEDULED != 0 {
+            // Update the state without actually modifying it.
+            match header.state.compare_exchange_weak(
+                state,
+                state,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => break,
+                Err(s) => state = s,
+            }
+        } else {
+            // If the task is not running, we can schedule right away.
+            let new = if state & RUNNING == 0 {
+                (state | SCHEDULED) + REFERENCE
+            } else {
+                state | SCHEDULED
+            };
+
+            // Mark the task as scheduled.
+            match header.state.compare_exchange_weak(
+                state,
+                new,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            ) {
+                Ok(_) => {
+                    // If the task is not running, now is the time to schedule.
+                    if state & RUNNING == 0 {
+                        // If the reference count overflowed, abort.
+                        if state > isize::MAX as usize {
+                            abort();
+                        }
+
+                        let schedule = ptr.add_byte(task_layout.offset_s) as *mut S;
+
+                        // Schedule the task. There is no need to call `Self::schedule(ptr)`
+                        // because the schedule function cannot be destroyed while the waker is
+                        // still alive.
+                        let task = Runnable::from_raw(NonNull::new_unchecked(ptr as *mut ()));
+                        (*schedule).schedule(task, ScheduleInfo::new(false));
+                    }
+
+                    break;
+                }
+                Err(s) => state = s,
+            }
+        }
+    }
+}
+
+/// Cleans up task's resources and deallocates it.
+///
+/// The schedule function will be dropped, and the task will then get deallocated.
+/// The task must be closed before this function is called.
+#[inline]
+unsafe fn destroy<S, M>(ptr: *const ()) {
+    let header = ptr as *const Header;
+    let task_layout = (*header).vtable.layout_info;
+    let schedule = ptr.add_byte(task_layout.offset_s);
+
+    // We need a safeguard against panics because destructors can panic.
+    abort_on_panic(|| {
+        // Drop the header along with the metadata.
+        (ptr as *mut HeaderWithMetadata<M>).drop_in_place();
+
+        // Drop the schedule function.
+        (schedule as *mut S).drop_in_place();
+    });
+
+    // Finally, deallocate the memory reserved by the task.
+    alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
+}
+
+/// Drops a task reference (`Runnable` or `Waker`).
+///
+/// This function will decrement the reference count. If it drops down to zero and the
+/// associated `Task` handle has been dropped too, then the task gets destroyed.
+#[inline]
+pub(crate) unsafe fn drop_ref(ptr: *const ()) {
+    let header = ptr as *const Header;
+    let header = &*header;
+
+    // Decrement the reference count.
+    let new = header.state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+    // If this was the last reference to the task and the `Task` has been dropped too,
+    // then destroy the task.
+    if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
+        (header.vtable.destroy)(ptr);
+    }
+}
+
+trait PointerPolyfill {
+    // Polyfill for `byte_add`.
+    // TODO: Replace this with `byte_add` once the MSRV is bumped past 1.75
+    /// Adds an unsigned offset in bytes to a pointer.
+    ///
+    /// `count` is in units of bytes.
+    ///
+    /// This is purely a convenience for casting to a `u8` pointer and
+    /// using [add][pointer::add] on it. See that method for documentation
+    /// and safety requirements.
+    ///
+    /// # Safety
+    /// If any of the following conditions are violated, the result is Undefined Behavior:
+    ///
+    /// - The offset in bytes, count * size_of::<u8>(), computed on mathematical integers
+    ///   (without “wrapping around”), must fit in an isize.
+    /// - If the computed offset is non-zero, then self must be derived from a pointer to
+    ///   some allocation, and the entire memory range between self and the result must be
+    ///   in bounds of that allocation. In particular, this range must not “wrap around”
+    ///   the edge of the address space.
+    unsafe fn add_byte(self, size: usize) -> Self;
+}
+
+impl<T> PointerPolyfill for *const T {
+    #[inline]
+    unsafe fn add_byte(self, size: usize) -> Self {
+        (self.cast::<u8>().add(size)).cast::<T>()
+    }
+}
diff --git a/src/runnable.rs b/src/runnable.rs
index d0454a8..cada62d 100644
--- a/src/runnable.rs
+++ b/src/runnable.rs
@@ -7,6 +7,8 @@ use core::sync::atomic::Ordering;
 use core::task::Waker;
 
 use crate::header::Header;
+use crate::header::HeaderWithMetadata;
+use crate::raw::drop_ref;
 use crate::raw::{allocate_task, RawTask};
 use crate::state::*;
 use crate::Task;
@@ -715,7 +717,7 @@ impl<M> Runnable<M> {
     /// Tasks can be created with a metadata object associated with them; by default, this
     /// is a `()` value. See the [`Builder::metadata()`] method for more information.
     pub fn metadata(&self) -> &M {
-        &self.header().metadata
+        &self.header_with_metadata().metadata
     }
 
     /// Schedules the task.
@@ -742,7 +744,7 @@
     /// ```
     pub fn schedule(self) {
         let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
+        let header = ptr as *const Header;
         mem::forget(self);
 
         unsafe {
@@ -780,7 +782,7 @@
     /// ```
     pub fn run(self) -> bool {
         let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
+        let header = ptr as *const Header;
         mem::forget(self);
 
         unsafe { ((*header).vtable.run)(ptr) }
@@ -813,17 +815,18 @@
     /// # handle.join().unwrap();
     /// ```
     pub fn waker(&self) -> Waker {
-        let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
-
         unsafe {
-            let raw_waker = ((*header).vtable.clone_waker)(ptr);
+            let raw_waker = Header::clone_waker(self.ptr.as_ptr());
             Waker::from_raw(raw_waker)
         }
     }
 
-    fn header(&self) -> &Header<M> {
-        unsafe { &*(self.ptr.as_ptr() as *const Header<M>) }
+    fn header(&self) -> &Header {
+        unsafe { &*(self.ptr.as_ptr() as *const Header) }
+    }
+
+    fn header_with_metadata(&self) -> &HeaderWithMetadata<M> {
+        unsafe { &*(self.ptr.as_ptr() as *const HeaderWithMetadata<M>) }
     }
 
     /// Converts this task into a raw pointer.
@@ -925,7 +928,7 @@ impl<M> Drop for Runnable<M> {
             }
 
             // Drop the future.
-            (header.vtable.drop_future)(ptr);
+            (header.vtable.drop_future)(ptr, header.vtable.layout_info);
 
             // Mark the task as unscheduled.
             let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel);
@@ -936,7 +939,7 @@
             }
 
             // Drop the task reference.
-            (header.vtable.drop_ref)(ptr);
+            drop_ref(ptr);
         }
     }
 }
@@ -944,7 +947,7 @@ impl<M> Drop for Runnable<M> {
 impl<M> fmt::Debug for Runnable<M> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
+        let header = ptr as *const HeaderWithMetadata<M>;
 
         f.debug_struct("Runnable")
             .field("header", unsafe { &(*header) })
diff --git a/src/task.rs b/src/task.rs
index 700cc60..d92172f 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -7,10 +7,10 @@ use core::ptr::NonNull;
 use core::sync::atomic::Ordering;
 use core::task::{Context, Poll};
 
-use crate::header::Header;
+use crate::header::{Header, HeaderWithMetadata};
 use crate::raw::Panic;
-use crate::runnable::ScheduleInfo;
 use crate::state::*;
+use crate::ScheduleInfo;
 
 /// A spawned task.
 ///
@@ -85,8 +85,8 @@ impl<T, M> Task<T, M> {
     ///     .detach();
     /// ```
     pub fn detach(self) {
-        let mut this = self;
-        let _out = this.set_detached();
+        let this = self;
+        let _out = set_detached::<T>(this.ptr.as_ptr());
         mem::forget(this);
     }
 
@@ -125,8 +125,8 @@
     /// });
     /// ```
     pub async fn cancel(self) -> Option<T> {
-        let mut this = self;
-        this.set_canceled();
+        let this = self;
+        set_canceled(this.ptr.as_ptr());
         this.fallible().await
     }
 
@@ -179,277 +179,277 @@
         FallibleTask { task: self }
     }
 
-    /// Puts the task in canceled state.
-    fn set_canceled(&mut self) {
+    fn header_with_metadata(&self) -> &HeaderWithMetadata<M> {
         let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
-
-        unsafe {
-            let mut state = (*header).state.load(Ordering::Acquire);
-
-            loop {
-                // If the task has been completed or closed, it can't be canceled.
-                if state & (COMPLETED | CLOSED) != 0 {
-                    break;
-                }
-
-                // If the task is not scheduled nor running, we'll need to schedule it.
-                let new = if state & (SCHEDULED | RUNNING) == 0 {
-                    (state | SCHEDULED | CLOSED) + REFERENCE
-                } else {
-                    state | CLOSED
-                };
-
-                // Mark the task as closed.
-                match (*header).state.compare_exchange_weak(
-                    state,
-                    new,
-                    Ordering::AcqRel,
-                    Ordering::Acquire,
-                ) {
-                    Ok(_) => {
-                        // If the task is not scheduled nor running, schedule it one more time so
-                        // that its future gets dropped by the executor.
-                        if state & (SCHEDULED | RUNNING) == 0 {
-                            ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
-                        }
+        let header = ptr as *const HeaderWithMetadata<M>;
+        unsafe { &*header }
+    }
 
-                        // Notify the awaiter that the task has been closed.
-                        if state & AWAITER != 0 {
-                            (*header).notify(None);
-                        }
+    /// Returns `true` if the current task is finished.
+    ///
+    /// Note that in a multithreaded environment, this task can finish immediately after calling this function.
+    pub fn is_finished(&self) -> bool {
+        let ptr = self.ptr.as_ptr();
+        let header = ptr as *const Header;
 
-                        break;
-                    }
-                    Err(s) => state = s,
-                }
-            }
+        unsafe {
+            let state = (*header).state.load(Ordering::Acquire);
+            state & (CLOSED | COMPLETED) != 0
         }
     }
 
-    /// Puts the task in detached state.
-    fn set_detached(&mut self) -> Option<Result<T, Panic>> {
+    /// Get the metadata associated with this task.
+    ///
+    /// Tasks can be created with a metadata object associated with them; by default, this
+    /// is a `()` value. See the [`Builder::metadata()`] method for more information.
+    pub fn metadata(&self) -> &M {
         let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
+        let header = ptr as *const HeaderWithMetadata<M>;
+        &unsafe { &*header }.metadata
+    }
+}
 
-        unsafe {
-            // A place where the output will be stored in case it needs to be dropped.
-            let mut output = None;
-
-            // Optimistically assume the `Task` is being detached just after creating the task.
-            // This is a common case so if the `Task` is detached, the overhead of it is only one
-            // compare-exchange operation.
-            if let Err(mut state) = (*header).state.compare_exchange_weak(
-                SCHEDULED | TASK | REFERENCE,
-                SCHEDULED | REFERENCE,
-                Ordering::AcqRel,
-                Ordering::Acquire,
-            ) {
-                loop {
-                    // If the task has been completed but not yet closed, that means its output
-                    // must be dropped.
-                    if state & COMPLETED != 0 && state & CLOSED == 0 {
-                        // Mark the task as closed in order to grab its output.
-                        match (*header).state.compare_exchange_weak(
-                            state,
-                            state | CLOSED,
-                            Ordering::AcqRel,
-                            Ordering::Acquire,
-                        ) {
-                            Ok(_) => {
-                                // Read the output.
-                                output = Some(
-                                    (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
-                                        .read(),
-                                );
-
-                                // Update the state variable because we're continuing the loop.
-                                state |= CLOSED;
-                            }
-                            Err(s) => state = s,
-                        }
-                    } else {
-                        // If this is the last reference to the task and it's not closed, then
-                        // close it and schedule one more time so that its future gets dropped by
-                        // the executor.
-                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
-                            SCHEDULED | CLOSED | REFERENCE
-                        } else {
-                            state & !TASK
-                        };
-
-                        // Unset the `TASK` flag.
-                        match (*header).state.compare_exchange_weak(
-                            state,
-                            new,
-                            Ordering::AcqRel,
-                            Ordering::Acquire,
-                        ) {
-                            Ok(_) => {
-                                // If this is the last reference to the task, we need to either
-                                // schedule dropping its future or destroy it.
-                                if state & !(REFERENCE - 1) == 0 {
-                                    if state & CLOSED == 0 {
-                                        ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
-                                    } else {
-                                        ((*header).vtable.destroy)(ptr);
-                                    }
-                                }
-
-                                break;
-                            }
-                            Err(s) => state = s,
-                        }
-                    }
-                }
-            }
-
-            output
-        }
-    }
+/// Puts the task in detached state.
+#[inline(never)]
+fn set_detached<T>(ptr: *const ()) -> Option<Result<T, Panic>> {
+    let header = ptr as *const Header;
+
+    unsafe {
+        // A place where the output will be stored in case it needs to be dropped.
+        let mut output = None;
+
+        // Optimistically assume the `Task` is being detached just after creating the task.
+        // This is a common case so if the `Task` is detached, the overhead of it is only one
+        // compare-exchange operation.
+        if let Err(mut state) = (*header).state.compare_exchange_weak(
+            SCHEDULED | TASK | REFERENCE,
+            SCHEDULED | REFERENCE,
+            Ordering::AcqRel,
+            Ordering::Acquire,
+        ) {
+            loop {
+                // If the task has been completed but not yet closed, that means its output
+                // must be dropped.
+                if state & COMPLETED != 0 && state & CLOSED == 0 {
+                    // Mark the task as closed in order to grab its output.
+                    match (*header).state.compare_exchange_weak(
+                        state,
+                        state | CLOSED,
+                        Ordering::AcqRel,
+                        Ordering::Acquire,
+                    ) {
+                        Ok(_) => {
+                            // Read the output.
+                            output = Some(
+                                ((*header).vtable.get_output(ptr) as *mut Result<T, Panic>).read(),
+                            );
+
+                            // Update the state variable because we're continuing the loop.
+                            state |= CLOSED;
+                        }
+                        Err(s) => state = s,
+                    }
+                } else {
+                    // If this is the last reference to the task and it's not closed, then
+                    // close it and schedule one more time so that its future gets dropped by
+                    // the executor.
+                    let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
+                        SCHEDULED | CLOSED | REFERENCE
+                    } else {
+                        state & !TASK
+                    };
+
+                    // Unset the `TASK` flag.
+                    match (*header).state.compare_exchange_weak(
+                        state,
+                        new,
+                        Ordering::AcqRel,
+                        Ordering::Acquire,
+                    ) {
+                        Ok(_) => {
+                            // If this is the last reference to the task, we need to either
+                            // schedule dropping its future or destroy it.
+ if state & !(REFERENCE - 1) == 0 { + if state & CLOSED == 0 { + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); + } else { + ((*header).vtable.destroy)(ptr); } - - break; } - Err(s) => state = s, + + break; } + Err(s) => state = s, } } } - - output } + + output } +} - /// Polls the task to retrieve its output. - /// - /// Returns `Some` if the task has completed or `None` if it was closed. - /// - /// A task becomes closed in the following cases: - /// - /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. - /// 2. Its output gets awaited by the `Task`. - /// 3. It panics while polling the future. - /// 4. It is completed and the `Task` gets dropped. - fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; +/// Puts the task in canceled state. +#[inline(never)] +fn set_canceled(ptr: *const ()) { + let header = ptr as *const Header; - unsafe { - let mut state = (*header).state.load(Ordering::Acquire); + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); - loop { - // If the task has been closed, notify the awaiter and return `None`. - if state & CLOSED != 0 { - // If the task is scheduled or running, we need to wait until its future is - // dropped. - if state & (SCHEDULED | RUNNING) != 0 { - // Replace the waker with one associated with the current task. - (*header).register(cx.waker()); + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } - // Reload the state after registering. It is possible changes occurred just - // before registration so we need to check for that. - state = (*header).state.load(Ordering::Acquire); + // If the task is not scheduled nor running, we'll need to schedule it. + let new = if state & (SCHEDULED | RUNNING) == 0 { + (state | SCHEDULED | CLOSED) + REFERENCE + } else { + state | CLOSED + }; + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not scheduled nor running, schedule it one more time so + // that its future gets dropped by the executor. + if state & (SCHEDULED | RUNNING) == 0 { + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); + } - // If the task is still scheduled or running, we need to wait because its - // future is not dropped yet. - if state & (SCHEDULED | RUNNING) != 0 { - return Poll::Pending; - } + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*header).notify(None); } - // Even though the awaiter is most likely the current task, it could also be - // another task. - (*header).notify(Some(cx.waker())); - return Poll::Ready(None); + break; } + Err(s) => state = s, + } + } + } +} - // If the task is not completed, register the current task. - if state & COMPLETED == 0 { +/// Polls the task to retrieve its output. +/// +/// Returns `Some` if the task has completed or `None` if it was closed. +/// +/// A task becomes closed in the following cases: +/// +/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. +/// 2. Its output gets awaited by the `Task`. +/// 3. It panics while polling the future. +/// 4. It is completed and the `Task` gets dropped. 
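These bodies were moved out of `impl<T, M> Task<T, M>` into free functions marked `#[inline(never)]`, so each is monomorphized once per output type `T` (or not at all) instead of once per `(T, M)` pair. A toy illustration of that codegen effect, with hypothetical names:

```rust
// Sketch of why thin generic shims over less-generic bodies shrink codegen.
struct Task<T, M> {
    value: T,
    metadata: M,
}

// One instance per (T, M) pair: the old shape.
fn describe_task<T: core::fmt::Debug, M>(t: &Task<T, M>) -> String {
    describe_value(&t.value) // the heavy body is shared below
}

// One instance per T: the new shape, reached through the thin shim above.
fn describe_value<T: core::fmt::Debug>(value: &T) -> String {
    format!("{:?}", value)
}

fn main() {
    let a = Task { value: 1u32, metadata: "m" };
    let b = Task { value: 1u32, metadata: 7u8 };
    // Both calls funnel into the same `describe_value::<u32>` instance.
    assert_eq!(describe_task(&a), describe_task(&b));
}
```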
+fn poll_task(ptr: *const (), cx: &mut Context<'_>) -> Poll> { + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // If the task is scheduled or running, we need to wait until its future is + // dropped. + if state & (SCHEDULED | RUNNING) != 0 { // Replace the waker with one associated with the current task. (*header).register(cx.waker()); - // Reload the state after registering. It is possible that the task became - // completed or closed just before registration so we need to check for that. + // Reload the state after registering. It is possible changes occurred just + // before registration so we need to check for that. state = (*header).state.load(Ordering::Acquire); - // If the task has been closed, restart. - if state & CLOSED != 0 { - continue; - } - - // If the task is still not completed, we're blocked on it. - if state & COMPLETED == 0 { + // If the task is still scheduled or running, we need to wait because its + // future is not dropped yet. + if state & (SCHEDULED | RUNNING) != 0 { return Poll::Pending; } } - // Since the task is now completed, mark it as closed in order to grab its output. - match (*header).state.compare_exchange( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Notify the awaiter. Even though the awaiter is most likely the current - // task, it could also be another task. - if state & AWAITER != 0 { - (*header).notify(Some(cx.waker())); - } + // Even though the awaiter is most likely the current task, it could also be + // another task. + (*header).notify(Some(cx.waker())); + return Poll::Ready(None); + } - // Take the output from the task. - let output = ((*header).vtable.get_output)(ptr) as *mut Result; - let output = output.read(); + // If the task is not completed, register the current task. + if state & COMPLETED == 0 { + // Replace the waker with one associated with the current task. + (*header).register(cx.waker()); - // Propagate the panic if the task panicked. - let output = match output { - Ok(output) => output, - #[allow(unreachable_patterns)] - Err(panic) => { - #[cfg(feature = "std")] - std::panic::resume_unwind(panic); + // Reload the state after registering. It is possible that the task became + // completed or closed just before registration so we need to check for that. + state = (*header).state.load(Ordering::Acquire); - #[cfg(not(feature = "std"))] - match panic {} - } - }; + // If the task has been closed, restart. + if state & CLOSED != 0 { + continue; + } - return Poll::Ready(Some(output)); - } - Err(s) => state = s, + // If the task is still not completed, we're blocked on it. + if state & COMPLETED == 0 { + return Poll::Pending; } } - } - } - fn header(&self) -> &Header { - let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; - unsafe { &*header } - } + // Since the task is now completed, mark it as closed in order to grab its output. + match (*header).state.compare_exchange( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter. Even though the awaiter is most likely the current + // task, it could also be another task. + if state & AWAITER != 0 { + (*header).notify(Some(cx.waker())); + } - /// Returns `true` if the current task is finished. 
-    /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
-    pub fn is_finished(&self) -> bool {
-        let ptr = self.ptr.as_ptr();
-        let header = ptr as *const Header<M>;
+                    // Take the output from the task.
+                    let output = (*header).vtable.get_output(ptr) as *mut Result<T, Panic>;
+                    let output = output.read();
 
-        unsafe {
-            let state = (*header).state.load(Ordering::Acquire);
-            state & (CLOSED | COMPLETED) != 0
-        }
-    }
+                    // Propagate the panic if the task panicked.
+                    let output = match output {
+                        Ok(output) => output,
+                        Err(panic) => {
+                            #[cfg(feature = "std")]
+                            std::panic::resume_unwind(panic);
 
-    /// Get the metadata associated with this task.
-    ///
-    /// Tasks can be created with a metadata object associated with them; by default, this
-    /// is a `()` value. See the [`Builder::metadata()`] method for more information.
-    pub fn metadata(&self) -> &M {
-        &self.header().metadata
+                            #[cfg(not(feature = "std"))]
+                            match panic {}
+                        }
+                    };
+
+                    return Poll::Ready(Some(output));
+                }
+                Err(s) => state = s,
+            }
+        }
     }
 }
 
 impl<T, M> Drop for Task<T, M> {
     fn drop(&mut self) {
-        self.set_canceled();
-        self.set_detached();
+        let ptr = self.ptr.as_ptr();
+        set_canceled(ptr);
+        set_detached::<T>(ptr);
     }
 }
 
 impl<T, M> Future for Task<T, M> {
     type Output = T;
 
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        match self.poll_task(cx) {
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match poll_task::<T>(self.ptr.as_ptr(), cx) {
             Poll::Ready(t) => Poll::Ready(t.expect("Task polled after completion")),
             Poll::Pending => Poll::Pending,
         }
@@ -459,7 +459,7 @@ impl<T, M> Future for Task<T, M> {
 impl<T, M> fmt::Debug for Task<T, M> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Task")
-            .field("header", self.header())
+            .field("header", self.header_with_metadata())
             .finish()
     }
 }
@@ -552,15 +552,15 @@ impl<T, M> FallibleTask<T, M> {
 impl<T, M> Future for FallibleTask<T, M> {
     type Output = Option<T>;
 
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        self.task.poll_task(cx)
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        poll_task::<T>(self.task.ptr.as_ptr(), cx)
     }
 }
 
 impl<T, M> fmt::Debug for FallibleTask<T, M> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("FallibleTask")
-            .field("header", self.task.header())
+            .field("header", self.task.header_with_metadata())
             .finish()
     }
 }
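A quick end-to-end check that the refactor preserves behavior through the public API: spawning, scheduling, running, and awaiting a task. This sketch assumes the `flume` and `futures-lite` crates that the crate's own examples use:

```rust
fn main() {
    let (sender, receiver) = flume::unbounded();
    let schedule = move |runnable: async_task::Runnable| sender.send(runnable).unwrap();

    // `spawn` allocates the single block that now starts with
    // `HeaderWithMetadata<()>`, with a vtable shared per (F, S, M).
    let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);

    // Scheduling pushes the `Runnable` through our channel...
    runnable.schedule();

    // ...and running it polls the future to completion in one pass. Any
    // `Waker`s handed out along the way go through the shared
    // `Header::clone_waker` / `drop_waker` paths introduced in this diff.
    for runnable in receiver.try_iter() {
        runnable.run();
    }

    assert_eq!(futures_lite::future::block_on(task), 3);
}
```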