diff --git a/core/crates/sync/src/backfill.rs b/core/crates/sync/src/backfill.rs index 027bb8d0bcf8..78d8a280efbf 100644 --- a/core/crates/sync/src/backfill.rs +++ b/core/crates/sync/src/backfill.rs @@ -15,6 +15,8 @@ use crate::crdt_op_unchecked_db; /// Takes all the syncable data in the database and generates CRDTOperations for it. /// This is a requirement before the library can sync. pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) { + let lock = sync.timestamp_lock.acquire().await; + db._transaction() .with_timeout(9999999999) .run(|db| async move { @@ -412,6 +414,8 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta }) .await .unwrap(); + + drop(lock); } async fn paginate< diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index a25581738555..e368a40c6275 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -24,6 +24,7 @@ pub struct Manager { pub tx: broadcast::Sender, pub ingest: ingest::Handler, pub shared: Arc, + pub timestamp_lock: tokio::sync::Semaphore, } impl fmt::Debug for Manager { @@ -68,7 +69,12 @@ impl Manager { let ingest = ingest::Actor::spawn(shared.clone()); New { - manager: Self { tx, ingest, shared }, + manager: Self { + tx, + ingest, + shared, + timestamp_lock: tokio::sync::Semaphore::new(1), + }, rx, } } @@ -80,9 +86,15 @@ impl Manager { pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( &self, tx: &PrismaClient, - (_ops, queries): (Vec, I), + (mut _ops, queries): (Vec, I), ) -> prisma_client_rust::Result<::ReturnValue> { let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) { + let lock = self.timestamp_lock.acquire().await; + + _ops.iter_mut().for_each(|op| { + op.timestamp = *self.get_clock().new_timestamp().get_time(); + }); + let (res, _) = tx ._batch(( queries, @@ -94,6 +106,8 @@ impl Manager { self.tx.send(SyncMessage::Created).ok(); + drop(lock); + res } else { tx._batch([queries]).await?.remove(0) @@ -106,14 +120,20 @@ impl Manager { pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>( &self, tx: &PrismaClient, - op: CRDTOperation, + mut op: CRDTOperation, query: Q, ) -> prisma_client_rust::Result<::ReturnValue> { let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) { + let lock = self.timestamp_lock.acquire().await; + + op.timestamp = *self.get_clock().new_timestamp().get_time(); + let ret = tx._batch((crdt_op_db(&op).to_query(tx), query)).await?.1; self.tx.send(SyncMessage::Created).ok(); + drop(lock); + ret } else { tx._batch(vec![query]).await?.remove(0)