11 changes: 11 additions & 0 deletions .changeset/shiny-pugs-train.md
@@ -0,0 +1,11 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
---

Delay switching over to new sync rules until we have a consistent checkpoint.
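
The changeset text above is the core of this PR: a newly deployed set of sync rules stays in the PROCESSING state until its first consistent checkpoint has been committed, and only then becomes ACTIVE while the previously active rules are stopped. A rough sketch of that gating logic, using illustrative names rather than the actual service API:

```typescript
// Hypothetical sketch of the activation gate described in the changeset.
// The state names mirror this PR; everything else is invented for illustration.
enum SyncRuleState {
  PROCESSING = 'PROCESSING',
  ACTIVE = 'ACTIVE',
  ERRORED = 'ERRORED',
  STOP = 'STOP'
}

interface SyncRuleSet {
  id: number;
  state: SyncRuleState;
}

/** Called once a consistent checkpoint has been committed for `groupId`. */
function activateOnConsistentCheckpoint(rules: SyncRuleSet[], groupId: number): void {
  const current = rules.find((r) => r.id === groupId);
  if (current == null || current.state !== SyncRuleState.PROCESSING) {
    // Already active (or stopped) - nothing to do.
    return;
  }
  // The new rules now have a consistent checkpoint, so clients can be switched
  // over without ever being served a partially-replicated view.
  current.state = SyncRuleState.ACTIVE;
  for (const other of rules) {
    if (other.id !== groupId && (other.state === SyncRuleState.ACTIVE || other.state === SyncRuleState.ERRORED)) {
      other.state = SyncRuleState.STOP;
    }
  }
}
```

In the diff below, the MongoDB storage module implements this inside a single transaction (`autoActivate` in `MongoBucketBatch`), so that activating the new sync rules and stopping the old ones happen atomically.
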
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
@@ -168,10 +168,10 @@ jobs:
shell: bash
run: pnpm build

- name: Test
- name: Test Replication
run: pnpm test --filter='./modules/module-postgres'

- name: Test
- name: Test Storage
run: pnpm test --filter='./modules/module-postgres-storage'

run-mysql-tests:
@@ -252,7 +252,7 @@ jobs:
shell: bash
run: pnpm build

- name: Test
- name: Test Replication
run: pnpm test --filter='./modules/module-mysql'

run-mongodb-tests:
@@ -320,7 +320,7 @@ jobs:
shell: bash
run: pnpm build

- name: Test
- name: Test Replication
run: pnpm test --filter='./modules/module-mongodb'

- name: Test Storage
Changes to MongoBucketBatch:
@@ -16,6 +16,7 @@ import {
BucketStorageMarkRecordUnavailable,
deserializeBson,
InternalOpId,
isCompleteRow,
SaveOperationTag,
storage,
utils
@@ -49,6 +50,7 @@ export interface MongoBucketBatchOptions {
lastCheckpointLsn: string | null;
keepaliveOp: InternalOpId | null;
noCheckpointBeforeLsn: string;
resumeFromLsn: string | null;
storeCurrentData: boolean;
/**
* Set to true for initial replication.
@@ -99,6 +101,20 @@ export class MongoBucketBatch
*/
public last_flushed_op: InternalOpId | null = null;

/**
* lastCheckpointLsn is the last consistent commit.
*
* While that is generally a "safe" point to resume from, there are cases where we may want to resume from a different point:
* 1. After an initial snapshot, we don't have a consistent commit yet, but need to resume from the snapshot LSN.
* 2. If "no_checkpoint_before_lsn" is set far in advance, it may take a while to reach that point. We
* may want to resume at incremental points before that.
*
* This is set when creating the batch, but may not be updated afterwards.
*/
public resumeFromLsn: string | null = null;

private needsActivation = true;

constructor(options: MongoBucketBatchOptions) {
super();
this.logger = options.logger ?? defaultLogger;
@@ -107,6 +123,7 @@ export class MongoBucketBatch
this.group_id = options.groupId;
this.last_checkpoint_lsn = options.lastCheckpointLsn;
this.no_checkpoint_before_lsn = options.noCheckpointBeforeLsn;
this.resumeFromLsn = options.resumeFromLsn;
this.session = this.client.startSession();
this.slot_name = options.slotName;
this.sync_rules = options.syncRules;
@@ -332,7 +349,7 @@
// Not an error if we re-apply a transaction
existing_buckets = [];
existing_lookups = [];
if (this.storeCurrentData) {
if (!isCompleteRow(this.storeCurrentData, after!)) {
if (this.markRecordUnavailable != null) {
// This will trigger a "resnapshot" of the record.
// This is not relevant if storeCurrentData is false, since we'll get the full row
@@ -685,6 +702,7 @@

if (!createEmptyCheckpoints && this.persisted_op == null) {
// Nothing to commit - also return true
await this.autoActivate(lsn);
return true;
}

@@ -729,12 +747,65 @@
},
{ session: this.session }
);
await this.autoActivate(lsn);
await this.db.notifyCheckpoint();
this.persisted_op = null;
this.last_checkpoint_lsn = lsn;
return true;
}

/**
* Switch from processing -> active if relevant.
*
* Called on new commits.
*/
private async autoActivate(lsn: string) {
if (!this.needsActivation) {
return;
}

// Transition these sync rules from PROCESSING to ACTIVE, and stop any other
// active sync rules, now that this group has a consistent checkpoint.

const session = this.session;
let activated = false;
await session.withTransaction(async () => {
const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session });
if (doc && doc.state == 'PROCESSING') {
await this.db.sync_rules.updateOne(
{
_id: this.group_id
},
{
$set: {
state: storage.SyncRuleState.ACTIVE
}
},
{ session }
);

await this.db.sync_rules.updateMany(
{
_id: { $ne: this.group_id },
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
},
{
$set: {
state: storage.SyncRuleState.STOP
}
},
{ session }
);
activated = true;
}
});
if (activated) {
this.logger.info(`Activated new sync rules at ${lsn}`);
await this.db.notifyCheckpoint();
}
this.needsActivation = false;
}

async keepalive(lsn: string): Promise<boolean> {
if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) {
// No-op
@@ -782,13 +853,14 @@
},
{ session: this.session }
);
await this.autoActivate(lsn);
await this.db.notifyCheckpoint();
this.last_checkpoint_lsn = lsn;

return true;
}

async setSnapshotLsn(lsn: string): Promise<void> {
async setResumeLsn(lsn: string): Promise<void> {
const update: Partial<SyncRuleDocument> = {
snapshot_lsn: lsn
};
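
The `resumeFromLsn` comment and the `setResumeLsn` method above reduce to one rule: on restart, replication resumes from the later of the last consistent checkpoint and the stored snapshot/resume LSN. A minimal sketch of that selection, assuming LSNs are strings whose ordering matches plain string comparison (the same style as the `lsn <= this.last_checkpoint_lsn` check in `keepalive` above); the helper below is illustrative and is not the real `maxLsn` imported in the diff that follows:

```typescript
// Illustrative only - not the actual maxLsn utility used in this PR.
// Assumes LSN strings compare correctly under ordinary string ordering,
// and treats a missing value as "earliest possible".
function pickResumeLsn(lastCheckpointLsn: string | null, snapshotLsn: string | null): string | null {
  if (lastCheckpointLsn == null) {
    // No consistent commit yet (e.g. the initial snapshot is still running):
    // resume from the snapshot/resume LSN if we have one.
    return snapshotLsn;
  }
  if (snapshotLsn == null) {
    return lastCheckpointLsn;
  }
  // Both are known: resume from whichever is further along.
  return snapshotLsn > lastCheckpointLsn ? snapshotLsn : lastCheckpointLsn;
}

// Examples with made-up, equal-length LSN strings:
pickResumeLsn('0000/00000100', '0000/00000250'); // => '0000/00000250'
pickResumeLsn(null, '0000/00000250');            // => '0000/00000250'
```

The storage layer below applies the same idea when constructing a batch: `resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn)`.
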
Changes to MongoSyncBucketStorage:
@@ -16,6 +16,7 @@ import {
GetCheckpointChangesOptions,
InternalOpId,
internalToExternalOpId,
maxLsn,
ProtocolOpId,
ReplicationCheckpoint,
storage,
@@ -131,7 +132,7 @@ export class MongoSyncBucketStorage
{
_id: this.group_id
},
{ projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1 } }
{ projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1, snapshot_lsn: 1 } }
);
const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null;

@@ -142,6 +143,7 @@ export class MongoSyncBucketStorage
groupId: this.group_id,
slotName: this.slot_name,
lastCheckpointLsn: checkpoint_lsn,
resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn),
noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN,
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null,
storeCurrentData: options.storeCurrentData,
@@ -640,41 +642,6 @@
);
}

async autoActivate(): Promise<void> {
await this.db.client.withSession(async (session) => {
await session.withTransaction(async () => {
const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session });
if (doc && doc.state == 'PROCESSING') {
await this.db.sync_rules.updateOne(
{
_id: this.group_id
},
{
$set: {
state: storage.SyncRuleState.ACTIVE
}
},
{ session }
);

await this.db.sync_rules.updateMany(
{
_id: { $ne: this.group_id },
state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] }
},
{
$set: {
state: storage.SyncRuleState.STOP
}
},
{ session }
);
await this.db.notifyCheckpoint();
}
});
});
}

async reportError(e: any): Promise<void> {
const message = String(e.message ?? 'Replication failure');
await this.db.sync_rules.updateOne(
Changes to SyncRuleDocument:
@@ -118,9 +118,15 @@ export interface SyncRuleDocument
snapshot_done: boolean;

/**
* This is now used for "resumeLsn".
*
* If snapshot_done = false, this may be the lsn at which we started the snapshot.
*
* This can be used for resuming the snapshot after a restart.
*
* If snapshot_done is true, this is treated as the point to restart replication from.
*
* More specifically, we resume replication from max(snapshot_lsn, last_checkpoint_lsn).
*/
snapshot_lsn: string | undefined;

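To make the two cases in the `snapshot_lsn` comment concrete, here is a sketch of how the stored fields could map to a resume point in each phase. The field names follow the interface above; the values and the resulting resume points are invented for illustration:

```typescript
// Invented example documents - only the fields relevant to resuming are shown.

// Phase 1: the initial snapshot is still running. There is no consistent
// checkpoint yet, so replication resumes from the LSN the snapshot started at.
const duringSnapshot = {
  snapshot_done: false,
  snapshot_lsn: '0000/00000150',
  last_checkpoint_lsn: null
};
// resume point: '0000/00000150'

// Phase 2: the snapshot is done and streaming replication has committed
// checkpoints. snapshot_lsn now acts as a general resume marker (written via
// setResumeLsn), and the later of the two values wins.
const afterSnapshot = {
  snapshot_done: true,
  snapshot_lsn: '0000/00000300',
  last_checkpoint_lsn: '0000/00000280'
};
// resume point: max('0000/00000300', '0000/00000280') = '0000/00000300'
```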