From cf419676c12390fe83cda85ef63a3c38ea636144 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 14 May 2022 22:20:27 +0100 Subject: [PATCH 01/23] Add new lock type for serial cases --- serial_test/src/code_lock.rs | 12 +++++--- serial_test/src/lib.rs | 2 ++ serial_test/src/rwlock.rs | 60 ++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 serial_test/src/rwlock.rs diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index c125e08..b41f741 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -1,5 +1,5 @@ use lazy_static::lazy_static; -use parking_lot::{Mutex, ReentrantMutex, ReentrantMutexGuard, RwLock}; +use parking_lot::{Mutex, ReentrantMutexGuard, RwLock}; use std::cell::RefCell; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; @@ -7,8 +7,10 @@ use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Duration; +use crate::rwlock::{Locks, MutexGuardWrapper}; + struct UniqueReentrantMutex { - mutex: ReentrantMutex<()>, + locks: Locks, // Only actually used for tests #[allow(dead_code)] @@ -16,8 +18,8 @@ struct UniqueReentrantMutex { } impl UniqueReentrantMutex { - fn lock(&self) -> ReentrantMutexGuard<()> { - self.mutex.lock() + fn lock(&self) -> MutexGuardWrapper { + self.locks.serial() } } @@ -32,7 +34,7 @@ lazy_static! { impl Default for UniqueReentrantMutex { fn default() -> Self { Self { - mutex: Default::default(), + locks: Locks::new(), id: MUTEX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst), } } diff --git a/serial_test/src/lib.rs b/serial_test/src/lib.rs index 57bb07b..131c8a8 100644 --- a/serial_test/src/lib.rs +++ b/serial_test/src/lib.rs @@ -41,6 +41,8 @@ )] mod code_lock; +mod rwlock; + #[cfg(feature = "file_locks")] mod file_lock; diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs new file mode 100644 index 0000000..d42aad5 --- /dev/null +++ b/serial_test/src/rwlock.rs @@ -0,0 +1,60 @@ +use std::sync::{Arc, Condvar, Mutex}; + +use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; + +struct LockState { + parallels: u32, +} + +struct LockData { + mutex: Mutex, + serial: ReentrantMutex<()>, + condvar: Condvar, +} + +#[derive(Clone)] +pub(crate) struct Locks { + arc: Arc, +} + +pub(crate) struct MutexGuardWrapper<'a> { + #[allow(dead_code)] // need it around to get dropped + mutex_guard: ReentrantMutexGuard<'a, ()>, + locks: Locks, +} + +impl<'a> Drop for MutexGuardWrapper<'a> { + fn drop(&mut self) { + self.locks.arc.condvar.notify_one(); + } +} + +impl Locks { + pub fn new() -> Locks { + Locks { + arc: Arc::new(LockData { + mutex: Mutex::new(LockState { parallels: 0 }), + condvar: Condvar::new(), + serial: Default::default(), + }), + } + } + + pub fn serial(&self) -> MutexGuardWrapper { + let mut lock_state = self.arc.mutex.lock().unwrap(); + loop { + // If all the things we want are true, try to lock out serial + if lock_state.parallels == 0 { + let possible_serial_lock = self.arc.serial.try_lock(); + if let Some(serial_lock) = possible_serial_lock { + return MutexGuardWrapper { + mutex_guard: serial_lock, + locks: self.clone(), + }; + } + } + + lock_state = self.arc.condvar.wait(lock_state).unwrap(); + } + } +} From 40a9cc157f747f9d58b56e348584f2bc8a5b325c Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 14 May 2022 22:58:37 +0100 Subject: [PATCH 02/23] Add basic parallel work --- serial_test/src/code_lock.rs | 121 +++--------------- serial_test/src/lib.rs | 12 +- serial_test/src/parallel_code_lock.rs | 51 ++++++++ serial_test/src/rwlock.rs | 28 ++++ serial_test/src/serial_code_lock.rs | 104 +++++++++++++++ serial_test_derive/src/lib.rs | 67 ++++++++-- .../test_serial_async_before_wrapper.stderr | 2 +- serial_test_test/src/lib.rs | 49 ++++++- 8 files changed, 309 insertions(+), 125 deletions(-) create mode 100644 serial_test/src/parallel_code_lock.rs create mode 100644 serial_test/src/serial_code_lock.rs diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index b41f741..6a38d35 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -1,5 +1,5 @@ use lazy_static::lazy_static; -use parking_lot::{Mutex, ReentrantMutexGuard, RwLock}; +use parking_lot::{Mutex, RwLock}; use std::cell::RefCell; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; @@ -9,22 +9,30 @@ use std::time::Duration; use crate::rwlock::{Locks, MutexGuardWrapper}; -struct UniqueReentrantMutex { +pub(crate) struct UniqueReentrantMutex { locks: Locks, // Only actually used for tests #[allow(dead_code)] - id: u32, + pub(crate) id: u32, } impl UniqueReentrantMutex { - fn lock(&self) -> MutexGuardWrapper { + pub(crate) fn lock(&self) -> MutexGuardWrapper { self.locks.serial() } + + pub(crate) fn start_parallel(&self) { + self.locks.start_parallel(); + } + + pub(crate) fn end_parallel(&self) { + self.locks.end_parallel(); + } } lazy_static! { - static ref LOCKS: Arc>> = + pub(crate) static ref LOCKS: Arc>> = Arc::new(RwLock::new(HashMap::new())); static ref MAX_WAIT: Arc>> = Arc::new(Mutex::new(RefCell::new(Duration::from_secs(60)))); @@ -51,11 +59,11 @@ pub fn set_max_wait(max_wait: Duration) { MAX_WAIT.lock().replace(max_wait); } -fn wait_duration() -> Duration { +pub(crate) fn wait_duration() -> Duration { *MAX_WAIT.lock().borrow() } -fn check_new_key(name: &str) { +pub(crate) fn check_new_key(name: &str) { // Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else let new_key = { let unlock = LOCKS @@ -72,102 +80,3 @@ fn check_new_key(name: &str) { lock.deref_mut().entry(name.to_string()).or_default(); } } - -#[doc(hidden)] -pub fn local_serial_core_with_return( - name: &str, - function: fn() -> Result<(), E>, -) -> Result<(), E> { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - function() -} - -#[doc(hidden)] -pub fn local_serial_core(name: &str, function: fn()) { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - function(); -} - -#[doc(hidden)] -pub async fn local_async_serial_core_with_return( - name: &str, - fut: impl std::future::Future>, -) -> Result<(), E> { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - fut.await -} - -#[doc(hidden)] -pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future) { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - fut.await; -} - -#[cfg(test)] -mod tests { - use super::{check_new_key, wait_duration, LOCKS}; - use itertools::Itertools; - use parking_lot::RwLock; - use std::ops::Deref; - use std::{ - sync::{Arc, Barrier}, - thread, - }; - - #[test] - fn test_hammer_check_new_key() { - let ptrs = Arc::new(RwLock::new(Vec::new())); - let mut threads = Vec::new(); - - let count = 100; - let barrier = Arc::new(Barrier::new(count)); - - for _ in 0..count { - let local_locks = LOCKS.clone(); - let local_ptrs = ptrs.clone(); - let c = barrier.clone(); - threads.push(thread::spawn(move || { - c.wait(); - check_new_key("foo"); - { - let unlock = local_locks - .try_read_recursive_for(wait_duration()) - .expect("read lock didn't work"); - let mutex = unlock.deref().get("foo").unwrap(); - - let mut ptr_guard = local_ptrs - .try_write_for(wait_duration()) - .expect("write lock didn't work"); - ptr_guard.push(mutex.id); - } - - c.wait(); - })); - } - for thread in threads { - thread.join().expect("thread join worked"); - } - let ptrs_read_lock = ptrs - .try_read_recursive_for(wait_duration()) - .expect("ptrs read work"); - assert_eq!(ptrs_read_lock.len(), count); - println!("{:?}", ptrs_read_lock); - assert_eq!(ptrs_read_lock.iter().unique().count(), 1); - } -} diff --git a/serial_test/src/lib.rs b/serial_test/src/lib.rs index 131c8a8..932d1e8 100644 --- a/serial_test/src/lib.rs +++ b/serial_test/src/lib.rs @@ -41,14 +41,21 @@ )] mod code_lock; +mod parallel_code_lock; mod rwlock; +mod serial_code_lock; #[cfg(feature = "file_locks")] mod file_lock; -pub use code_lock::{ +pub use code_lock::set_max_wait; +pub use parallel_code_lock::{ + local_async_parallel_core, local_async_parallel_core_with_return, local_parallel_core, + local_parallel_core_with_return, +}; +pub use serial_code_lock::{ local_async_serial_core, local_async_serial_core_with_return, local_serial_core, - local_serial_core_with_return, set_max_wait, + local_serial_core_with_return, }; #[cfg(feature = "file_locks")] @@ -58,6 +65,7 @@ pub use file_lock::{ }; // Re-export #[serial/file_serial]. +pub use serial_test_derive::parallel; #[allow(unused_imports)] pub use serial_test_derive::serial; diff --git a/serial_test/src/parallel_code_lock.rs b/serial_test/src/parallel_code_lock.rs new file mode 100644 index 0000000..da55f2a --- /dev/null +++ b/serial_test/src/parallel_code_lock.rs @@ -0,0 +1,51 @@ +use std::ops::Deref; + +use crate::code_lock::{check_new_key, LOCKS}; + +#[doc(hidden)] +pub fn local_parallel_core_with_return( + name: &str, + function: fn() -> Result<(), E>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + let ret = function(); + unlock.deref()[name].end_parallel(); + return ret; +} + +#[doc(hidden)] +pub fn local_parallel_core(name: &str, function: fn()) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + function(); + unlock.deref()[name].end_parallel(); +} + +#[doc(hidden)] +pub async fn local_async_parallel_core_with_return( + name: &str, + fut: impl std::future::Future>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + let ret = fut.await; + unlock.deref()[name].end_parallel(); + return ret; +} + +#[doc(hidden)] +pub async fn local_async_parallel_core(name: &str, fut: impl std::future::Future) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + fut.await; + unlock.deref()[name].end_parallel(); +} diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index d42aad5..72d1b91 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -57,4 +57,32 @@ impl Locks { lock_state = self.arc.condvar.wait(lock_state).unwrap(); } } + + pub fn start_parallel(&self) { + let mut lock_state = self.arc.mutex.lock().unwrap(); + loop { + if lock_state.parallels > 0 { + // fast path, as someone else already has it locked + lock_state.parallels += 1; + return; + } + + let possible_serial_lock = self.arc.serial.try_lock(); + if let Some(_) = possible_serial_lock { + // We now know no-one has the serial lock, so we can add to parallel + lock_state.parallels = 1; // Had to have been 0 before, as otherwise we'd have hit the fast path + return; + } + + lock_state = self.arc.condvar.wait(lock_state).unwrap(); + } + } + + pub fn end_parallel(&self) { + let mut lock_state = self.arc.mutex.lock().unwrap(); + assert!(lock_state.parallels > 0); + lock_state.parallels -= 1; + drop(lock_state); + self.arc.condvar.notify_one(); + } } diff --git a/serial_test/src/serial_code_lock.rs b/serial_test/src/serial_code_lock.rs new file mode 100644 index 0000000..53acdbe --- /dev/null +++ b/serial_test/src/serial_code_lock.rs @@ -0,0 +1,104 @@ +use std::ops::Deref; + +use crate::code_lock::{check_new_key, LOCKS}; + +#[doc(hidden)] +pub fn local_serial_core_with_return( + name: &str, + function: fn() -> Result<(), E>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + function() +} + +#[doc(hidden)] +pub fn local_serial_core(name: &str, function: fn()) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + function(); +} + +#[doc(hidden)] +pub async fn local_async_serial_core_with_return( + name: &str, + fut: impl std::future::Future>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + fut.await +} + +#[doc(hidden)] +pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + fut.await; +} + +#[cfg(test)] +mod tests { + use crate::code_lock::wait_duration; + + use super::{check_new_key, LOCKS}; + use itertools::Itertools; + use parking_lot::RwLock; + use std::ops::Deref; + use std::{ + sync::{Arc, Barrier}, + thread, + }; + + #[test] + fn test_hammer_check_new_key() { + let ptrs = Arc::new(RwLock::new(Vec::new())); + let mut threads = Vec::new(); + + let count = 100; + let barrier = Arc::new(Barrier::new(count)); + + for _ in 0..count { + let local_locks = LOCKS.clone(); + let local_ptrs = ptrs.clone(); + let c = barrier.clone(); + threads.push(thread::spawn(move || { + c.wait(); + check_new_key("foo"); + { + let unlock = local_locks + .try_read_recursive_for(wait_duration()) + .expect("read lock didn't work"); + let mutex = unlock.deref().get("foo").unwrap(); + + let mut ptr_guard = local_ptrs + .try_write_for(wait_duration()) + .expect("write lock didn't work"); + ptr_guard.push(mutex.id); + } + + c.wait(); + })); + } + for thread in threads { + thread.join().expect("thread join worked"); + } + let ptrs_read_lock = ptrs + .try_read_recursive_for(wait_duration()) + .expect("ptrs read work"); + assert_eq!(ptrs_read_lock.len(), count); + println!("{:?}", ptrs_read_lock); + assert_eq!(ptrs_read_lock.iter().unique().count(), 1); + } +} diff --git a/serial_test_derive/src/lib.rs b/serial_test_derive/src/lib.rs index 3e827cb..6ce06a9 100644 --- a/serial_test_derive/src/lib.rs +++ b/serial_test_derive/src/lib.rs @@ -64,6 +64,12 @@ pub fn serial(attr: TokenStream, input: TokenStream) -> TokenStream { local_serial_core(attr.into(), input.into()).into() } +#[proc_macro_attribute] +#[proc_macro_error] +pub fn parallel(attr: TokenStream, input: TokenStream) -> TokenStream { + local_parallel_core(attr.into(), input.into()).into() +} + /// Allows for the creation of file-serialised Rust tests /// ```` /// #[test] @@ -150,12 +156,9 @@ fn get_raw_args(attr: proc_macro2::TokenStream) -> Vec { raw_args } -fn local_serial_core( - attr: proc_macro2::TokenStream, - input: proc_macro2::TokenStream, -) -> proc_macro2::TokenStream { +fn get_core_key(attr: proc_macro2::TokenStream) -> String { let mut raw_args = get_raw_args(attr); - let key = match raw_args.len() { + match raw_args.len() { 0 => "".to_string(), 1 => raw_args.pop().unwrap(), n => { @@ -164,10 +167,25 @@ fn local_serial_core( n, raw_args ); } - }; + } +} + +fn local_serial_core( + attr: proc_macro2::TokenStream, + input: proc_macro2::TokenStream, +) -> proc_macro2::TokenStream { + let key = get_core_key(attr); serial_setup(input, vec![Box::new(key)], "local") } +fn local_parallel_core( + attr: proc_macro2::TokenStream, + input: proc_macro2::TokenStream, +) -> proc_macro2::TokenStream { + let key = get_core_key(attr); + parallel_setup(input, vec![Box::new(key)], "local") +} + fn fs_serial_core( attr: proc_macro2::TokenStream, input: proc_macro2::TokenStream, @@ -197,10 +215,11 @@ fn fs_serial_core( serial_setup(input, args, "fs") } -fn serial_setup( +fn core_setup( input: proc_macro2::TokenStream, args: Vec>, prefix: &str, + kind: &str, ) -> proc_macro2::TokenStream where T: quote::ToTokens + ?Sized, @@ -225,7 +244,9 @@ where { // We assume that any 2-part attribute with the second part as "test" on an async function // is the "do this test with reactor" wrapper. This is true for actix, tokio and async_std. - abort_call_site!("Found async test attribute after serial, which will break"); + abort_call_site!( + "Found async test attribute after serial/parallel, which will break" + ); } // we skip ignore/should_panic because the test framework already deals with it @@ -238,7 +259,7 @@ where if let Some(ret) = return_type { match asyncness { Some(_) => { - let fnname = format_ident!("{}_async_serial_core_with_return", prefix); + let fnname = format_ident!("{}_async_{}_core_with_return", prefix, kind); quote! { #(#attrs) * @@ -248,7 +269,7 @@ where } } None => { - let fnname = format_ident!("{}_serial_core_with_return", prefix); + let fnname = format_ident!("{}_{}_core_with_return", prefix, kind); quote! { #(#attrs) * @@ -261,7 +282,7 @@ where } else { match asyncness { Some(_) => { - let fnname = format_ident!("{}_async_serial_core", prefix); + let fnname = format_ident!("{}_async_{}_core", prefix, kind); quote! { #(#attrs) * @@ -271,7 +292,7 @@ where } } None => { - let fnname = format_ident!("{}_serial_core", prefix); + let fnname = format_ident!("{}_{}_core", prefix, kind); quote! { #(#attrs) * @@ -284,6 +305,28 @@ where } } +fn serial_setup( + input: proc_macro2::TokenStream, + args: Vec>, + prefix: &str, +) -> proc_macro2::TokenStream +where + T: quote::ToTokens + ?Sized, +{ + core_setup(input, args, prefix, "serial") +} + +fn parallel_setup( + input: proc_macro2::TokenStream, + args: Vec>, + prefix: &str, +) -> proc_macro2::TokenStream +where + T: quote::ToTokens + ?Sized, +{ + core_setup(input, args, prefix, "parallel") +} + #[cfg(test)] mod tests { use proc_macro2::{Literal, Punct, Spacing}; diff --git a/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr b/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr index f37e867..35703d1 100644 --- a/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr +++ b/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr @@ -1,4 +1,4 @@ -error: Found async test attribute after serial, which will break +error: Found async test attribute after serial/parallel, which will break --> tests/broken/test_serial_async_before_wrapper.rs:3:1 | 3 | #[serial] diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index b63e9a4..0d079f7 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -40,14 +40,16 @@ use lazy_static::lazy_static; use std::convert::TryInto; use std::env; use std::fs; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::sync::Barrier; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; lazy_static! { static ref LOCK: Arc = Arc::new(AtomicUsize::new(0)); + static ref THREAD_ORDERINGS: Arc>> = Arc::new(Mutex::new(Vec::new())); + static ref PARALLEL_BARRIER: Arc = Arc::new(Barrier::new(3)); } fn init() { @@ -80,8 +82,10 @@ pub fn fs_test_fn(count: usize) { #[cfg(test)] mod tests { + use crate::{THREAD_ORDERINGS, PARALLEL_BARRIER}; + use super::{init, test_fn}; - use serial_test::serial; + use serial_test::{serial, parallel}; #[cfg(feature = "file_locks")] use super::fs_test_fn; @@ -192,4 +196,41 @@ mod tests { fn test_with_key() { init(); } + + #[test] + #[parallel(ordering_key)] + fn serial_with_parallel_key_1() { + let count = THREAD_ORDERINGS.lock().unwrap().len(); + // Can't guarantee before or after the parallels + assert!(count == 0 || count == 3, "count = {}", count); + } + + #[test] + #[parallel(ordering_key)] + fn parallel_with_key_1() { + PARALLEL_BARRIER.wait(); + THREAD_ORDERINGS.lock().unwrap().push(false); + } + + #[test] + #[parallel(ordering_key)] + fn parallel_with_key_2() { + PARALLEL_BARRIER.wait(); + THREAD_ORDERINGS.lock().unwrap().push(false); + } + + #[test] + #[parallel(ordering_key)] + fn parallel_with_key_3() { + PARALLEL_BARRIER.wait(); + THREAD_ORDERINGS.lock().unwrap().push(false); + } + + #[test] + #[parallel(ordering_key)] + fn serial_with_parallel_key_2() { + let count = THREAD_ORDERINGS.lock().unwrap().len(); + // Can't guarantee before or after the parallels + assert!(count == 0 || count == 3, "count = {}", count); + } } From 0ec71cc0d6fc3812454078b18f28923309bb0cff Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 14 May 2022 22:59:47 +0100 Subject: [PATCH 03/23] Fix clippy issues --- serial_test/src/parallel_code_lock.rs | 4 ++-- serial_test/src/rwlock.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/serial_test/src/parallel_code_lock.rs b/serial_test/src/parallel_code_lock.rs index da55f2a..3d63fea 100644 --- a/serial_test/src/parallel_code_lock.rs +++ b/serial_test/src/parallel_code_lock.rs @@ -13,7 +13,7 @@ pub fn local_parallel_core_with_return( unlock.deref()[name].start_parallel(); let ret = function(); unlock.deref()[name].end_parallel(); - return ret; + ret } #[doc(hidden)] @@ -37,7 +37,7 @@ pub async fn local_async_parallel_core_with_return( unlock.deref()[name].start_parallel(); let ret = fut.await; unlock.deref()[name].end_parallel(); - return ret; + ret } #[doc(hidden)] diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 72d1b91..1f9b0ee 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -68,8 +68,8 @@ impl Locks { } let possible_serial_lock = self.arc.serial.try_lock(); - if let Some(_) = possible_serial_lock { - // We now know no-one has the serial lock, so we can add to parallel + if possible_serial_lock.is_some() { + // We now know no-one else has the serial lock, so we can add to parallel lock_state.parallels = 1; // Had to have been 0 before, as otherwise we'd have hit the fast path return; } From 12847f0a18e56874ab11ac96ad490ef78ba0045e Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 14 May 2022 23:00:04 +0100 Subject: [PATCH 04/23] Reformat --- serial_test_test/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 0d079f7..4e089bd 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -40,8 +40,8 @@ use lazy_static::lazy_static; use std::convert::TryInto; use std::env; use std::fs; -use std::sync::Barrier; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Barrier; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -82,10 +82,10 @@ pub fn fs_test_fn(count: usize) { #[cfg(test)] mod tests { - use crate::{THREAD_ORDERINGS, PARALLEL_BARRIER}; + use crate::{PARALLEL_BARRIER, THREAD_ORDERINGS}; use super::{init, test_fn}; - use serial_test::{serial, parallel}; + use serial_test::{parallel, serial}; #[cfg(feature = "file_locks")] use super::fs_test_fn; @@ -203,7 +203,7 @@ mod tests { let count = THREAD_ORDERINGS.lock().unwrap().len(); // Can't guarantee before or after the parallels assert!(count == 0 || count == 3, "count = {}", count); - } + } #[test] #[parallel(ordering_key)] @@ -217,7 +217,7 @@ mod tests { fn parallel_with_key_2() { PARALLEL_BARRIER.wait(); THREAD_ORDERINGS.lock().unwrap().push(false); - } + } #[test] #[parallel(ordering_key)] From 3d6da725773eca2f904d60838aeef1fc11233b66 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 16:40:00 +0100 Subject: [PATCH 05/23] Add timeout hack to parallel --- serial_test/src/rwlock.rs | 8 ++++++-- serial_test_test/src/lib.rs | 4 +--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 1f9b0ee..88a5c1e 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Condvar, Mutex}; +use std::{sync::{Arc, Condvar, Mutex, WaitTimeoutResult}, time::Duration}; use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; @@ -74,7 +74,11 @@ impl Locks { return; } - lock_state = self.arc.condvar.wait(lock_state).unwrap(); + // FIXME: remove timeout, as it's a hack to debug some things + let duration = Duration::from_secs(1); + let res: WaitTimeoutResult; + (lock_state, res) = self.arc.condvar.wait_timeout(lock_state, duration).unwrap(); + assert!(!res.timed_out(), "timeout!"); } } diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 4e089bd..729476d 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -82,9 +82,7 @@ pub fn fs_test_fn(count: usize) { #[cfg(test)] mod tests { - use crate::{PARALLEL_BARRIER, THREAD_ORDERINGS}; - - use super::{init, test_fn}; + use super::{init, test_fn, PARALLEL_BARRIER, THREAD_ORDERINGS}; use serial_test::{parallel, serial}; #[cfg(feature = "file_locks")] From c92f4a610f0dab2491bdc6b452a4dc88304bbb62 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 16:42:35 +0100 Subject: [PATCH 06/23] Fix import format --- serial_test/src/rwlock.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 88a5c1e..1bdef39 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -1,4 +1,7 @@ -use std::{sync::{Arc, Condvar, Mutex, WaitTimeoutResult}, time::Duration}; +use std::{ + sync::{Arc, Condvar, Mutex, WaitTimeoutResult}, + time::Duration, +}; use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; From adf1d5f7a7e9b0663d0ece68775f28c0cc73d75a Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 16:49:02 +0100 Subject: [PATCH 07/23] Reformat and merge lots of imports --- serial_test/src/code_lock.rs | 16 ++++++++-------- serial_test/src/parallel_code_lock.rs | 3 +-- serial_test/src/rwlock.rs | 3 +-- serial_test/src/serial_code_lock.rs | 9 +++------ serial_test_derive/src/lib.rs | 6 +++--- 5 files changed, 16 insertions(+), 21 deletions(-) diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index 6a38d35..a80ff20 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -1,13 +1,13 @@ +use crate::rwlock::{Locks, MutexGuardWrapper}; use lazy_static::lazy_static; use parking_lot::{Mutex, RwLock}; -use std::cell::RefCell; -use std::collections::HashMap; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::AtomicU32; -use std::sync::Arc; -use std::time::Duration; - -use crate::rwlock::{Locks, MutexGuardWrapper}; +use std::{ + cell::RefCell, + collections::HashMap, + ops::{Deref, DerefMut}, + sync::{atomic::AtomicU32, Arc}, + time::Duration, +}; pub(crate) struct UniqueReentrantMutex { locks: Locks, diff --git a/serial_test/src/parallel_code_lock.rs b/serial_test/src/parallel_code_lock.rs index 3d63fea..23354f7 100644 --- a/serial_test/src/parallel_code_lock.rs +++ b/serial_test/src/parallel_code_lock.rs @@ -1,6 +1,5 @@ -use std::ops::Deref; - use crate::code_lock::{check_new_key, LOCKS}; +use std::ops::Deref; #[doc(hidden)] pub fn local_parallel_core_with_return( diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 1bdef39..1cfad67 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -1,10 +1,9 @@ +use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; use std::{ sync::{Arc, Condvar, Mutex, WaitTimeoutResult}, time::Duration, }; -use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; - struct LockState { parallels: u32, } diff --git a/serial_test/src/serial_code_lock.rs b/serial_test/src/serial_code_lock.rs index 53acdbe..6d41c48 100644 --- a/serial_test/src/serial_code_lock.rs +++ b/serial_test/src/serial_code_lock.rs @@ -1,6 +1,5 @@ -use std::ops::Deref; - use crate::code_lock::{check_new_key, LOCKS}; +use std::ops::Deref; #[doc(hidden)] pub fn local_serial_core_with_return( @@ -50,13 +49,11 @@ pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future Date: Sun, 15 May 2022 16:51:58 +0100 Subject: [PATCH 08/23] 1.51.0 doesn't support destructuring assignments --- serial_test/src/rwlock.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 1cfad67..ff52448 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -1,6 +1,6 @@ use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; use std::{ - sync::{Arc, Condvar, Mutex, WaitTimeoutResult}, + sync::{Arc, Condvar, Mutex}, time::Duration, }; @@ -78,9 +78,9 @@ impl Locks { // FIXME: remove timeout, as it's a hack to debug some things let duration = Duration::from_secs(1); - let res: WaitTimeoutResult; - (lock_state, res) = self.arc.condvar.wait_timeout(lock_state, duration).unwrap(); - assert!(!res.timed_out(), "timeout!"); + let results = self.arc.condvar.wait_timeout(lock_state, duration).unwrap(); + assert!(!results.1.timed_out(), "timeout!"); + lock_state = results.0; } } From df386ca41577069bd17efecd8187979b7f6970c4 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 18:13:03 +0100 Subject: [PATCH 09/23] Actually test serial+parallel --- serial_test_test/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 93f062b..070011e 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -198,7 +198,7 @@ mod tests { } #[test] - #[parallel(ordering_key)] + #[serial(ordering_key)] fn serial_with_parallel_key_1() { let count = THREAD_ORDERINGS.lock().unwrap().len(); // Can't guarantee before or after the parallels @@ -227,7 +227,7 @@ mod tests { } #[test] - #[parallel(ordering_key)] + #[serial(ordering_key)] fn serial_with_parallel_key_2() { let count = THREAD_ORDERINGS.lock().unwrap().len(); // Can't guarantee before or after the parallels From bfe787526305fb7b0ad19ddb30382b69e30bb505 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 18:19:12 +0100 Subject: [PATCH 10/23] Use parking_lot mutex throughout --- Cargo.lock | 1 + serial_test/src/rwlock.rs | 20 ++++++++------------ serial_test_test/Cargo.toml | 1 + serial_test_test/src/lib.rs | 13 +++++++------ 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d5f5b0..c30035c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,6 +416,7 @@ dependencies = [ "env_logger", "futures-util", "lazy_static", + "parking_lot", "serial_test", "tokio", ] diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index ff52448..c8d93b5 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -1,8 +1,5 @@ -use parking_lot::{ReentrantMutex, ReentrantMutexGuard}; -use std::{ - sync::{Arc, Condvar, Mutex}, - time::Duration, -}; +use parking_lot::{Condvar, Mutex, ReentrantMutex, ReentrantMutexGuard}; +use std::{sync::Arc, time::Duration}; struct LockState { parallels: u32, @@ -43,7 +40,7 @@ impl Locks { } pub fn serial(&self) -> MutexGuardWrapper { - let mut lock_state = self.arc.mutex.lock().unwrap(); + let mut lock_state = self.arc.mutex.lock(); loop { // If all the things we want are true, try to lock out serial if lock_state.parallels == 0 { @@ -56,12 +53,12 @@ impl Locks { } } - lock_state = self.arc.condvar.wait(lock_state).unwrap(); + self.arc.condvar.wait(&mut lock_state); } } pub fn start_parallel(&self) { - let mut lock_state = self.arc.mutex.lock().unwrap(); + let mut lock_state = self.arc.mutex.lock(); loop { if lock_state.parallels > 0 { // fast path, as someone else already has it locked @@ -78,14 +75,13 @@ impl Locks { // FIXME: remove timeout, as it's a hack to debug some things let duration = Duration::from_secs(1); - let results = self.arc.condvar.wait_timeout(lock_state, duration).unwrap(); - assert!(!results.1.timed_out(), "timeout!"); - lock_state = results.0; + let timeout_result = self.arc.condvar.wait_for(&mut lock_state, duration); + assert!(!timeout_result.timed_out(), "timeout!"); } } pub fn end_parallel(&self) { - let mut lock_state = self.arc.mutex.lock().unwrap(); + let mut lock_state = self.arc.mutex.lock(); assert!(lock_state.parallels > 0); lock_state.parallels -= 1; drop(lock_state); diff --git a/serial_test_test/Cargo.toml b/serial_test_test/Cargo.toml index f9c3b92..3f5c5bf 100644 --- a/serial_test_test/Cargo.toml +++ b/serial_test_test/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" serial_test = { path="../serial_test" } lazy_static = "^1.2" env_logger = "^0.9" +parking_lot = "^0.12" [dev-dependencies] tokio = { version = "^1.17", features = ["macros", "rt"] } diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 070011e..3e35fe6 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -37,12 +37,13 @@ //! ``` use lazy_static::lazy_static; +use parking_lot::Mutex; use std::{ convert::TryInto, env, fs, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Barrier, Mutex, + Arc, Barrier, }, thread, time::Duration, @@ -200,7 +201,7 @@ mod tests { #[test] #[serial(ordering_key)] fn serial_with_parallel_key_1() { - let count = THREAD_ORDERINGS.lock().unwrap().len(); + let count = THREAD_ORDERINGS.lock().len(); // Can't guarantee before or after the parallels assert!(count == 0 || count == 3, "count = {}", count); } @@ -209,27 +210,27 @@ mod tests { #[parallel(ordering_key)] fn parallel_with_key_1() { PARALLEL_BARRIER.wait(); - THREAD_ORDERINGS.lock().unwrap().push(false); + THREAD_ORDERINGS.lock().push(false); } #[test] #[parallel(ordering_key)] fn parallel_with_key_2() { PARALLEL_BARRIER.wait(); - THREAD_ORDERINGS.lock().unwrap().push(false); + THREAD_ORDERINGS.lock().push(false); } #[test] #[parallel(ordering_key)] fn parallel_with_key_3() { PARALLEL_BARRIER.wait(); - THREAD_ORDERINGS.lock().unwrap().push(false); + THREAD_ORDERINGS.lock().push(false); } #[test] #[serial(ordering_key)] fn serial_with_parallel_key_2() { - let count = THREAD_ORDERINGS.lock().unwrap().len(); + let count = THREAD_ORDERINGS.lock().len(); // Can't guarantee before or after the parallels assert!(count == 0 || count == 3, "count = {}", count); } From 2ae7516ce694f1e41d7c9c44fb21a0e5945ab45a Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 18:22:08 +0100 Subject: [PATCH 11/23] Don't capture stdout on tests --- .github/workflows/ci.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22f0e51..a80a247 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,13 +52,13 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: test - args: --features ${{ matrix.features }} + args: --features ${{ matrix.features }} -- --nocapture if: ${{ matrix.features != 'all' }} - name: Build and test all features uses: actions-rs/cargo@v1.0.3 with: command: test - args: --all-features + args: --all-features -- --nocapture if: ${{ matrix.features == 'all' }} multi-os-testing: @@ -81,7 +81,7 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: test - args: --all-features + args: --all-features -- --nocapture minimal-versions: name: minimal versions check From 082b0fa8950861f91c6a701ceb2ff9208adcd3cd Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 18:24:10 +0100 Subject: [PATCH 12/23] Fallover start_parallel after 10 tries --- serial_test/src/rwlock.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index c8d93b5..04d27ef 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -59,6 +59,7 @@ impl Locks { pub fn start_parallel(&self) { let mut lock_state = self.arc.mutex.lock(); + let mut resets: u8 = 0; loop { if lock_state.parallels > 0 { // fast path, as someone else already has it locked @@ -77,6 +78,10 @@ impl Locks { let duration = Duration::from_secs(1); let timeout_result = self.arc.condvar.wait_for(&mut lock_state, duration); assert!(!timeout_result.timed_out(), "timeout!"); + resets += 1; + if resets == 10 { + panic!("Tried loop 10 times!"); + } } } From da32f5b5c76f3c7ebf9cbdf0b46350fceeb166cf Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 21:44:59 +0100 Subject: [PATCH 13/23] Extra debug for parallel test --- serial_test_test/src/lib.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 3e35fe6..1064318 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -52,7 +52,7 @@ use std::{ lazy_static! { static ref LOCK: Arc = Arc::new(AtomicUsize::new(0)); static ref THREAD_ORDERINGS: Arc>> = Arc::new(Mutex::new(Vec::new())); - static ref PARALLEL_BARRIER: Arc = Arc::new(Barrier::new(3)); + static ref PARALLEL_BARRIER: Barrier = Barrier::new(3); } fn init() { @@ -87,6 +87,7 @@ pub fn fs_test_fn(count: usize) { mod tests { use super::{init, test_fn, PARALLEL_BARRIER, THREAD_ORDERINGS}; use serial_test::{parallel, serial}; + use std::{thread, time::Duration}; #[cfg(feature = "file_locks")] use super::fs_test_fn; @@ -209,21 +210,30 @@ mod tests { #[test] #[parallel(ordering_key)] fn parallel_with_key_1() { + thread::sleep(Duration::from_secs(1)); + println!("Waiting barrier 1"); PARALLEL_BARRIER.wait(); + println!("Waiting lock 1"); THREAD_ORDERINGS.lock().push(false); } #[test] #[parallel(ordering_key)] fn parallel_with_key_2() { + thread::sleep(Duration::from_secs(2)); + println!("Waiting barrier 2"); PARALLEL_BARRIER.wait(); + println!("Waiting lock 2"); THREAD_ORDERINGS.lock().push(false); } #[test] #[parallel(ordering_key)] fn parallel_with_key_3() { + thread::sleep(Duration::from_secs(3)); + println!("Waiting barrier 3"); PARALLEL_BARRIER.wait(); + println!("Waiting lock 3"); THREAD_ORDERINGS.lock().push(false); } From a52306a2565e46c2cf1ef2128284efc154c22b7c Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sun, 15 May 2022 21:57:14 +0100 Subject: [PATCH 14/23] Increase default concurrency in CI --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a80a247..18b978c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,6 +53,8 @@ jobs: with: command: test args: --features ${{ matrix.features }} -- --nocapture + env: + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads if: ${{ matrix.features != 'all' }} - name: Build and test all features uses: actions-rs/cargo@v1.0.3 @@ -60,6 +62,8 @@ jobs: command: test args: --all-features -- --nocapture if: ${{ matrix.features == 'all' }} + env: + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads multi-os-testing: name: Test suite @@ -82,6 +86,8 @@ jobs: with: command: test args: --all-features -- --nocapture + env: + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads minimal-versions: name: minimal versions check From 856c592f304b5d1b0f5f12a7158716c02eee5619 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 16:07:03 +0100 Subject: [PATCH 15/23] Redo check_new_key with more robust multi-writer checks --- serial_test/src/code_lock.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index a80ff20..1057748 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -64,19 +64,25 @@ pub(crate) fn wait_duration() -> Duration { } pub(crate) fn check_new_key(name: &str) { - // Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else - let new_key = { + loop { + // Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else let unlock = LOCKS .try_read_recursive_for(wait_duration()) .expect("read lock didn't work"); - !unlock.deref().contains_key(name) - }; - if new_key { + if unlock.deref().contains_key(name) { + break; + } + drop(unlock); // so that we don't hold the read lock and so the writer can maybe succeed + // This is the rare path, which avoids the multi-writer situation mostly - let mut lock = LOCKS - .try_write_for(wait_duration()) - .expect("write lock didn't work"); + let try_lock = LOCKS.try_write(); + + if let Some(mut lock) = try_lock { + lock.deref_mut().entry(name.to_string()).or_default(); + break; + } - lock.deref_mut().entry(name.to_string()).or_default(); + // If the try_lock fails, then go around the loop again + // Odds are another test was also locking on the write and has now written the key } } From 7d0a63776456eff6dda362bd9304dfd69005fbf9 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 16:19:27 +0100 Subject: [PATCH 16/23] Permit await_holding_lock --- serial_test/src/parallel_code_lock.rs | 2 ++ serial_test/src/serial_code_lock.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/serial_test/src/parallel_code_lock.rs b/serial_test/src/parallel_code_lock.rs index 23354f7..e521389 100644 --- a/serial_test/src/parallel_code_lock.rs +++ b/serial_test/src/parallel_code_lock.rs @@ -1,3 +1,5 @@ +#![allow(clippy::await_holding_lock)] + use crate::code_lock::{check_new_key, LOCKS}; use std::ops::Deref; diff --git a/serial_test/src/serial_code_lock.rs b/serial_test/src/serial_code_lock.rs index 6d41c48..5bd689d 100644 --- a/serial_test/src/serial_code_lock.rs +++ b/serial_test/src/serial_code_lock.rs @@ -1,3 +1,5 @@ +#![allow(clippy::await_holding_lock)] + use crate::code_lock::{check_new_key, LOCKS}; use std::ops::Deref; From 96457b8b30523ffb352b7413990f8c0390d8f924 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 16:35:00 +0100 Subject: [PATCH 17/23] Slightly better test logging --- serial_test_test/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 1064318..2dfb29b 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -61,21 +61,21 @@ fn init() { pub fn test_fn(count: usize) { init(); - println!("Start {}", count); + println!("(non-fs) Start {}", count); LOCK.store(count, Ordering::Relaxed); thread::sleep(Duration::from_millis(1000 * (count as u64))); - println!("End {}", count); + println!("(non-fs) End {}", count); assert_eq!(LOCK.load(Ordering::Relaxed), count); } pub fn fs_test_fn(count: usize) { init(); - println!("Start {}", count); + println!("(fs) Start {}", count); let mut pathbuf = env::temp_dir(); pathbuf.push("serial-test-test"); fs::write(pathbuf.as_path(), count.to_ne_bytes()).unwrap(); thread::sleep(Duration::from_millis(1000 * (count as u64))); - println!("End {}", count); + println!("(fs) End {}", count); let loaded = fs::read(pathbuf.as_path()) .map(|bytes| usize::from_ne_bytes(bytes.as_slice().try_into().unwrap())) From 36ea2dada077fbf55e6f64dd801700a5f49bc0b6 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 21:24:59 +0100 Subject: [PATCH 18/23] check_new_key now waits a short period for both locks --- Cargo.lock | 1 + serial_test/Cargo.toml | 1 + serial_test/src/code_lock.rs | 29 ++++++++++++++++++++--------- serial_test_test/src/lib.rs | 2 +- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c30035c..7717d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,7 @@ dependencies = [ "fslock", "itertools", "lazy_static", + "log", "parking_lot", "serial_test_derive", ] diff --git a/serial_test/Cargo.toml b/serial_test/Cargo.toml index e29b18d..db25746 100644 --- a/serial_test/Cargo.toml +++ b/serial_test/Cargo.toml @@ -16,6 +16,7 @@ parking_lot = "^0.12" serial_test_derive = { version = "~0.7.0", path = "../serial_test_derive" } fslock = {version = "0.2", optional = true} document-features = {version = "0.2", optional=true} +log = "0.4" [dev-dependencies] itertools = "0.10" diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index 1057748..274d875 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -1,12 +1,13 @@ use crate::rwlock::{Locks, MutexGuardWrapper}; use lazy_static::lazy_static; +use log::debug; use parking_lot::{Mutex, RwLock}; use std::{ cell::RefCell, collections::HashMap, ops::{Deref, DerefMut}, sync::{atomic::AtomicU32, Arc}, - time::Duration, + time::{Duration, Instant}, }; pub(crate) struct UniqueReentrantMutex { @@ -64,25 +65,35 @@ pub(crate) fn wait_duration() -> Duration { } pub(crate) fn check_new_key(name: &str) { + let start = Instant::now(); loop { + let duration = Instant::now() - start; + debug!("Waiting for '{}' {:?}", name, duration); // Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else - let unlock = LOCKS - .try_read_recursive_for(wait_duration()) - .expect("read lock didn't work"); - if unlock.deref().contains_key(name) { - break; + let try_unlock = LOCKS.try_read_recursive_for(Duration::from_secs(1)); + if let Some(unlock) = try_unlock { + if unlock.deref().contains_key(name) { + return; + } + drop(unlock); // so that we don't hold the read lock and so the writer can maybe succeed + } else { + continue; // wasn't able to get read lock } - drop(unlock); // so that we don't hold the read lock and so the writer can maybe succeed // This is the rare path, which avoids the multi-writer situation mostly - let try_lock = LOCKS.try_write(); + let try_lock = LOCKS.try_write_for(Duration::from_secs(1)); if let Some(mut lock) = try_lock { lock.deref_mut().entry(name.to_string()).or_default(); - break; + return; } // If the try_lock fails, then go around the loop again // Odds are another test was also locking on the write and has now written the key + + let duration = Instant::now() - start; + if duration >= wait_duration() { + panic!("check_new_key timed out!"); + } } } diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index 2dfb29b..f9a1d2e 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -56,7 +56,7 @@ lazy_static! { } fn init() { - let _ = env_logger::builder().is_test(true).try_init(); + let _ = env_logger::builder().is_test(false).try_init(); } pub fn test_fn(count: usize) { From cda69935c3c8b820f71f9bb2ac87190b6713cbe6 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 21:33:11 +0100 Subject: [PATCH 19/23] Add debug logging --- .github/workflows/ci.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 18b978c..e31701d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,6 +55,7 @@ jobs: args: --features ${{ matrix.features }} -- --nocapture env: RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_LOG: debug if: ${{ matrix.features != 'all' }} - name: Build and test all features uses: actions-rs/cargo@v1.0.3 @@ -63,7 +64,8 @@ jobs: args: --all-features -- --nocapture if: ${{ matrix.features == 'all' }} env: - RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_LOG: debug multi-os-testing: name: Test suite @@ -87,7 +89,8 @@ jobs: command: test args: --all-features -- --nocapture env: - RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_LOG: debug minimal-versions: name: minimal versions check From c6a6aca4ae87a2535431b1bc83383ea050c74427 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 21:48:34 +0100 Subject: [PATCH 20/23] Add condvar timeout hack --- serial_test/src/rwlock.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 04d27ef..e56c994 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -41,6 +41,7 @@ impl Locks { pub fn serial(&self) -> MutexGuardWrapper { let mut lock_state = self.arc.mutex.lock(); + let mut resets: u8 = 0; loop { // If all the things we want are true, try to lock out serial if lock_state.parallels == 0 { @@ -53,7 +54,14 @@ impl Locks { } } - self.arc.condvar.wait(&mut lock_state); + // FIXME: hack + let duration = Duration::from_secs(10); + let timeout_result = self.arc.condvar.wait_for(&mut lock_state, duration); + assert!(!timeout_result.timed_out(), "timeout!"); + resets += 1; + if resets == 10 { + panic!("Tried loop 10 times!"); + } } } From d9c9b2282969d3a48deca13735cbbcf95a646c4e Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 21:49:49 +0100 Subject: [PATCH 21/23] Fix format --- serial_test/src/rwlock.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index e56c994..2f995ef 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -61,7 +61,7 @@ impl Locks { resets += 1; if resets == 10 { panic!("Tried loop 10 times!"); - } + } } } From 5be34eeebf559e29452c5a6607a8650f4047ae6f Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 22:06:33 +0100 Subject: [PATCH 22/23] Condvars shouldn't need timeouts, but sometimes they do --- serial_test/src/rwlock.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs index 2f995ef..13d8ab2 100644 --- a/serial_test/src/rwlock.rs +++ b/serial_test/src/rwlock.rs @@ -41,7 +41,6 @@ impl Locks { pub fn serial(&self) -> MutexGuardWrapper { let mut lock_state = self.arc.mutex.lock(); - let mut resets: u8 = 0; loop { // If all the things we want are true, try to lock out serial if lock_state.parallels == 0 { @@ -54,20 +53,14 @@ impl Locks { } } - // FIXME: hack - let duration = Duration::from_secs(10); - let timeout_result = self.arc.condvar.wait_for(&mut lock_state, duration); - assert!(!timeout_result.timed_out(), "timeout!"); - resets += 1; - if resets == 10 { - panic!("Tried loop 10 times!"); - } + self.arc + .condvar + .wait_for(&mut lock_state, Duration::from_secs(1)); } } pub fn start_parallel(&self) { let mut lock_state = self.arc.mutex.lock(); - let mut resets: u8 = 0; loop { if lock_state.parallels > 0 { // fast path, as someone else already has it locked @@ -82,14 +75,9 @@ impl Locks { return; } - // FIXME: remove timeout, as it's a hack to debug some things - let duration = Duration::from_secs(1); - let timeout_result = self.arc.condvar.wait_for(&mut lock_state, duration); - assert!(!timeout_result.timed_out(), "timeout!"); - resets += 1; - if resets == 10 { - panic!("Tried loop 10 times!"); - } + self.arc + .condvar + .wait_for(&mut lock_state, Duration::from_secs(1)); } } From 9c901839eac80db3b575b901b946f582be9b6a33 Mon Sep 17 00:00:00 2001 From: Tom Parker-Shemilt Date: Sat, 28 May 2022 22:29:42 +0100 Subject: [PATCH 23/23] Document parallel --- README.md | 8 ++++---- serial_test/src/code_lock.rs | 4 ++-- serial_test/src/lib.rs | 13 +++++++++++-- serial_test_derive/src/lib.rs | 35 +++++++++++++++++++++++++++++++++-- 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index ce7a35d..49a803e 100644 --- a/README.md +++ b/README.md @@ -27,14 +27,14 @@ async fn test_serial_another() { // Do things asynchronously } ``` -Multiple tests with the `serial` attribute are guaranteed to be executed in serial. Ordering of the tests is not guaranteed however. -Tests without the `serial` attribute may run at any time, including in parallel to tests marked as `serial`. Note that if you're using -an async test reactor attribute (e.g. `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we +Multiple tests with the `serial` attribute are guaranteed to be executed in serial. Ordering of the tests is not guaranteed however. Other tests with the `parallel` attribute may run at the same time as each other, but not at the same time as a test with `serial`. Tests with neither attribute may run at any time and no guarantees are made about their timing! + +Note that if you're using an async test reactor attribute (e.g. `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we don't get an async function and things break. There's now an error for this case to improve debugging. For cases like doctests and integration tests where the tests are run as separate processes, we also support `file_serial`, with similar properties but based off file locking. Note that there are no guarantees about one test with `serial` and another with -`file_serial` as they lock using different methods. +`file_serial` as they lock using different methods, and `parallel` doesn't support `file_serial` yet (patches welcomed!). ## Usage We require at least Rust 1.51. Upgrades to this will require at least a minor version bump (while in 0.x versions) and a major version bump post-1.0. diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index 274d875..24534c4 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -49,8 +49,8 @@ impl Default for UniqueReentrantMutex { } } -/// Sets the maximum amount of time the serial locks will wait to unlock -/// By default, this is set to 60 seconds, which is almost always much longer than is needed +/// Sets the maximum amount of time the serial locks will wait to unlock. +/// By default, this is set to 60 seconds, which is almost always much longer than is needed. /// This is deliberately set high to try and avoid situations where we accidentally hit the limits /// but is set at all so we can timeout rather than hanging forever. /// diff --git a/serial_test/src/lib.rs b/serial_test/src/lib.rs index 932d1e8..78eaa46 100644 --- a/serial_test/src/lib.rs +++ b/serial_test/src/lib.rs @@ -15,10 +15,19 @@ //! fn test_serial_another() { //! // Do things //! } +//! +//! #[test] +//! #[parallel] +//! fn test_parallel_another() { +//! // Do parallel things +//! } //! ```` //! Multiple tests with the [serial](macro@serial) attribute are guaranteed to be executed in serial. Ordering -//! of the tests is not guaranteed however. Tests without the `serial` attribute may run at any time, including -//! in parallel to tests marked as `serial`. Note that if you're using an async test reactor attribute (e.g. +//! of the tests is not guaranteed however. Other tests with the [parallel](macro@parallel) attribute may run +//! at the same time as each other, but not at the same time as a test with [serial](macro@serial). Tests with +//! neither attribute may run at any time and no guarantees are made about their timing! +//! +//! Note that if you're using an async test reactor attribute (e.g. //! `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we don't get an //! async function and things break. There's now an error for this case to improve debugging. //! diff --git a/serial_test_derive/src/lib.rs b/serial_test_derive/src/lib.rs index f3b5e88..972ff4f 100644 --- a/serial_test_derive/src/lib.rs +++ b/serial_test_derive/src/lib.rs @@ -26,7 +26,10 @@ use std::ops::Deref; /// } /// ```` /// Multiple tests with the [serial](macro@serial) attribute are guaranteed to be executed in serial. Ordering -/// of the tests is not guaranteed however. If you want different subsets of tests to be serialised with each +/// of the tests is not guaranteed however. If you have other tests that can be run in parallel, but would clash +/// if run at the same time as the [serial](macro@serial) tests, you can use the [parallel](macro@parallel) attribute. +/// +/// If you want different subsets of tests to be serialised with each /// other, but not depend on other subsets, you can add an argument to [serial](macro@serial), and all calls /// with identical arguments will be called in serial. e.g. /// ```` @@ -57,13 +60,41 @@ use std::ops::Deref; /// `test_serial_one` and `test_serial_another` will be executed in serial, as will `test_serial_third` and `test_serial_fourth` /// but neither sequence will be blocked by the other /// -/// Nested serialised tests (i.e. a [serial](macro@serial) tagged test calling another) is supported +/// Nested serialised tests (i.e. a [serial](macro@serial) tagged test calling another) are supported #[proc_macro_attribute] #[proc_macro_error] pub fn serial(attr: TokenStream, input: TokenStream) -> TokenStream { local_serial_core(attr.into(), input.into()).into() } +/// Allows for the creation of parallel Rust tests that won't clash with serial tests +/// ```` +/// #[test] +/// #[serial] +/// fn test_serial_one() { +/// // Do things +/// } +/// +/// #[test] +/// #[parallel] +/// fn test_parallel_one() { +/// // Do things +/// } +/// +/// #[test] +/// #[parallel] +/// fn test_parallel_two() { +/// // Do things +/// } +/// ```` +/// Multiple tests with the [parallel](macro@parallel) attribute may run in parallel, but not at the +/// same time as [serial](macro@serial) tests. e.g. in the example code above, `test_parallel_one` +/// and `test_parallel_two` may run at the same time, but `test_serial_one` is guaranteed not to run +/// at the same time as either of them. [parallel](macro@parallel) also takes key arguments for groups +/// of tests as per [serial](macro@serial). +/// +/// Note that this has zero effect on [file_serial](macro@file_serial) tests, as that uses a different +/// serialisation mechanism. #[proc_macro_attribute] #[proc_macro_error] pub fn parallel(attr: TokenStream, input: TokenStream) -> TokenStream {