diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 22f0e51..e31701d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,14 +52,20 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: test - args: --features ${{ matrix.features }} + args: --features ${{ matrix.features }} -- --nocapture + env: + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_LOG: debug if: ${{ matrix.features != 'all' }} - name: Build and test all features uses: actions-rs/cargo@v1.0.3 with: command: test - args: --all-features + args: --all-features -- --nocapture if: ${{ matrix.features == 'all' }} + env: + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_LOG: debug multi-os-testing: name: Test suite @@ -81,7 +87,10 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: test - args: --all-features + args: --all-features -- --nocapture + env: + RUST_TEST_THREADS: 3 # So the parallel tests have enough threads + RUST_LOG: debug minimal-versions: name: minimal versions check diff --git a/Cargo.lock b/Cargo.lock index 4d5f5b0..7717d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,7 @@ dependencies = [ "fslock", "itertools", "lazy_static", + "log", "parking_lot", "serial_test_derive", ] @@ -416,6 +417,7 @@ dependencies = [ "env_logger", "futures-util", "lazy_static", + "parking_lot", "serial_test", "tokio", ] diff --git a/README.md b/README.md index ce7a35d..49a803e 100644 --- a/README.md +++ b/README.md @@ -27,14 +27,14 @@ async fn test_serial_another() { // Do things asynchronously } ``` -Multiple tests with the `serial` attribute are guaranteed to be executed in serial. Ordering of the tests is not guaranteed however. -Tests without the `serial` attribute may run at any time, including in parallel to tests marked as `serial`. Note that if you're using -an async test reactor attribute (e.g. `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we +Multiple tests with the `serial` attribute are guaranteed to be executed in serial. Ordering of the tests is not guaranteed however. Other tests with the `parallel` attribute may run at the same time as each other, but not at the same time as a test with `serial`. Tests with neither attribute may run at any time and no guarantees are made about their timing! + +Note that if you're using an async test reactor attribute (e.g. `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we don't get an async function and things break. There's now an error for this case to improve debugging. For cases like doctests and integration tests where the tests are run as separate processes, we also support `file_serial`, with similar properties but based off file locking. Note that there are no guarantees about one test with `serial` and another with -`file_serial` as they lock using different methods. +`file_serial` as they lock using different methods, and `parallel` doesn't support `file_serial` yet (patches welcomed!). ## Usage We require at least Rust 1.51. Upgrades to this will require at least a minor version bump (while in 0.x versions) and a major version bump post-1.0. diff --git a/serial_test/Cargo.toml b/serial_test/Cargo.toml index e29b18d..db25746 100644 --- a/serial_test/Cargo.toml +++ b/serial_test/Cargo.toml @@ -16,6 +16,7 @@ parking_lot = "^0.12" serial_test_derive = { version = "~0.7.0", path = "../serial_test_derive" } fslock = {version = "0.2", optional = true} document-features = {version = "0.2", optional=true} +log = "0.4" [dev-dependencies] itertools = "0.10" diff --git a/serial_test/src/code_lock.rs b/serial_test/src/code_lock.rs index 5cfd8fa..24534c4 100644 --- a/serial_test/src/code_lock.rs +++ b/serial_test/src/code_lock.rs @@ -1,29 +1,39 @@ +use crate::rwlock::{Locks, MutexGuardWrapper}; use lazy_static::lazy_static; -use parking_lot::{Mutex, ReentrantMutex, ReentrantMutexGuard, RwLock}; +use log::debug; +use parking_lot::{Mutex, RwLock}; use std::{ cell::RefCell, collections::HashMap, ops::{Deref, DerefMut}, sync::{atomic::AtomicU32, Arc}, - time::Duration, + time::{Duration, Instant}, }; -struct UniqueReentrantMutex { - mutex: ReentrantMutex<()>, +pub(crate) struct UniqueReentrantMutex { + locks: Locks, // Only actually used for tests #[allow(dead_code)] - id: u32, + pub(crate) id: u32, } impl UniqueReentrantMutex { - fn lock(&self) -> ReentrantMutexGuard<()> { - self.mutex.lock() + pub(crate) fn lock(&self) -> MutexGuardWrapper { + self.locks.serial() + } + + pub(crate) fn start_parallel(&self) { + self.locks.start_parallel(); + } + + pub(crate) fn end_parallel(&self) { + self.locks.end_parallel(); } } lazy_static! { - static ref LOCKS: Arc>> = + pub(crate) static ref LOCKS: Arc>> = Arc::new(RwLock::new(HashMap::new())); static ref MAX_WAIT: Arc>> = Arc::new(Mutex::new(RefCell::new(Duration::from_secs(60)))); @@ -33,14 +43,14 @@ lazy_static! { impl Default for UniqueReentrantMutex { fn default() -> Self { Self { - mutex: Default::default(), + locks: Locks::new(), id: MUTEX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst), } } } -/// Sets the maximum amount of time the serial locks will wait to unlock -/// By default, this is set to 60 seconds, which is almost always much longer than is needed +/// Sets the maximum amount of time the serial locks will wait to unlock. +/// By default, this is set to 60 seconds, which is almost always much longer than is needed. /// This is deliberately set high to try and avoid situations where we accidentally hit the limits /// but is set at all so we can timeout rather than hanging forever. /// @@ -50,123 +60,40 @@ pub fn set_max_wait(max_wait: Duration) { MAX_WAIT.lock().replace(max_wait); } -fn wait_duration() -> Duration { +pub(crate) fn wait_duration() -> Duration { *MAX_WAIT.lock().borrow() } -fn check_new_key(name: &str) { - // Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else - let new_key = { - let unlock = LOCKS - .try_read_recursive_for(wait_duration()) - .expect("read lock didn't work"); - !unlock.deref().contains_key(name) - }; - if new_key { - // This is the rare path, which avoids the multi-writer situation mostly - let mut lock = LOCKS - .try_write_for(wait_duration()) - .expect("write lock didn't work"); - - lock.deref_mut().entry(name.to_string()).or_default(); - } -} - -#[doc(hidden)] -pub fn local_serial_core_with_return( - name: &str, - function: fn() -> Result<(), E>, -) -> Result<(), E> { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - function() -} - -#[doc(hidden)] -pub fn local_serial_core(name: &str, function: fn()) { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - function(); -} - -#[doc(hidden)] -pub async fn local_async_serial_core_with_return( - name: &str, - fut: impl std::future::Future>, -) -> Result<(), E> { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - fut.await -} - -#[doc(hidden)] -pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future) { - check_new_key(name); - - let unlock = LOCKS.read_recursive(); - // _guard needs to be named to avoid being instant dropped - let _guard = unlock.deref()[name].lock(); - fut.await; -} - -#[cfg(test)] -mod tests { - use super::{check_new_key, wait_duration, LOCKS}; - use itertools::Itertools; - use parking_lot::RwLock; - use std::{ - ops::Deref, - sync::{Arc, Barrier}, - thread, - }; - - #[test] - fn test_hammer_check_new_key() { - let ptrs = Arc::new(RwLock::new(Vec::new())); - let mut threads = Vec::new(); +pub(crate) fn check_new_key(name: &str) { + let start = Instant::now(); + loop { + let duration = Instant::now() - start; + debug!("Waiting for '{}' {:?}", name, duration); + // Check if a new key is needed. Just need a read lock, which can be done in sync with everyone else + let try_unlock = LOCKS.try_read_recursive_for(Duration::from_secs(1)); + if let Some(unlock) = try_unlock { + if unlock.deref().contains_key(name) { + return; + } + drop(unlock); // so that we don't hold the read lock and so the writer can maybe succeed + } else { + continue; // wasn't able to get read lock + } - let count = 100; - let barrier = Arc::new(Barrier::new(count)); + // This is the rare path, which avoids the multi-writer situation mostly + let try_lock = LOCKS.try_write_for(Duration::from_secs(1)); - for _ in 0..count { - let local_locks = LOCKS.clone(); - let local_ptrs = ptrs.clone(); - let c = barrier.clone(); - threads.push(thread::spawn(move || { - c.wait(); - check_new_key("foo"); - { - let unlock = local_locks - .try_read_recursive_for(wait_duration()) - .expect("read lock didn't work"); - let mutex = unlock.deref().get("foo").unwrap(); + if let Some(mut lock) = try_lock { + lock.deref_mut().entry(name.to_string()).or_default(); + return; + } - let mut ptr_guard = local_ptrs - .try_write_for(wait_duration()) - .expect("write lock didn't work"); - ptr_guard.push(mutex.id); - } + // If the try_lock fails, then go around the loop again + // Odds are another test was also locking on the write and has now written the key - c.wait(); - })); - } - for thread in threads { - thread.join().expect("thread join worked"); + let duration = Instant::now() - start; + if duration >= wait_duration() { + panic!("check_new_key timed out!"); } - let ptrs_read_lock = ptrs - .try_read_recursive_for(wait_duration()) - .expect("ptrs read work"); - assert_eq!(ptrs_read_lock.len(), count); - println!("{:?}", ptrs_read_lock); - assert_eq!(ptrs_read_lock.iter().unique().count(), 1); } } diff --git a/serial_test/src/lib.rs b/serial_test/src/lib.rs index 57bb07b..78eaa46 100644 --- a/serial_test/src/lib.rs +++ b/serial_test/src/lib.rs @@ -15,10 +15,19 @@ //! fn test_serial_another() { //! // Do things //! } +//! +//! #[test] +//! #[parallel] +//! fn test_parallel_another() { +//! // Do parallel things +//! } //! ```` //! Multiple tests with the [serial](macro@serial) attribute are guaranteed to be executed in serial. Ordering -//! of the tests is not guaranteed however. Tests without the `serial` attribute may run at any time, including -//! in parallel to tests marked as `serial`. Note that if you're using an async test reactor attribute (e.g. +//! of the tests is not guaranteed however. Other tests with the [parallel](macro@parallel) attribute may run +//! at the same time as each other, but not at the same time as a test with [serial](macro@serial). Tests with +//! neither attribute may run at any time and no guarantees are made about their timing! +//! +//! Note that if you're using an async test reactor attribute (e.g. //! `tokio::test` or `actix_rt::test`) then they should be listed *before* `serial`, otherwise we don't get an //! async function and things break. There's now an error for this case to improve debugging. //! @@ -41,12 +50,21 @@ )] mod code_lock; +mod parallel_code_lock; +mod rwlock; +mod serial_code_lock; + #[cfg(feature = "file_locks")] mod file_lock; -pub use code_lock::{ +pub use code_lock::set_max_wait; +pub use parallel_code_lock::{ + local_async_parallel_core, local_async_parallel_core_with_return, local_parallel_core, + local_parallel_core_with_return, +}; +pub use serial_code_lock::{ local_async_serial_core, local_async_serial_core_with_return, local_serial_core, - local_serial_core_with_return, set_max_wait, + local_serial_core_with_return, }; #[cfg(feature = "file_locks")] @@ -56,6 +74,7 @@ pub use file_lock::{ }; // Re-export #[serial/file_serial]. +pub use serial_test_derive::parallel; #[allow(unused_imports)] pub use serial_test_derive::serial; diff --git a/serial_test/src/parallel_code_lock.rs b/serial_test/src/parallel_code_lock.rs new file mode 100644 index 0000000..e521389 --- /dev/null +++ b/serial_test/src/parallel_code_lock.rs @@ -0,0 +1,52 @@ +#![allow(clippy::await_holding_lock)] + +use crate::code_lock::{check_new_key, LOCKS}; +use std::ops::Deref; + +#[doc(hidden)] +pub fn local_parallel_core_with_return( + name: &str, + function: fn() -> Result<(), E>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + let ret = function(); + unlock.deref()[name].end_parallel(); + ret +} + +#[doc(hidden)] +pub fn local_parallel_core(name: &str, function: fn()) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + function(); + unlock.deref()[name].end_parallel(); +} + +#[doc(hidden)] +pub async fn local_async_parallel_core_with_return( + name: &str, + fut: impl std::future::Future>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + let ret = fut.await; + unlock.deref()[name].end_parallel(); + ret +} + +#[doc(hidden)] +pub async fn local_async_parallel_core(name: &str, fut: impl std::future::Future) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + unlock.deref()[name].start_parallel(); + fut.await; + unlock.deref()[name].end_parallel(); +} diff --git a/serial_test/src/rwlock.rs b/serial_test/src/rwlock.rs new file mode 100644 index 0000000..13d8ab2 --- /dev/null +++ b/serial_test/src/rwlock.rs @@ -0,0 +1,91 @@ +use parking_lot::{Condvar, Mutex, ReentrantMutex, ReentrantMutexGuard}; +use std::{sync::Arc, time::Duration}; + +struct LockState { + parallels: u32, +} + +struct LockData { + mutex: Mutex, + serial: ReentrantMutex<()>, + condvar: Condvar, +} + +#[derive(Clone)] +pub(crate) struct Locks { + arc: Arc, +} + +pub(crate) struct MutexGuardWrapper<'a> { + #[allow(dead_code)] // need it around to get dropped + mutex_guard: ReentrantMutexGuard<'a, ()>, + locks: Locks, +} + +impl<'a> Drop for MutexGuardWrapper<'a> { + fn drop(&mut self) { + self.locks.arc.condvar.notify_one(); + } +} + +impl Locks { + pub fn new() -> Locks { + Locks { + arc: Arc::new(LockData { + mutex: Mutex::new(LockState { parallels: 0 }), + condvar: Condvar::new(), + serial: Default::default(), + }), + } + } + + pub fn serial(&self) -> MutexGuardWrapper { + let mut lock_state = self.arc.mutex.lock(); + loop { + // If all the things we want are true, try to lock out serial + if lock_state.parallels == 0 { + let possible_serial_lock = self.arc.serial.try_lock(); + if let Some(serial_lock) = possible_serial_lock { + return MutexGuardWrapper { + mutex_guard: serial_lock, + locks: self.clone(), + }; + } + } + + self.arc + .condvar + .wait_for(&mut lock_state, Duration::from_secs(1)); + } + } + + pub fn start_parallel(&self) { + let mut lock_state = self.arc.mutex.lock(); + loop { + if lock_state.parallels > 0 { + // fast path, as someone else already has it locked + lock_state.parallels += 1; + return; + } + + let possible_serial_lock = self.arc.serial.try_lock(); + if possible_serial_lock.is_some() { + // We now know no-one else has the serial lock, so we can add to parallel + lock_state.parallels = 1; // Had to have been 0 before, as otherwise we'd have hit the fast path + return; + } + + self.arc + .condvar + .wait_for(&mut lock_state, Duration::from_secs(1)); + } + } + + pub fn end_parallel(&self) { + let mut lock_state = self.arc.mutex.lock(); + assert!(lock_state.parallels > 0); + lock_state.parallels -= 1; + drop(lock_state); + self.arc.condvar.notify_one(); + } +} diff --git a/serial_test/src/serial_code_lock.rs b/serial_test/src/serial_code_lock.rs new file mode 100644 index 0000000..5bd689d --- /dev/null +++ b/serial_test/src/serial_code_lock.rs @@ -0,0 +1,103 @@ +#![allow(clippy::await_holding_lock)] + +use crate::code_lock::{check_new_key, LOCKS}; +use std::ops::Deref; + +#[doc(hidden)] +pub fn local_serial_core_with_return( + name: &str, + function: fn() -> Result<(), E>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + function() +} + +#[doc(hidden)] +pub fn local_serial_core(name: &str, function: fn()) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + function(); +} + +#[doc(hidden)] +pub async fn local_async_serial_core_with_return( + name: &str, + fut: impl std::future::Future>, +) -> Result<(), E> { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + fut.await +} + +#[doc(hidden)] +pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future) { + check_new_key(name); + + let unlock = LOCKS.read_recursive(); + // _guard needs to be named to avoid being instant dropped + let _guard = unlock.deref()[name].lock(); + fut.await; +} + +#[cfg(test)] +mod tests { + use crate::code_lock::{check_new_key, wait_duration, LOCKS}; + use itertools::Itertools; + use parking_lot::RwLock; + use std::{ + ops::Deref, + sync::{Arc, Barrier}, + thread, + }; + + #[test] + fn test_hammer_check_new_key() { + let ptrs = Arc::new(RwLock::new(Vec::new())); + let mut threads = Vec::new(); + + let count = 100; + let barrier = Arc::new(Barrier::new(count)); + + for _ in 0..count { + let local_locks = LOCKS.clone(); + let local_ptrs = ptrs.clone(); + let c = barrier.clone(); + threads.push(thread::spawn(move || { + c.wait(); + check_new_key("foo"); + { + let unlock = local_locks + .try_read_recursive_for(wait_duration()) + .expect("read lock didn't work"); + let mutex = unlock.deref().get("foo").unwrap(); + + let mut ptr_guard = local_ptrs + .try_write_for(wait_duration()) + .expect("write lock didn't work"); + ptr_guard.push(mutex.id); + } + + c.wait(); + })); + } + for thread in threads { + thread.join().expect("thread join worked"); + } + let ptrs_read_lock = ptrs + .try_read_recursive_for(wait_duration()) + .expect("ptrs read work"); + assert_eq!(ptrs_read_lock.len(), count); + println!("{:?}", ptrs_read_lock); + assert_eq!(ptrs_read_lock.iter().unique().count(), 1); + } +} diff --git a/serial_test_derive/src/lib.rs b/serial_test_derive/src/lib.rs index 3e827cb..972ff4f 100644 --- a/serial_test_derive/src/lib.rs +++ b/serial_test_derive/src/lib.rs @@ -26,7 +26,10 @@ use std::ops::Deref; /// } /// ```` /// Multiple tests with the [serial](macro@serial) attribute are guaranteed to be executed in serial. Ordering -/// of the tests is not guaranteed however. If you want different subsets of tests to be serialised with each +/// of the tests is not guaranteed however. If you have other tests that can be run in parallel, but would clash +/// if run at the same time as the [serial](macro@serial) tests, you can use the [parallel](macro@parallel) attribute. +/// +/// If you want different subsets of tests to be serialised with each /// other, but not depend on other subsets, you can add an argument to [serial](macro@serial), and all calls /// with identical arguments will be called in serial. e.g. /// ```` @@ -57,13 +60,47 @@ use std::ops::Deref; /// `test_serial_one` and `test_serial_another` will be executed in serial, as will `test_serial_third` and `test_serial_fourth` /// but neither sequence will be blocked by the other /// -/// Nested serialised tests (i.e. a [serial](macro@serial) tagged test calling another) is supported +/// Nested serialised tests (i.e. a [serial](macro@serial) tagged test calling another) are supported #[proc_macro_attribute] #[proc_macro_error] pub fn serial(attr: TokenStream, input: TokenStream) -> TokenStream { local_serial_core(attr.into(), input.into()).into() } +/// Allows for the creation of parallel Rust tests that won't clash with serial tests +/// ```` +/// #[test] +/// #[serial] +/// fn test_serial_one() { +/// // Do things +/// } +/// +/// #[test] +/// #[parallel] +/// fn test_parallel_one() { +/// // Do things +/// } +/// +/// #[test] +/// #[parallel] +/// fn test_parallel_two() { +/// // Do things +/// } +/// ```` +/// Multiple tests with the [parallel](macro@parallel) attribute may run in parallel, but not at the +/// same time as [serial](macro@serial) tests. e.g. in the example code above, `test_parallel_one` +/// and `test_parallel_two` may run at the same time, but `test_serial_one` is guaranteed not to run +/// at the same time as either of them. [parallel](macro@parallel) also takes key arguments for groups +/// of tests as per [serial](macro@serial). +/// +/// Note that this has zero effect on [file_serial](macro@file_serial) tests, as that uses a different +/// serialisation mechanism. +#[proc_macro_attribute] +#[proc_macro_error] +pub fn parallel(attr: TokenStream, input: TokenStream) -> TokenStream { + local_parallel_core(attr.into(), input.into()).into() +} + /// Allows for the creation of file-serialised Rust tests /// ```` /// #[test] @@ -150,12 +187,9 @@ fn get_raw_args(attr: proc_macro2::TokenStream) -> Vec { raw_args } -fn local_serial_core( - attr: proc_macro2::TokenStream, - input: proc_macro2::TokenStream, -) -> proc_macro2::TokenStream { +fn get_core_key(attr: proc_macro2::TokenStream) -> String { let mut raw_args = get_raw_args(attr); - let key = match raw_args.len() { + match raw_args.len() { 0 => "".to_string(), 1 => raw_args.pop().unwrap(), n => { @@ -164,10 +198,25 @@ fn local_serial_core( n, raw_args ); } - }; + } +} + +fn local_serial_core( + attr: proc_macro2::TokenStream, + input: proc_macro2::TokenStream, +) -> proc_macro2::TokenStream { + let key = get_core_key(attr); serial_setup(input, vec![Box::new(key)], "local") } +fn local_parallel_core( + attr: proc_macro2::TokenStream, + input: proc_macro2::TokenStream, +) -> proc_macro2::TokenStream { + let key = get_core_key(attr); + parallel_setup(input, vec![Box::new(key)], "local") +} + fn fs_serial_core( attr: proc_macro2::TokenStream, input: proc_macro2::TokenStream, @@ -197,10 +246,11 @@ fn fs_serial_core( serial_setup(input, args, "fs") } -fn serial_setup( +fn core_setup( input: proc_macro2::TokenStream, args: Vec>, prefix: &str, + kind: &str, ) -> proc_macro2::TokenStream where T: quote::ToTokens + ?Sized, @@ -225,7 +275,9 @@ where { // We assume that any 2-part attribute with the second part as "test" on an async function // is the "do this test with reactor" wrapper. This is true for actix, tokio and async_std. - abort_call_site!("Found async test attribute after serial, which will break"); + abort_call_site!( + "Found async test attribute after serial/parallel, which will break" + ); } // we skip ignore/should_panic because the test framework already deals with it @@ -238,7 +290,7 @@ where if let Some(ret) = return_type { match asyncness { Some(_) => { - let fnname = format_ident!("{}_async_serial_core_with_return", prefix); + let fnname = format_ident!("{}_async_{}_core_with_return", prefix, kind); quote! { #(#attrs) * @@ -248,7 +300,7 @@ where } } None => { - let fnname = format_ident!("{}_serial_core_with_return", prefix); + let fnname = format_ident!("{}_{}_core_with_return", prefix, kind); quote! { #(#attrs) * @@ -261,7 +313,7 @@ where } else { match asyncness { Some(_) => { - let fnname = format_ident!("{}_async_serial_core", prefix); + let fnname = format_ident!("{}_async_{}_core", prefix, kind); quote! { #(#attrs) * @@ -271,7 +323,7 @@ where } } None => { - let fnname = format_ident!("{}_serial_core", prefix); + let fnname = format_ident!("{}_{}_core", prefix, kind); quote! { #(#attrs) * @@ -284,11 +336,33 @@ where } } +fn serial_setup( + input: proc_macro2::TokenStream, + args: Vec>, + prefix: &str, +) -> proc_macro2::TokenStream +where + T: quote::ToTokens + ?Sized, +{ + core_setup(input, args, prefix, "serial") +} + +fn parallel_setup( + input: proc_macro2::TokenStream, + args: Vec>, + prefix: &str, +) -> proc_macro2::TokenStream +where + T: quote::ToTokens + ?Sized, +{ + core_setup(input, args, prefix, "parallel") +} + #[cfg(test)] mod tests { - use proc_macro2::{Literal, Punct, Spacing}; - - use super::{format_ident, fs_serial_core, local_serial_core, quote, TokenTree}; + use super::{fs_serial_core, local_serial_core}; + use proc_macro2::{Literal, Punct, Spacing, TokenTree}; + use quote::{format_ident, quote}; use std::iter::FromIterator; #[test] diff --git a/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr b/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr index f37e867..35703d1 100644 --- a/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr +++ b/serial_test_derive/tests/broken/test_serial_async_before_wrapper.stderr @@ -1,4 +1,4 @@ -error: Found async test attribute after serial, which will break +error: Found async test attribute after serial/parallel, which will break --> tests/broken/test_serial_async_before_wrapper.rs:3:1 | 3 | #[serial] diff --git a/serial_test_test/Cargo.toml b/serial_test_test/Cargo.toml index f9c3b92..3f5c5bf 100644 --- a/serial_test_test/Cargo.toml +++ b/serial_test_test/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" serial_test = { path="../serial_test" } lazy_static = "^1.2" env_logger = "^0.9" +parking_lot = "^0.12" [dev-dependencies] tokio = { version = "^1.17", features = ["macros", "rt"] } diff --git a/serial_test_test/src/lib.rs b/serial_test_test/src/lib.rs index a5cdb62..f9a1d2e 100644 --- a/serial_test_test/src/lib.rs +++ b/serial_test_test/src/lib.rs @@ -37,12 +37,13 @@ //! ``` use lazy_static::lazy_static; +use parking_lot::Mutex; use std::{ convert::TryInto, env, fs, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, Barrier, }, thread, time::Duration, @@ -50,29 +51,31 @@ use std::{ lazy_static! { static ref LOCK: Arc = Arc::new(AtomicUsize::new(0)); + static ref THREAD_ORDERINGS: Arc>> = Arc::new(Mutex::new(Vec::new())); + static ref PARALLEL_BARRIER: Barrier = Barrier::new(3); } fn init() { - let _ = env_logger::builder().is_test(true).try_init(); + let _ = env_logger::builder().is_test(false).try_init(); } pub fn test_fn(count: usize) { init(); - println!("Start {}", count); + println!("(non-fs) Start {}", count); LOCK.store(count, Ordering::Relaxed); thread::sleep(Duration::from_millis(1000 * (count as u64))); - println!("End {}", count); + println!("(non-fs) End {}", count); assert_eq!(LOCK.load(Ordering::Relaxed), count); } pub fn fs_test_fn(count: usize) { init(); - println!("Start {}", count); + println!("(fs) Start {}", count); let mut pathbuf = env::temp_dir(); pathbuf.push("serial-test-test"); fs::write(pathbuf.as_path(), count.to_ne_bytes()).unwrap(); thread::sleep(Duration::from_millis(1000 * (count as u64))); - println!("End {}", count); + println!("(fs) End {}", count); let loaded = fs::read(pathbuf.as_path()) .map(|bytes| usize::from_ne_bytes(bytes.as_slice().try_into().unwrap())) @@ -82,8 +85,9 @@ pub fn fs_test_fn(count: usize) { #[cfg(test)] mod tests { - use super::{init, test_fn}; - use serial_test::serial; + use super::{init, test_fn, PARALLEL_BARRIER, THREAD_ORDERINGS}; + use serial_test::{parallel, serial}; + use std::{thread, time::Duration}; #[cfg(feature = "file_locks")] use super::fs_test_fn; @@ -194,4 +198,50 @@ mod tests { fn test_with_key() { init(); } + + #[test] + #[serial(ordering_key)] + fn serial_with_parallel_key_1() { + let count = THREAD_ORDERINGS.lock().len(); + // Can't guarantee before or after the parallels + assert!(count == 0 || count == 3, "count = {}", count); + } + + #[test] + #[parallel(ordering_key)] + fn parallel_with_key_1() { + thread::sleep(Duration::from_secs(1)); + println!("Waiting barrier 1"); + PARALLEL_BARRIER.wait(); + println!("Waiting lock 1"); + THREAD_ORDERINGS.lock().push(false); + } + + #[test] + #[parallel(ordering_key)] + fn parallel_with_key_2() { + thread::sleep(Duration::from_secs(2)); + println!("Waiting barrier 2"); + PARALLEL_BARRIER.wait(); + println!("Waiting lock 2"); + THREAD_ORDERINGS.lock().push(false); + } + + #[test] + #[parallel(ordering_key)] + fn parallel_with_key_3() { + thread::sleep(Duration::from_secs(3)); + println!("Waiting barrier 3"); + PARALLEL_BARRIER.wait(); + println!("Waiting lock 3"); + THREAD_ORDERINGS.lock().push(false); + } + + #[test] + #[serial(ordering_key)] + fn serial_with_parallel_key_2() { + let count = THREAD_ORDERINGS.lock().len(); + // Can't guarantee before or after the parallels + assert!(count == 0 || count == 3, "count = {}", count); + } }