Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/persistence/task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fmt::Debug;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::Arc;

use tokio::sync::Notify;
Expand Down Expand Up @@ -66,6 +66,8 @@ pub struct PersistenceTask<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
engine_task_handle: tokio::task::AbortHandle,
queue: Arc<Queue<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>,
progress_notify: Arc<Notify>,
// True if non-empty, false either.
wait_state: Arc<AtomicBool>,
}

impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
Expand All @@ -84,16 +86,21 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
{
let queue = Arc::new(Queue::new());
let progress_notify = Arc::new(Notify::new());
let wait_state = Arc::new(AtomicBool::new(false));

let engine_queue = queue.clone();
let engine_progress_notify = progress_notify.clone();
let engine_wait_state = wait_state.clone();
let task = async move {
loop {
let next_op = if let Some(next_op) = engine_queue.immediate_pop() {
next_op
} else {
engine_wait_state.store(true, Ordering::Relaxed);
engine_progress_notify.notify_waiters();
engine_queue.pop().await
let res = engine_queue.pop().await;
engine_wait_state.store(false, Ordering::Relaxed);
res
};
tracing::debug!("Applying operation {:?}", next_op);
let res = engine.apply_operation(next_op).await;
Expand All @@ -107,12 +114,15 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
queue,
engine_task_handle,
progress_notify,
wait_state,
}
}

pub async fn wait_for_ops(&self) {
let count = self.queue.len();
tracing::info!("Waiting for {} operations", count);
self.progress_notify.notified().await
if !self.wait_state.load(Ordering::Relaxed) {
let count = self.queue.len();
println!("Waiting for {} operations", count);
self.progress_notify.notified().await
}
}
}
21 changes: 21 additions & 0 deletions tests/persistence/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::remove_dir_if_exists;
use std::time::Duration;

use worktable::prelude::*;
use worktable::worktable;
Expand Down Expand Up @@ -32,6 +33,26 @@ worktable! (
}
);

#[test]
fn test_wait_for_ops_for_empty() {
let config = PersistenceConfig::new("tests/data/sync/insert", "tests/data/sync/insert");

let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_io()
.enable_time()
.build()
.unwrap();

runtime.block_on(async {
let table = TestSyncWorkTable::load_from_file(config.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
table.wait_for_ops().await;
});
}

#[test]
fn test_space_insert_sync() {
let config = PersistenceConfig::new("tests/data/sync/insert", "tests/data/sync/insert");
Expand Down
Loading