diff --git a/.changeset/shiny-pugs-train.md b/.changeset/shiny-pugs-train.md new file mode 100644 index 000000000..52277e778 --- /dev/null +++ b/.changeset/shiny-pugs-train.md @@ -0,0 +1,11 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +--- + +Delay switching over to new sync rules until we have a consistent checkpoint. diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6af3a0552..1cee0f71b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -168,10 +168,10 @@ jobs: shell: bash run: pnpm build - - name: Test + - name: Test Replication run: pnpm test --filter='./modules/module-postgres' - - name: Test + - name: Test Storage run: pnpm test --filter='./modules/module-postgres-storage' run-mysql-tests: @@ -252,7 +252,7 @@ jobs: shell: bash run: pnpm build - - name: Test + - name: Test Replication run: pnpm test --filter='./modules/module-mysql' run-mongodb-tests: @@ -320,7 +320,7 @@ jobs: shell: bash run: pnpm build - - name: Test + - name: Test Replication run: pnpm test --filter='./modules/module-mongodb' - name: Test Storage diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 6ad06a327..ceee3542e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -16,6 +16,7 @@ import { BucketStorageMarkRecordUnavailable, deserializeBson, InternalOpId, + isCompleteRow, SaveOperationTag, storage, utils @@ -49,6 +50,7 @@ export interface MongoBucketBatchOptions { lastCheckpointLsn: string | null; keepaliveOp: InternalOpId | null; noCheckpointBeforeLsn: string; + resumeFromLsn: string | null; storeCurrentData: boolean; /** * Set to true for initial replication. @@ -99,6 +101,20 @@ export class MongoBucketBatch */ public last_flushed_op: InternalOpId | null = null; + /** + * lastCheckpointLsn is the last consistent commit. + * + * While that is generally a "safe" point to resume from, there are cases where we may want to resume from a different point: + * 1. After an initial snapshot, we don't have a consistent commit yet, but need to resume from the snapshot LSN. + * 2. If "no_checkpoint_before_lsn" is set far in advance, it may take a while to reach that point. We + * may want to resume at incremental points before that. + * + * This is set when creating the batch, but may not be updated afterwards. + */ + public resumeFromLsn: string | null = null; + + private needsActivation = true; + constructor(options: MongoBucketBatchOptions) { super(); this.logger = options.logger ?? 
defaultLogger; @@ -107,6 +123,7 @@ export class MongoBucketBatch this.group_id = options.groupId; this.last_checkpoint_lsn = options.lastCheckpointLsn; this.no_checkpoint_before_lsn = options.noCheckpointBeforeLsn; + this.resumeFromLsn = options.resumeFromLsn; this.session = this.client.startSession(); this.slot_name = options.slotName; this.sync_rules = options.syncRules; @@ -332,7 +349,7 @@ export class MongoBucketBatch // Not an error if we re-apply a transaction existing_buckets = []; existing_lookups = []; - if (this.storeCurrentData) { + if (!isCompleteRow(this.storeCurrentData, after!)) { if (this.markRecordUnavailable != null) { // This will trigger a "resnapshot" of the record. // This is not relevant if storeCurrentData is false, since we'll get the full row @@ -685,6 +702,7 @@ export class MongoBucketBatch if (!createEmptyCheckpoints && this.persisted_op == null) { // Nothing to commit - also return true + await this.autoActivate(lsn); return true; } @@ -729,12 +747,65 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.autoActivate(lsn); await this.db.notifyCheckpoint(); this.persisted_op = null; this.last_checkpoint_lsn = lsn; return true; } + /** + * Switch from processing -> active if relevant. + * + * Called on new commits. + */ + private async autoActivate(lsn: string) { + if (!this.needsActivation) { + return; + } + + // Mark these sync rules as ACTIVE, and stop any previously-active sync rules. + // This is deferred until the first consistent commit or keepalive, so we only switch over once there is a consistent checkpoint. + + const session = this.session; + let activated = false; + await session.withTransaction(async () => { + const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session }); + if (doc && doc.state == 'PROCESSING') { + await this.db.sync_rules.updateOne( + { + _id: this.group_id + }, + { + $set: { + state: storage.SyncRuleState.ACTIVE + } + }, + { session } + ); + + await this.db.sync_rules.updateMany( + { + _id: { $ne: this.group_id }, + state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } + }, + { + $set: { + state: storage.SyncRuleState.STOP + } + }, + { session } + ); + activated = true; + } + }); + if (activated) { + this.logger.info(`Activated new sync rules at ${lsn}`); + await this.db.notifyCheckpoint(); + } + this.needsActivation = false; + } + async keepalive(lsn: string): Promise<boolean> { if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) { // No-op @@ -782,13 -853,14 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.autoActivate(lsn); await this.db.notifyCheckpoint(); this.last_checkpoint_lsn = lsn; return true; } - async setSnapshotLsn(lsn: string): Promise<void> { + async setResumeLsn(lsn: string): Promise<void> { const update: Partial<SyncRuleDocument> = { snapshot_lsn: lsn }; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index d069d28f6..6a0cfcf04 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -16,6 +16,7 @@ import { GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, + maxLsn, ProtocolOpId, ReplicationCheckpoint, storage, @@ -131,7 +132,7 @@ export class MongoSyncBucketStorage { _id: this.group_id }, - { projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1 } } + { projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1,
keepalive_op: 1, snapshot_lsn: 1 } } ); const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null; @@ -142,6 +143,7 @@ export class MongoSyncBucketStorage groupId: this.group_id, slotName: this.slot_name, lastCheckpointLsn: checkpoint_lsn, + resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn), noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN, keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null, storeCurrentData: options.storeCurrentData, @@ -640,41 +642,6 @@ export class MongoSyncBucketStorage ); } - async autoActivate(): Promise { - await this.db.client.withSession(async (session) => { - await session.withTransaction(async () => { - const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session }); - if (doc && doc.state == 'PROCESSING') { - await this.db.sync_rules.updateOne( - { - _id: this.group_id - }, - { - $set: { - state: storage.SyncRuleState.ACTIVE - } - }, - { session } - ); - - await this.db.sync_rules.updateMany( - { - _id: { $ne: this.group_id }, - state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } - }, - { - $set: { - state: storage.SyncRuleState.STOP - } - }, - { session } - ); - await this.db.notifyCheckpoint(); - } - }); - }); - } - async reportError(e: any): Promise { const message = String(e.message ?? 'Replication failure'); await this.db.sync_rules.updateOne( diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 181d50fff..33eac22d8 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -118,9 +118,15 @@ export interface SyncRuleDocument { snapshot_done: boolean; /** + * This is now used for "resumeLsn". + * * If snapshot_done = false, this may be the lsn at which we started the snapshot. * * This can be used for resuming the snapshot after a restart. + * + * If snapshot_done is true, this is treated as the point to restart replication from. + * + * More specifically, we resume replication from max(snapshot_lsn, last_checkpoint_lsn). */ snapshot_lsn: string | undefined; diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 9d0b19821..21d83d27a 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -218,6 +218,11 @@ export class ChangeStream { return await db.collection(table.table).estimatedDocumentCount(); } + /** + * This gets a LSN before starting a snapshot, which we can resume streaming from after the snapshot. + * + * This LSN can survive initial replication restarts. + */ private async getSnapshotLsn(): Promise { const hello = await this.defaultDb.command({ hello: 1 }); // Basic sanity check @@ -292,6 +297,9 @@ export class ChangeStream { ); } + /** + * Given a snapshot LSN, validate that we can read from it, by opening a change stream. 
+ */ private async validateSnapshotLsn(lsn: string) { await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); const { stream } = streamManager; @@ -321,7 +329,7 @@ export class ChangeStream { if (snapshotLsn == null) { // First replication attempt - get a snapshot and store the timestamp snapshotLsn = await this.getSnapshotLsn(); - await batch.setSnapshotLsn(snapshotLsn); + await batch.setResumeLsn(snapshotLsn); this.logger.info(`Marking snapshot at ${snapshotLsn}`); } else { this.logger.info(`Resuming snapshot at ${snapshotLsn}`); @@ -359,11 +367,20 @@ export class ChangeStream { await this.snapshotTable(batch, table); await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable); - await touch(); + this.touch(); } - this.logger.info(`Snapshot commit at ${snapshotLsn}`); + // The checkpoint here is a marker - we need to replicate up to at least this + // point before the data can be considered consistent. + // We could do this for each individual table, but may as well just do it once for the entire snapshot. + const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); + await batch.markSnapshotDone([], checkpoint); + + // This will not create a consistent checkpoint yet, but will persist the op. + // The actual checkpoint will only be created once streaming replication has caught up. await batch.commit(snapshotLsn); + + this.logger.info(`Snapshot done. Need to replicate from ${snapshotLsn} to ${checkpoint} to be consistent`); } ); } @@ -492,7 +509,7 @@ export class ChangeStream { this.logger.info( `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} in ${duration.toFixed(0)}ms` ); - await touch(); + this.touch(); } // In case the loop was interrupted, make sure we await the last promise. await nextChunkPromise; @@ -656,7 +673,6 @@ export class ChangeStream { try { // If anything errors here, the entire replication process is halted, and // all connections automatically closed, including this one. - await this.initReplication(); await this.streamChanges(); } catch (e) { @@ -757,19 +773,20 @@ export class ChangeStream { } async streamChangesInternal() { - // Auto-activate as soon as initial replication is done - await this.storage.autoActivate(); - await this.storage.startBatch( { logger: this.logger, zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, + // We get a complete post-image for every change, so we don't need to store the current data.
storeCurrentData: false }, async (batch) => { - const { lastCheckpointLsn } = batch; - const lastLsn = MongoLSN.fromSerialized(lastCheckpointLsn!); + const { resumeFromLsn } = batch; + if (resumeFromLsn == null) { + throw new ReplicationAssertionError(`No LSN found to resume from`); + } + const lastLsn = MongoLSN.fromSerialized(resumeFromLsn); const startAfter = lastLsn?.timestamp; // It is normal for this to be a minute or two old when there is a low volume @@ -778,7 +795,7 @@ export class ChangeStream { this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`); - await using streamManager = this.openChangeStream({ lsn: lastCheckpointLsn }); + await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); const { stream, filters } = streamManager; if (this.abort_signal.aborted) { await stream.close(); @@ -797,6 +814,7 @@ export class ChangeStream { let splitDocument: mongo.ChangeStreamDocument | null = null; let flexDbNameWorkaroundLogged = false; + let changesSinceLastCheckpoint = 0; let lastEmptyResume = performance.now(); @@ -831,7 +849,7 @@ export class ChangeStream { if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken); await batch.keepalive(lsn); - await touch(); + this.touch(); lastEmptyResume = performance.now(); // Log the token update. This helps as a general "replication is still active" message in the logs. // This token would typically be around 10s behind. @@ -843,7 +861,7 @@ export class ChangeStream { continue; } - await touch(); + this.touch(); if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { continue; @@ -966,6 +984,7 @@ export class ChangeStream { if (didCommit) { this.oldestUncommittedChange = null; this.isStartingReplication = false; + changesSinceLastCheckpoint = 0; } } else if ( changeDocument.operationType == 'insert' || @@ -988,7 +1007,21 @@ export class ChangeStream { if (this.oldestUncommittedChange == null && changeDocument.clusterTime != null) { this.oldestUncommittedChange = timestampToDate(changeDocument.clusterTime); } - await this.writeChange(batch, table, changeDocument); + const flushResult = await this.writeChange(batch, table, changeDocument); + changesSinceLastCheckpoint += 1; + if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { + // When we are catching up replication after an initial snapshot, there may be a very long delay + // before we do a commit(). In that case, we need to periodically persist the resume LSN, so + // we don't restart from scratch if we restart replication. + // The same could apply if we need to catch up on replication after some downtime. + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); + await batch.setResumeLsn(lsn); + changesSinceLastCheckpoint = 0; + } } } else if (changeDocument.operationType == 'drop') { const rel = getMongoRelation(changeDocument.ns); @@ -1036,13 +1069,18 @@ export class ChangeStream { } return Date.now() - this.oldestUncommittedChange.getTime(); } -} -async function touch() { - // FIXME: The hosted Kubernetes probe does not actually check the timestamp on this. - // FIXME: We need a timeout of around 5+ minutes in Kubernetes if we do start checking the timestamp, - // or reduce PING_INTERVAL here. 
- return container.probes.touch(); + private lastTouchedAt = performance.now(); + + private touch() { + if (performance.now() - this.lastTouchedAt > 1_000) { + this.lastTouchedAt = performance.now(); + // Update the probes, but don't wait for it + container.probes.touch().catch((e) => { + this.logger.error(`Failed to touch the container probe: ${e.message}`, e); + }); + } + } } function mapChangeStreamError(e: any) { diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 2aa524e77..c6aacee0a 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -23,7 +23,9 @@ describe('change stream', () => { function defineChangeStreamTests(factory: storage.TestStorageFactory) { test('replicating basic values', async () => { - await using context = await ChangeStreamTestContext.open(factory); + await using context = await ChangeStreamTestContext.open(factory, { + mongoOptions: { postImages: PostImagesOption.READ_ONLY } + }); const { db } = context; await context.updateSyncRules(` bucket_definitions: @@ -32,7 +34,7 @@ bucket_definitions: - SELECT _id as id, description, num FROM "test_data"`); await db.createCollection('test_data', { - changeStreamPreAndPostImages: { enabled: false } + changeStreamPreAndPostImages: { enabled: true } }); const collection = db.collection('test_data'); @@ -42,11 +44,8 @@ bucket_definitions: const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); const test_id = result.insertedId; - await setTimeout(30); await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); - await setTimeout(30); await collection.replaceOne({ _id: test_id }, { description: 'test3' }); - await setTimeout(30); await collection.deleteOne({ _id: test_id }); const data = await context.getBucketData('global[]'); @@ -354,6 +353,9 @@ bucket_definitions: const test_id = result.insertedId.toHexString(); await context.replicateSnapshot(); + // Note: snapshot is only consistent some time into the streaming request. + // At the point that we get the first acknowledged checkpoint, as is required + // for getBucketData(), the data should be consistent. context.startStreaming(); const data = await context.getBucketData('global[]'); @@ -512,10 +514,13 @@ bucket_definitions: await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); await context.replicateSnapshot(); + await context.markSnapshotConsistent(); // Simulate an error await context.storage!.reportError(new Error('simulated error')); - expect((await context.factory.getActiveSyncRulesContent())?.last_fatal_error).toEqual('simulated error'); + const syncRules = await context.factory.getActiveSyncRulesContent(); + expect(syncRules).toBeTruthy(); + expect(syncRules?.last_fatal_error).toEqual('simulated error'); // startStreaming() should automatically clear the error. 
context.startStreaming(); diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index cc8d17732..b79d12fb4 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -17,7 +17,7 @@ import { MongoManager } from '@module/replication/MongoManager.js'; import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js'; import { NormalizedMongoConnectionConfig } from '@module/types/types.js'; -import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js'; +import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; export class ChangeStreamTestContext { private _walStream?: ChangeStream; @@ -119,7 +119,20 @@ export class ChangeStreamTestContext { async replicateSnapshot() { await this.walStream.initReplication(); - await this.storage!.autoActivate(); + } + + /** + * A snapshot is not consistent until streaming replication has caught up. + * We simulate that for tests. + * Do not use if there are any writes performed while doing the snapshot, as that + * would result in inconsistent data. + */ + async markSnapshotConsistent() { + const checkpoint = await createCheckpoint(this.client, this.db, STANDALONE_CHECKPOINT_ID); + + await this.storage!.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive(checkpoint); + }); } startStreaming() { @@ -195,12 +208,11 @@ export async function getClientCheckpoint( while (Date.now() - start < timeout) { const storage = await storageFactory.getActiveStorage(); const cp = await storage?.getCheckpoint(); - if (cp == null) { - throw new Error('No sync rules available'); - } - lastCp = cp; - if (cp.lsn && cp.lsn >= lsn) { - return cp.checkpoint; + if (cp != null) { + lastCp = cp; + if (cp.lsn && cp.lsn >= lsn) { + return cp.checkpoint; + } } await new Promise((resolve) => setTimeout(resolve, 30)); } diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 6907e842c..3a4b76d71 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -430,8 +430,6 @@ AND table_type = 'BASE TABLE';`, } async streamChanges() { - // Auto-activate as soon as initial replication is done - await this.storage.autoActivate(); const serverId = createRandomServerId(this.storage.group_id); const connection = await this.connections.getConnection(); diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index 575f50aea..b83aa2bd8 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -112,7 +112,6 @@ export class BinlogStreamTestContext { async replicateSnapshot() { await this.binlogStream.initReplication(); - await this.storage!.autoActivate(); this.replicationDone = true; } diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index c3eeb641b..df050372b 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -7,6 +7,7 @@ import { InternalOpId, internalToExternalOpId, LastValueSink, + maxLsn, storage, utils, WatchWriteCheckpointOptions @@ -310,13 +311,14 @@ export class PostgresSyncRulesStorage SELECT 
last_checkpoint_lsn, no_checkpoint_before, - keepalive_op + keepalive_op, + snapshot_lsn FROM sync_rules WHERE id = ${{ type: 'int4', value: this.group_id }} ` - .decoded(pick(models.SyncRules, ['last_checkpoint_lsn', 'no_checkpoint_before', 'keepalive_op'])) + .decoded(pick(models.SyncRules, ['last_checkpoint_lsn', 'no_checkpoint_before', 'keepalive_op', 'snapshot_lsn'])) .first(); const checkpoint_lsn = syncRules?.last_checkpoint_lsn ?? null; @@ -330,6 +332,7 @@ export class PostgresSyncRulesStorage last_checkpoint_lsn: checkpoint_lsn, keep_alive_op: syncRules?.keepalive_op, no_checkpoint_before_lsn: syncRules?.no_checkpoint_before ?? options.zeroLSN, + resumeFromLsn: maxLsn(syncRules?.snapshot_lsn, checkpoint_lsn), store_current_data: options.storeCurrentData, skip_existing_rows: options.skipExistingRows ?? false, batch_limits: this.options.batchLimits, @@ -644,43 +647,6 @@ export class PostgresSyncRulesStorage `.execute(); } - async autoActivate(): Promise { - await this.db.transaction(async (db) => { - const syncRulesRow = await db.sql` - SELECT - state - FROM - sync_rules - WHERE - id = ${{ type: 'int4', value: this.group_id }} - ` - .decoded(pick(models.SyncRules, ['state'])) - .first(); - - if (syncRulesRow && syncRulesRow.state == storage.SyncRuleState.PROCESSING) { - await db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} - WHERE - id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - } - - await db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} - WHERE - ( - state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} - OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} - ) - AND id != ${{ type: 'int4', value: this.group_id }} - `.execute(); - }); - } - private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { if (batch.length == 0) { return new Map(); diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index ab0c942c8..aa0eb0125 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -31,6 +31,7 @@ export interface PostgresBucketBatchOptions { no_checkpoint_before_lsn: string; store_current_data: boolean; keep_alive_op?: InternalOpId | null; + resumeFromLsn: string | null; /** * Set to true for initial replication. 
*/ @@ -61,6 +62,8 @@ export class PostgresBucketBatch public last_flushed_op: InternalOpId | null = null; + public resumeFromLsn: string | null; + protected db: lib_postgres.DatabaseClient; protected group_id: number; protected last_checkpoint_lsn: string | null; @@ -73,6 +76,7 @@ export class PostgresBucketBatch protected batch: OperationBatch | null; private lastWaitingLogThrottled = 0; private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined; + private needsActivation = true; constructor(protected options: PostgresBucketBatchOptions) { super(); @@ -81,6 +85,7 @@ export class PostgresBucketBatch this.group_id = options.group_id; this.last_checkpoint_lsn = options.last_checkpoint_lsn; this.no_checkpoint_before_lsn = options.no_checkpoint_before_lsn; + this.resumeFromLsn = options.resumeFromLsn; this.write_checkpoint_batch = []; this.sync_rules = options.sync_rules; this.markRecordUnavailable = options.markRecordUnavailable; @@ -321,6 +326,7 @@ export class PostgresBucketBatch // Don't create a checkpoint if there were no changes if (!createEmptyCheckpoints && this.persisted_op == null) { // Nothing to commit - return true + await this.autoActivate(lsn); return true; } @@ -363,6 +369,7 @@ export class PostgresBucketBatch .decoded(StatefulCheckpoint) .first(); + await this.autoActivate(lsn); await notifySyncRulesUpdate(this.db, doc!); this.persisted_op = null; @@ -406,13 +413,14 @@ export class PostgresBucketBatch .decoded(StatefulCheckpoint) .first(); + await this.autoActivate(lsn); await notifySyncRulesUpdate(this.db, updated!); this.last_checkpoint_lsn = lsn; return true; } - async setSnapshotLsn(lsn: string): Promise { + async setResumeLsn(lsn: string): Promise { await this.db.sql` UPDATE sync_rules SET @@ -916,6 +924,59 @@ export class PostgresBucketBatch return result; } + /** + * Switch from processing -> active if relevant. + * + * Called on new commits. 
+ */ + private async autoActivate(lsn: string): Promise { + if (!this.needsActivation) { + // Already activated + return; + } + + let didActivate = false; + await this.db.transaction(async (db) => { + const syncRulesRow = await db.sql` + SELECT + state + FROM + sync_rules + WHERE + id = ${{ type: 'int4', value: this.group_id }} + ` + .decoded(pick(models.SyncRules, ['state'])) + .first(); + + if (syncRulesRow && syncRulesRow.state == storage.SyncRuleState.PROCESSING) { + await db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} + WHERE + id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + didActivate = true; + } + + await db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} + WHERE + ( + state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} + ) + AND id != ${{ type: 'int4', value: this.group_id }} + `.execute(); + }); + if (didActivate) { + this.logger.info(`Activated new sync rules at ${lsn}`); + } + this.needsActivation = false; + } + /** * Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable} * TODO maybe share this with an abstract class diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 01c99082e..022426694 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -321,7 +321,7 @@ export class WalStream { // Check that replication slot exists for (let i = 120; i >= 0; i--) { - await touch(); + this.touch(); if (i == 0) { container.reporter.captureException(last_error, { @@ -479,7 +479,7 @@ WHERE oid = $1::regclass`, for (let table of tablesWithStatus) { await this.snapshotTableInTx(batch, db, table); - await touch(); + this.touch(); } // Always commit the initial snapshot at zero. @@ -628,7 +628,7 @@ WHERE oid = $1::regclass`, at += rows.length; this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length); - await touch(); + this.touch(); } // Important: flush before marking progress @@ -737,6 +737,9 @@ WHERE oid = $1::regclass`, rows.map((r) => r.key) ); } + // Even with resnapshot, we need to wait until we get a new consistent checkpoint + // after the snapshot, so we need to send a keepalive message. + await sendKeepAlive(db); } finally { await db.end(); } @@ -837,7 +840,6 @@ WHERE oid = $1::regclass`, async streamChanges(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. - const { createEmptyCheckpoints } = await this.ensureStorageCompatibility(); const replicationOptions: Record = { @@ -867,9 +869,6 @@ WHERE oid = $1::regclass`, this.startedStreaming = true; - // Auto-activate as soon as initial replication is done - await this.storage.autoActivate(); - let resnapshot: { table: storage.SourceTable; key: PrimaryKeyValue }[] = []; const markRecordUnavailable = (record: SaveUpdate) => { @@ -911,7 +910,7 @@ WHERE oid = $1::regclass`, let count = 0; for await (const chunk of replicationStream.pgoutputDecode()) { - await touch(); + this.touch(); if (this.abort_signal.aborted) { break; @@ -1091,11 +1090,10 @@ WHERE oid = $1::regclass`, } return Date.now() - this.oldestUncommittedChange.getTime(); } -} -async function touch() { - // FIXME: The hosted Kubernetes probe does not actually check the timestamp on this. 
- // FIXME: We need a timeout of around 5+ minutes in Kubernetes if we do start checking the timestamp, - // or reduce PING_INTERVAL here. - return container.probes.touch(); + private touch() { + container.probes.touch().catch((e) => { + this.logger.error(`Error touching probe`, e); + }); + } } diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index fbae9f5e8..70431f613 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -36,6 +36,8 @@ const checkpointTests = (factory: TestStorageFactory) => { await context.replicateSnapshot(); context.startStreaming(); + // Wait for a consistent checkpoint before we start. + await context.getCheckpoint(); const storage = context.storage!; const controller = new AbortController(); diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts index e084b65a7..777662f12 100644 --- a/modules/module-postgres/test/src/large_batch.test.ts +++ b/modules/module-postgres/test/src/large_batch.test.ts @@ -87,7 +87,6 @@ function defineBatchTests(factory: storage.TestStorageFactory) { const start = Date.now(); await context.replicateSnapshot(); - await context.storage!.autoActivate(); context.startStreaming(); const checkpoint = await context.getCheckpoint({ timeout: 100_000 }); diff --git a/modules/module-postgres/test/src/slow_tests.test.ts b/modules/module-postgres/test/src/slow_tests.test.ts index 551eea9fe..ae5294887 100644 --- a/modules/module-postgres/test/src/slow_tests.test.ts +++ b/modules/module-postgres/test/src/slow_tests.test.ts @@ -97,7 +97,6 @@ bucket_definitions: await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`); await walStream.initReplication(replicationConnection); - await storage.autoActivate(); let abort = false; streamPromise = walStream.streamChanges(replicationConnection).finally(() => { abort = true; @@ -348,7 +347,6 @@ bucket_definitions: let initialReplicationDone = false; streamPromise = (async () => { await walStream.initReplication(replicationConnection); - await storage.autoActivate(); initialReplicationDone = true; await walStream.streamChanges(replicationConnection); })() diff --git a/modules/module-postgres/test/src/util.ts b/modules/module-postgres/test/src/util.ts index 7a75db622..130b70fe9 100644 --- a/modules/module-postgres/test/src/util.ts +++ b/modules/module-postgres/test/src/util.ts @@ -90,10 +90,8 @@ export async function getClientCheckpoint( while (Date.now() - start < timeout) { const storage = await storageFactory.getActiveStorage(); const cp = await storage?.getCheckpoint(); - if (cp == null) { - throw new Error('No sync rules available'); - } - if (cp.lsn && cp.lsn >= lsn) { + + if (cp?.lsn != null && cp.lsn >= lsn) { logger.info(`Got write checkpoint: ${lsn} : ${cp.checkpoint}`); return cp.checkpoint; } diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 2293c54c3..53f426171 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -34,13 +34,11 @@ bucket_definitions: `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)` ); - await context.replicateSnapshot(); + await context.initializeReplication(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 
0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); - const [{ test_id }] = pgwireRows( await pool.query( `INSERT INTO test_data(description, num) VALUES('test1', 1152921504606846976) returning id as test_id` @@ -53,7 +51,8 @@ bucket_definitions: const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const endTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; expect(endRowCount - startRowCount).toEqual(1); - expect(endTxCount - startTxCount).toEqual(1); + // In some rare cases there may be additional empty transactions, so we allow for that. + expect(endTxCount - startTxCount).toBeGreaterThanOrEqual(1); }); test('replicating case sensitive table', async () => { @@ -69,13 +68,11 @@ bucket_definitions: await pool.query(`DROP TABLE IF EXISTS "test_DATA"`); await pool.query(`CREATE TABLE "test_DATA"(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); + await context.initializeReplication(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); - const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO "test_DATA"(description) VALUES('test1') returning id as test_id`) ); @@ -86,7 +83,7 @@ bucket_definitions: const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const endTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; expect(endRowCount - startRowCount).toEqual(1); - expect(endTxCount - startTxCount).toEqual(1); + expect(endTxCount - startTxCount).toBeGreaterThanOrEqual(1); }); test('replicating TOAST values', async () => { @@ -143,8 +140,7 @@ bucket_definitions: await pool.query(`DROP TABLE IF EXISTS test_data`); await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) @@ -166,8 +162,7 @@ bucket_definitions: await pool.query(`DROP TABLE IF EXISTS test_data`); await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) @@ -179,8 +174,8 @@ bucket_definitions: ) ); - // This update may fail replicating with: - // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"} + // Since we don't have an old copy of the record with the new primary key, this + // may trigger a "resnapshot". 
await pool.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${test_id2}'`); // Re-use old id again @@ -264,16 +259,12 @@ bucket_definitions: await pool.query(`CREATE TABLE test_donotsync(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); + await context.initializeReplication(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); - - const [{ test_id }] = pgwireRows( - await pool.query(`INSERT INTO test_donotsync(description) VALUES('test1') returning id as test_id`) - ); + await pool.query(`INSERT INTO test_donotsync(description) VALUES('test1') returning id as test_id`); const data = await context.getBucketData('global[]'); @@ -283,7 +274,7 @@ bucket_definitions: // There was a transaction, but we should not replicate any actual data expect(endRowCount - startRowCount).toEqual(0); - expect(endTxCount - startTxCount).toEqual(1); + expect(endTxCount - startTxCount).toBeGreaterThanOrEqual(1); }); test('reporting slot issues', async () => { diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index aa1170279..b94c73c5a 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -118,11 +118,20 @@ export class WalStreamTestContext implements AsyncDisposable { return this._walStream!; } + /** + * Replicate a snapshot, start streaming, and wait for a consistent checkpoint. + */ + async initializeReplication() { + await this.replicateSnapshot(); + this.startStreaming(); + // Make sure we're up to date + await this.getCheckpoint(); + } + async replicateSnapshot() { const promise = (async () => { this.replicationConnection = await this.connectionManager.replicationConnection(); await this.walStream.initReplication(this.replicationConnection); - await this.storage!.autoActivate(); })(); this.snapshotPromise = promise.catch((e) => e); await promise; diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index a766fef47..57e56db8c 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1603,7 +1603,9 @@ bucket_definitions: const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false }); const storage = f.getInstance(r.persisted_sync_rules!); - await storage.autoActivate(); + await storage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('1/0'); + }); const metrics2 = await f.getStorageMetrics(); expect(metrics2).toMatchSnapshot(); @@ -1656,7 +1658,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); const abortController = new AbortController(); context.onTestFinished(() => abortController.abort()); @@ -1697,7 +1698,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); const abortController = new AbortController(); context.onTestFinished(() => abortController.abort()); @@ -1760,7 +1760,6 @@ bucket_definitions: validate: false }); const bucketStorage = 
factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); const abortController = new AbortController(); @@ -1801,7 +1800,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); const abortController = new AbortController(); @@ -1845,7 +1843,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); const abortController = new AbortController(); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 99e989512..fc661dd48 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -54,7 +54,6 @@ export function registerSyncTests(factory: storage.TestStorageFactory) { }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -116,7 +115,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -178,7 +176,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Initial data: Add one priority row and 10k low-priority rows. @@ -289,7 +286,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Initial data: Add one priority row and 10k low-priority rows. @@ -431,7 +427,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Initial data: Add one priority row and 10k low-priority rows. 
@@ -561,7 +556,6 @@ bucket_definitions: content: BASIC_SYNC_RULES }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -626,7 +620,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -671,7 +664,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const stream = sync.streamResponse({ syncContext, @@ -699,7 +691,10 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const stream = sync.streamResponse({ syncContext, @@ -770,7 +765,10 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const stream = sync.streamResponse({ syncContext, @@ -833,7 +831,6 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -911,7 +908,10 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const stream = sync.streamResponse({ syncContext, @@ -974,7 +974,10 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const exp = Date.now() / 1000 + 0.1; @@ -1016,7 +1019,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -1157,7 +1159,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // <= the managed write checkpoint LSN below diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts index 9499af594..62db7dd43 100644 --- a/packages/service-core/src/storage/BucketStorageBatch.ts +++ b/packages/service-core/src/storage/BucketStorageBatch.ts @@ -60,18 +60,29 @@ export interface BucketStorageBatch extends ObserverClient; /** - * Set the LSN for a snapshot, before starting replication. + * Set the LSN that replication should resume from. + * + * This can be used for: + * 1. Setting the LSN for a snapshot, before starting replication. + * 2. Setting the LSN to resume from after a replication restart, without advancing the checkpoint LSN via a commit. 
* * Not required if the source database keeps track of this, for example with * PostgreSQL logical replication slots. */ - setSnapshotLsn(lsn: string): Promise; + setResumeLsn(lsn: string): Promise; /** * Get the last checkpoint LSN, from either commit or keepalive. */ lastCheckpointLsn: string | null; + /** + * LSN to resume from. + * + * Not relevant for streams where the source keeps track of replication progress, such as Postgres. + */ + resumeFromLsn: string | null; + markSnapshotDone(tables: SourceTable[], no_checkpoint_before_lsn: string): Promise; updateTableProgress(table: SourceTable, progress: Partial): Promise; diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 2317b830f..6b27e3c7c 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -50,8 +50,6 @@ export interface SyncRulesBucketStorage */ clear(options?: ClearStorageOptions): Promise; - autoActivate(): Promise; - /** * Record a replication error. * diff --git a/packages/service-core/src/util/lsn.ts b/packages/service-core/src/util/lsn.ts new file mode 100644 index 000000000..4b1175628 --- /dev/null +++ b/packages/service-core/src/util/lsn.ts @@ -0,0 +1,8 @@ +/** + * Return the larger of two LSNs. + */ +export function maxLsn(a: string | null | undefined, b: string | null | undefined): string | null { + if (a == null) return b ?? null; + if (b == null) return a; + return a > b ? a : b; +} diff --git a/packages/service-core/src/util/util-index.ts b/packages/service-core/src/util/util-index.ts index acdd09c4a..56e9cbc0e 100644 --- a/packages/service-core/src/util/util-index.ts +++ b/packages/service-core/src/util/util-index.ts @@ -1,5 +1,6 @@ export * from './alerting.js'; export * from './env.js'; +export * from './lsn.js'; export * from './memory-tracking.js'; export * from './Mutex.js'; export * from './protocol-types.js';
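To make the resume-point logic above concrete, here is a standalone TypeScript sketch of how resumeFromLsn is derived. maxLsn is copied from the new packages/service-core/src/util/lsn.ts; the LSN values and the surrounding wiring are illustrative only (zero-padded "comparable" strings rather than values from a real database) and are not part of this change.

// maxLsn: copied from packages/service-core/src/util/lsn.ts.
function maxLsn(a: string | null | undefined, b: string | null | undefined): string | null {
  if (a == null) return b ?? null;
  if (b == null) return a;
  return a > b ? a : b;
}

// Illustrative, zero-padded "comparable" LSNs, so plain string comparison is valid.
const lastCheckpointLsn: string | null = '00000002/0000A000'; // last consistent commit
const snapshotLsn: string | null = '00000003/00000010'; // persisted via setResumeLsn()

// Replication resumes from whichever point is further ahead, matching
// resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn) in MongoSyncBucketStorage
// and maxLsn(syncRules?.snapshot_lsn, checkpoint_lsn) in PostgresSyncRulesStorage.
const resumeFromLsn = maxLsn(lastCheckpointLsn, snapshotLsn);
console.log(`Resume streaming from ${resumeFromLsn}`); // -> 00000003/00000010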