diff --git a/src/persistence/task.rs b/src/persistence/task.rs index 809334fd..546187e8 100644 --- a/src/persistence/task.rs +++ b/src/persistence/task.rs @@ -1,5 +1,5 @@ use std::fmt::Debug; -use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::sync::Arc; use tokio::sync::Notify; @@ -66,6 +66,8 @@ pub struct PersistenceTask { engine_task_handle: tokio::task::AbortHandle, queue: Arc>, progress_notify: Arc, + // True if non-empty, false either. + wait_state: Arc, } impl @@ -84,16 +86,21 @@ impl { let queue = Arc::new(Queue::new()); let progress_notify = Arc::new(Notify::new()); + let wait_state = Arc::new(AtomicBool::new(false)); let engine_queue = queue.clone(); let engine_progress_notify = progress_notify.clone(); + let engine_wait_state = wait_state.clone(); let task = async move { loop { let next_op = if let Some(next_op) = engine_queue.immediate_pop() { next_op } else { + engine_wait_state.store(true, Ordering::Relaxed); engine_progress_notify.notify_waiters(); - engine_queue.pop().await + let res = engine_queue.pop().await; + engine_wait_state.store(false, Ordering::Relaxed); + res }; tracing::debug!("Applying operation {:?}", next_op); let res = engine.apply_operation(next_op).await; @@ -107,12 +114,15 @@ impl queue, engine_task_handle, progress_notify, + wait_state, } } pub async fn wait_for_ops(&self) { - let count = self.queue.len(); - tracing::info!("Waiting for {} operations", count); - self.progress_notify.notified().await + if !self.wait_state.load(Ordering::Relaxed) { + let count = self.queue.len(); + println!("Waiting for {} operations", count); + self.progress_notify.notified().await + } } } diff --git a/tests/persistence/sync/mod.rs b/tests/persistence/sync/mod.rs index ad886a4c..0edf11a9 100644 --- a/tests/persistence/sync/mod.rs +++ b/tests/persistence/sync/mod.rs @@ -1,4 +1,5 @@ use crate::remove_dir_if_exists; +use std::time::Duration; use worktable::prelude::*; use worktable::worktable; @@ -32,6 +33,26 @@ worktable! ( } ); +#[test] +fn test_wait_for_ops_for_empty() { + let config = PersistenceConfig::new("tests/data/sync/insert", "tests/data/sync/insert"); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + let table = TestSyncWorkTable::load_from_file(config.clone()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(200)).await; + table.wait_for_ops().await; + }); +} + #[test] fn test_space_insert_sync() { let config = PersistenceConfig::new("tests/data/sync/insert", "tests/data/sync/insert");