Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add track_caller to public APIs #4785

Merged
merged 2 commits into from Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Expand Up @@ -55,6 +55,7 @@ tokio-stream = { version = "0.1", path = "../tokio-stream" }
async-stream = "0.3.0"
futures = "0.3.0"
futures-test = "0.3.5"
parking_lot = "0.12.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/src/io/sync_bridge.rs
Expand Up @@ -85,9 +85,10 @@ impl<T: Unpin> SyncIoBridge<T> {
///
/// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
///
/// # Panic
/// # Panics
///
/// This will panic if called outside the context of a Tokio runtime.
#[track_caller]
pub fn new(src: T) -> Self {
Self::new_with_handle(src, tokio::runtime::Handle::current())
}
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/sync/mpsc.rs
Expand Up @@ -136,6 +136,7 @@ impl<T: Send + 'static> PollSender<T> {
///
/// If `poll_reserve` was not successfully called prior to calling `send_item`, then this method
/// will panic.
#[track_caller]
pub fn send_item(&mut self, value: T) -> Result<(), PollSendError<T>> {
let (result, next_state) = match self.take_state() {
State::Idle(_) | State::Acquiring => {
Expand Down
5 changes: 5 additions & 0 deletions tokio-util/src/task/spawn_pinned.rs
Expand Up @@ -57,7 +57,9 @@ impl LocalPoolHandle {
/// pool via [`LocalPoolHandle::spawn_pinned`].
///
/// # Panics
///
/// Panics if the pool size is less than one.
#[track_caller]
pub fn new(pool_size: usize) -> LocalPoolHandle {
assert!(pool_size > 0);

Expand Down Expand Up @@ -167,6 +169,7 @@ impl LocalPoolHandle {
/// }
/// ```
///
#[track_caller]
pub fn spawn_pinned_by_idx<F, Fut>(&self, create_task: F, idx: usize) -> JoinHandle<Fut::Output>
where
F: FnOnce() -> Fut,
Expand Down Expand Up @@ -196,6 +199,7 @@ struct LocalPool {

impl LocalPool {
/// Spawn a `?Send` future onto a worker
#[track_caller]
fn spawn_pinned<F, Fut>(
&self,
create_task: F,
Expand Down Expand Up @@ -324,6 +328,7 @@ impl LocalPool {
}
}

#[track_caller]
fn find_worker_by_idx(&self, idx: usize) -> (&LocalWorkerHandle, JobCountGuard) {
let worker = &self.workers[idx];
worker.task_count.fetch_add(1, Ordering::SeqCst);
Expand Down
13 changes: 13 additions & 0 deletions tokio-util/src/time/delay_queue.rs
Expand Up @@ -531,6 +531,7 @@ impl<T> DelayQueue<T> {
/// [`reset`]: method@Self::reset
/// [`Key`]: struct@Key
/// [type]: #
#[track_caller]
pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");

Expand Down Expand Up @@ -649,10 +650,12 @@ impl<T> DelayQueue<T> {
/// [`reset`]: method@Self::reset
/// [`Key`]: struct@Key
/// [type]: #
#[track_caller]
pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
self.insert_at(value, Instant::now() + timeout)
}

#[track_caller]
fn insert_idx(&mut self, when: u64, key: Key) {
use self::wheel::{InsertError, Stack};

Expand All @@ -674,6 +677,7 @@ impl<T> DelayQueue<T> {
/// # Panics
///
/// Panics if the key is not contained in the expired queue or the wheel.
#[track_caller]
fn remove_key(&mut self, key: &Key) {
use crate::time::wheel::Stack;

Expand Down Expand Up @@ -713,6 +717,7 @@ impl<T> DelayQueue<T> {
/// assert_eq!(*item.get_ref(), "foo");
/// # }
/// ```
#[track_caller]
pub fn remove(&mut self, key: &Key) -> Expired<T> {
let prev_deadline = self.next_deadline();

Expand Down Expand Up @@ -769,6 +774,7 @@ impl<T> DelayQueue<T> {
/// // "foo" is now scheduled to be returned in 10 seconds
/// # }
/// ```
#[track_caller]
pub fn reset_at(&mut self, key: &Key, when: Instant) {
self.remove_key(key);

Expand Down Expand Up @@ -873,6 +879,7 @@ impl<T> DelayQueue<T> {
/// // "foo"is now scheduled to be returned in 10 seconds
/// # }
/// ```
#[track_caller]
pub fn reset(&mut self, key: &Key, timeout: Duration) {
self.reset_at(key, Instant::now() + timeout);
}
Expand Down Expand Up @@ -978,7 +985,12 @@ impl<T> DelayQueue<T> {
/// assert!(delay_queue.capacity() >= 11);
/// # }
/// ```
#[track_caller]
pub fn reserve(&mut self, additional: usize) {
assert!(
self.slab.capacity() + additional <= MAX_ENTRIES,
"max queue capacity exceeded"
);
self.slab.reserve(additional);
}

Expand Down Expand Up @@ -1117,6 +1129,7 @@ impl<T> wheel::Stack for Stack<T> {
}
}

#[track_caller]
fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
let key = *item;
assert!(store.contains(item));
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/time/wheel/mod.rs
Expand Up @@ -118,6 +118,7 @@ where
}

/// Remove `item` from the timing wheel.
#[track_caller]
pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
let when = T::when(item, store);

Expand Down
226 changes: 226 additions & 0 deletions tokio-util/tests/panic.rs
@@ -0,0 +1,226 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use parking_lot::{const_mutex, Mutex};
use std::error::Error;
use std::panic;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::channel;
use tokio::time::{Duration, Instant};
use tokio_test::task;
use tokio_util::io::SyncIoBridge;
use tokio_util::sync::PollSender;
use tokio_util::task::LocalPoolHandle;
use tokio_util::time::DelayQueue;

// Taken from tokio-util::time::wheel, if that changes then
const MAX_DURATION_MS: u64 = (1 << (36)) - 1;

fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
static PANIC_MUTEX: Mutex<()> = const_mutex(());

{
let _guard = PANIC_MUTEX.lock();
let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));

let prev_hook = panic::take_hook();
{
let panic_file = panic_file.clone();
panic::set_hook(Box::new(move |panic_info| {
let panic_location = panic_info.location().unwrap();
panic_file
.lock()
.clone_from(&Some(panic_location.file().to_string()));
}));
}

let result = panic::catch_unwind(func);
// Return to the previously set panic hook (maybe default) so that we get nice error
// messages in the tests.
panic::set_hook(prev_hook);

if result.is_err() {
panic_file.lock().clone()
} else {
None
}
}
}

#[test]
fn sync_bridge_new_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let _ = SyncIoBridge::new(tokio::io::empty());
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn poll_sender_send_item_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let (send, _) = channel::<u32>(3);
let mut send = PollSender::new(send);

let _ = send.send_item(42);
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]

fn local_pool_handle_new_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let _ = LocalPoolHandle::new(0);
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]

fn local_pool_handle_spawn_pinned_by_idx_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();

rt.block_on(async {
let handle = LocalPoolHandle::new(2);
handle.spawn_pinned_by_idx(|| async { "test" }, 3);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}
#[test]
fn delay_queue_insert_at_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();
rt.block_on(async {
let mut queue = task::spawn(DelayQueue::with_capacity(3));

//let st = std::time::Instant::from(SystemTime::UNIX_EPOCH);
let _k = queue.insert_at(
"1",
Instant::now() + Duration::from_millis(MAX_DURATION_MS + 1),
);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn delay_queue_insert_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();
rt.block_on(async {
let mut queue = task::spawn(DelayQueue::with_capacity(3));

let _k = queue.insert("1", Duration::from_millis(MAX_DURATION_MS + 1));
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn delay_queue_remove_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();
rt.block_on(async {
let mut queue = task::spawn(DelayQueue::with_capacity(3));

let key = queue.insert_at("1", Instant::now());
queue.remove(&key);
queue.remove(&key);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn delay_queue_reset_at_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();
rt.block_on(async {
let mut queue = task::spawn(DelayQueue::with_capacity(3));

let key = queue.insert_at("1", Instant::now());
queue.reset_at(
&key,
Instant::now() + Duration::from_millis(MAX_DURATION_MS + 1),
);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn delay_queue_reset_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();
rt.block_on(async {
let mut queue = task::spawn(DelayQueue::with_capacity(3));

let key = queue.insert_at("1", Instant::now());
queue.reset(&key, Duration::from_millis(MAX_DURATION_MS + 1));
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
fn delay_queue_reserve_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let rt = basic();
rt.block_on(async {
let mut queue = task::spawn(DelayQueue::<u32>::with_capacity(3));

queue.reserve((1 << 30) as usize);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

fn basic() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}