Skip to content

Commit

Permalink
[ENG-1740] Add sempahore for generating sync operation timestamps (#2335
Browse files Browse the repository at this point in the history
)

add sempahore for generating sync operation timestamps
  • Loading branch information
Brendonovich committed Apr 16, 2024
1 parent 20b8a2b commit b99a1ad
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
4 changes: 4 additions & 0 deletions core/crates/sync/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::crdt_op_unchecked_db;
/// Takes all the syncable data in the database and generates CRDTOperations for it.
/// This is a requirement before the library can sync.
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
let lock = sync.timestamp_lock.acquire().await;

db._transaction()
.with_timeout(9999999999)
.run(|db| async move {
Expand Down Expand Up @@ -412,6 +414,8 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
})
.await
.unwrap();

drop(lock);
}

async fn paginate<
Expand Down
26 changes: 23 additions & 3 deletions core/crates/sync/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Manager {
pub tx: broadcast::Sender<SyncMessage>,
pub ingest: ingest::Handler,
pub shared: Arc<SharedState>,
pub timestamp_lock: tokio::sync::Semaphore,
}

impl fmt::Debug for Manager {
Expand Down Expand Up @@ -68,7 +69,12 @@ impl Manager {
let ingest = ingest::Actor::spawn(shared.clone());

New {
manager: Self { tx, ingest, shared },
manager: Self {
tx,
ingest,
shared,
timestamp_lock: tokio::sync::Semaphore::new(1),
},
rx,
}
}
Expand All @@ -80,9 +86,15 @@ impl Manager {
pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
(_ops, queries): (Vec<CRDTOperation>, I),
(mut _ops, queries): (Vec<CRDTOperation>, I),
) -> prisma_client_rust::Result<<I as prisma_client_rust::BatchItemParent>::ReturnValue> {
let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
let lock = self.timestamp_lock.acquire().await;

_ops.iter_mut().for_each(|op| {
op.timestamp = *self.get_clock().new_timestamp().get_time();
});

let (res, _) = tx
._batch((
queries,
Expand All @@ -94,6 +106,8 @@ impl Manager {

self.tx.send(SyncMessage::Created).ok();

drop(lock);

res
} else {
tx._batch([queries]).await?.remove(0)
Expand All @@ -106,14 +120,20 @@ impl Manager {
pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
op: CRDTOperation,
mut op: CRDTOperation,
query: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
let ret = if self.emit_messages_flag.load(atomic::Ordering::Relaxed) {
let lock = self.timestamp_lock.acquire().await;

op.timestamp = *self.get_clock().new_timestamp().get_time();

let ret = tx._batch((crdt_op_db(&op).to_query(tx), query)).await?.1;

self.tx.send(SyncMessage::Created).ok();

drop(lock);

ret
} else {
tx._batch(vec![query]).await?.remove(0)
Expand Down

0 comments on commit b99a1ad

Please sign in to comment.