From 177eb74c0641015b304c13ae3809287b1341b12b Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 11 Nov 2025 13:41:16 +0200
Subject: [PATCH 01/15] Clean up ended PgManager instances.

---
 .../replication/ConnectionManagerFactory.ts | 15 ++++++++----
 .../src/replication/PgManager.ts | 24 +++++++++++++++----
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts
index 094ae339..d7ef07c6 100644
--- a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts
+++ b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts
@@ -5,7 +5,7 @@ import { logger } from '@powersync/lib-services-framework';
 import { CustomTypeRegistry } from '../types/registry.js';
 
 export class ConnectionManagerFactory {
-  private readonly connectionManagers: PgManager[];
+  private readonly connectionManagers = new Set<PgManager>();
   public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig;
 
   constructor(
@@ -13,18 +13,25 @@ export class ConnectionManagerFactory {
     private readonly registry: CustomTypeRegistry
   ) {
     this.dbConnectionConfig = dbConnectionConfig;
-    this.connectionManagers = [];
   }
 
   create(poolOptions: PgPoolOptions) {
     const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry });
-    this.connectionManagers.push(manager);
+    console.log('create ConnectionManagerFactory', this.connectionManagers.size);
+    this.connectionManagers.add(manager);
+
+    manager.registerListener({
+      onEnded: () => {
+        console.log('manager ended');
+        this.connectionManagers.delete(manager);
+      }
+    });
     return manager;
   }
 
   async shutdown() {
     logger.info('Shutting down Postgres connection Managers...');
-    for (const manager of this.connectionManagers) {
+    for (const manager of [...this.connectionManagers]) {
       await manager.end();
     }
     logger.info('Postgres connection Managers shutdown completed.');
diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts
index 3d034d32..8bcd7ab4 100644
--- a/modules/module-postgres/src/replication/PgManager.ts
+++ b/modules/module-postgres/src/replication/PgManager.ts
@@ -5,6 +5,7 @@ import { getApplicationName } from '../utils/application-name.js';
 import { PostgresTypeResolver } from '../types/resolver.js';
 import { getServerVersion } from '../utils/postgres_version.js';
 import { CustomTypeRegistry } from '../types/registry.js';
+import { BaseObserver } from '@powersync/lib-services-framework';
 
 export interface PgManagerOptions extends pgwire.PgPoolOptions {
   registry: CustomTypeRegistry;
@@ -15,7 +16,11 @@ export interface PgManagerOptions extends pgwire.PgPoolOptions {
  */
 const SNAPSHOT_SOCKET_TIMEOUT = 30_000;
 
-export class PgManager {
+export interface PgManagerListener {
+  onEnded(): void;
+}
+
+export class PgManager extends BaseObserver<PgManagerListener> {
  /**
   * Do not use this for any transactions.
   */
@@ -29,6 +34,7 @@ export class PgManager {
     public options: NormalizedPostgresConnectionConfig,
     public poolOptions: PgManagerOptions
   ) {
+    super();
     // The pool is lazy - no connections are opened until a query is performed.
this.pool = pgwire.connectPgWirePool(this.options, poolOptions); this.types = new PostgresTypeResolver(poolOptions.registry, this.pool); @@ -83,8 +89,9 @@ export class PgManager { for (let result of await Promise.allSettled([ this.pool.end(), ...this.connectionPromises.map(async (promise) => { - const connection = await promise; - return await connection.end(); + // Wait for connection attempts to finish, but do not throw connection errors here + const connection = await promise.catch((_) => {}); + return await connection?.end(); }) ])) { // Throw the first error, if any @@ -92,14 +99,18 @@ export class PgManager { throw result.reason; } } + this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } async destroy() { this.pool.destroy(); for (let result of await Promise.allSettled([ ...this.connectionPromises.map(async (promise) => { - const connection = await promise; - return connection.destroy(); + // Wait for connection attempts to finish, but do not throw connection errors here + const connection = await promise.catch((_) => {}); + return connection?.destroy(); }) ])) { // Throw the first error, if any @@ -107,5 +118,8 @@ export class PgManager { throw result.reason; } } + this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } } From f06d8a429e4251dd60bb66144de5a9421c6bb619 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 13:44:34 +0200 Subject: [PATCH 02/15] Re-use connectionManager for keepAlive. --- .../replication/WalStreamReplicationJob.ts | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index a9d38b5b..b94fb97e 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -12,19 +12,13 @@ export interface WalStreamReplicationJobOptions extends replication.AbstractRepl export class WalStreamReplicationJob extends replication.AbstractReplicationJob { private connectionFactory: ConnectionManagerFactory; - private readonly connectionManager: PgManager; + private connectionManager: PgManager | null = null; private lastStream: WalStream | null = null; constructor(options: WalStreamReplicationJobOptions) { super(options); this.logger = logger.child({ prefix: `[${this.slotName}] ` }); this.connectionFactory = options.connectionFactory; - this.connectionManager = this.connectionFactory.create({ - // Pool connections are only used intermittently. - idleTimeout: 30_000, - maxSize: 2, - applicationName: getApplicationName() - }); } /** @@ -40,10 +34,12 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob * **This may be a bug in pgwire or how we're using it. 
*/ async keepAlive() { - try { - await sendKeepAlive(this.connectionManager.pool); - } catch (e) { - this.logger.warn(`KeepAlive failed, unable to post to WAL`, e); + if (this.connectionManager) { + try { + await sendKeepAlive(this.connectionManager.pool); + } catch (e) { + this.logger.warn(`KeepAlive failed, unable to post to WAL`, e); + } } } @@ -92,6 +88,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob maxSize: 2, applicationName: getApplicationName() }); + this.connectionManager = connectionManager; try { await this.rateLimiter?.waitUntilAllowed({ signal: this.abortController.signal }); if (this.isStopped) { @@ -148,6 +145,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob this.rateLimiter?.reportError(e); } } finally { + this.connectionManager = null; await connectionManager.end(); } } From b74eea7f40778a814764457dc2f8af8a35d60c59 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 13:48:29 +0200 Subject: [PATCH 03/15] Remove replicateLoop - let the replicator handle retries. --- .../replication/WalStreamReplicationJob.ts | 33 ++++++------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index b94fb97e..af0bfc7c 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -49,7 +49,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob async replicate() { try { - await this.replicateLoop(); + await this.replicateOnce(); } catch (e) { // Fatal exception container.reporter.captureException(e, { @@ -68,16 +68,6 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob } } - async replicateLoop() { - while (!this.isStopped) { - await this.replicateOnce(); - - if (!this.isStopped) { - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - } - } - async replicateOnce() { // New connections on every iteration (every error with retry), // otherwise we risk repeating errors related to the connection, @@ -132,18 +122,15 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob // Without this additional log, the cause would not be visible in the logs. this.logger.error(`cause`, e.cause); } - if (e instanceof MissingReplicationSlotError) { - throw e; - } else { - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slotName - } - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - } + // Report the error if relevant, before retrying + container.reporter.captureException(e, { + metadata: { + replication_slot: this.slotName + } + }); + // This sets the retry delay + this.rateLimiter?.reportError(e); + throw e; } finally { this.connectionManager = null; await connectionManager.end(); From 369aade2b53b881c003b4886c978017e36c61ec6 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 14:19:55 +0200 Subject: [PATCH 04/15] Cleanup MongoManager instances. 
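Same lifecycle pattern as the Postgres connection managers earlier in this
series: the factory tracks live managers in a Set, and each manager removes
itself via an onEnded listener event. An illustrative sketch using only the
APIs touched by this patch (not part of the diff):

    const factory = new ConnectionManagerFactory(dbConnectionConfig);
    const manager = factory.create();
    // ... use manager.client / manager.db ...
    await manager.end();      // emits onEnded; the factory drops the manager
    await factory.shutdown(); // ends any managers still being tracked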
---
 .../src/replication/ConnectionManagerFactory.ts | 13 +++++++++----
 .../src/replication/MongoManager.ts | 15 ++++++++++-----
 .../src/replication/ConnectionManagerFactory.ts | 2 --
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts b/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts
index 82be5bd3..bd8c649b 100644
--- a/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts
+++ b/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts
@@ -3,23 +3,28 @@ import { NormalizedMongoConnectionConfig } from '../types/types.js';
 import { MongoManager } from './MongoManager.js';
 
 export class ConnectionManagerFactory {
-  private readonly connectionManagers: MongoManager[];
+  private readonly connectionManagers = new Set<MongoManager>();
   public readonly dbConnectionConfig: NormalizedMongoConnectionConfig;
 
   constructor(dbConnectionConfig: NormalizedMongoConnectionConfig) {
     this.dbConnectionConfig = dbConnectionConfig;
-    this.connectionManagers = [];
   }
 
   create() {
     const manager = new MongoManager(this.dbConnectionConfig);
-    this.connectionManagers.push(manager);
+    this.connectionManagers.add(manager);
+
+    manager.registerListener({
+      onEnded: () => {
+        this.connectionManagers.delete(manager);
+      }
+    });
     return manager;
   }
 
   async shutdown() {
     logger.info('Shutting down MongoDB connection Managers...');
-    for (const manager of this.connectionManagers) {
+    for (const manager of [...this.connectionManagers]) {
       await manager.end();
     }
     logger.info('MongoDB connection Managers shutdown completed.');
diff --git a/modules/module-mongodb/src/replication/MongoManager.ts b/modules/module-mongodb/src/replication/MongoManager.ts
index 3290c22c..7583ae46 100644
--- a/modules/module-mongodb/src/replication/MongoManager.ts
+++ b/modules/module-mongodb/src/replication/MongoManager.ts
@@ -2,11 +2,16 @@ import { mongo } from '@powersync/lib-service-mongodb';
 import { NormalizedMongoConnectionConfig } from '../types/types.js';
 
 import { BSON_DESERIALIZE_DATA_OPTIONS, POWERSYNC_VERSION } from '@powersync/service-core';
+import { BaseObserver } from '@powersync/lib-services-framework';
+
+export interface MongoManagerListener {
+  onEnded(): void;
+}
 
 /**
  * Manage a MongoDB source database connection.
  */
-export class MongoManager {
+export class MongoManager extends BaseObserver<MongoManagerListener> {
   public readonly client: mongo.MongoClient;
   public readonly db: mongo.Db;
 
@@ -14,6 +19,7 @@ export class MongoManager {
     public options: NormalizedMongoConnectionConfig,
     overrides?: mongo.MongoClientOptions
   ) {
+    super();
     // The pool is lazy - no connections are opened until a query is performed.
     this.client = new mongo.MongoClient(options.uri, {
       auth: {
@@ -59,9 +65,8 @@ export class MongoManager {
 
   async end(): Promise<void> {
     await this.client.close();
-  }
-
-  async destroy() {
-    // TODO: Implement?
+ this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } } diff --git a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts index d7ef07c6..fb089751 100644 --- a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts +++ b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts @@ -17,12 +17,10 @@ export class ConnectionManagerFactory { create(poolOptions: PgPoolOptions) { const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry }); - console.log('create ConnectionManagerFactory', this.connectionManagers.size); this.connectionManagers.add(manager); manager.registerListener({ onEnded: () => { - console.log('manager ended'); this.connectionManagers.delete(manager); } }); From 5f605142b677af20d2e975643c8d10df3ae59fa2 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 14:32:24 +0200 Subject: [PATCH 05/15] Include lock expiration time in warning message. --- .../implementation/MongoSyncRulesLock.ts | 17 +++++++++++++---- .../src/storage/implementation/models.ts | 5 +++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts index 0c5b7b30..ddd53cb4 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts @@ -33,10 +33,19 @@ export class MongoSyncRulesLock implements storage.ReplicationLock { ); if (doc == null) { - throw new ServiceError( - ErrorCode.PSYNC_S1003, - `Sync rules: ${sync_rules.id} have been locked by another process for replication.` - ); + // Query the existing lock to get the expiration time (best effort - it may have been released in the meantime). + const heldLock = await db.sync_rules.findOne({ _id: sync_rules.id }, { projection: { lock: 1 } }); + if (heldLock?.lock?.expires_at) { + throw new ServiceError( + ErrorCode.PSYNC_S1003, + `Sync rules: ${sync_rules.id} have been locked by another process for replication, expiring at ${heldLock.lock.expires_at.toISOString()}.` + ); + } else { + throw new ServiceError( + ErrorCode.PSYNC_S1003, + `Sync rules: ${sync_rules.id} have been locked by another process for replication.` + ); + } } return new MongoSyncRulesLock(db, sync_rules.id, lockId); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 6e00d6f2..2a6ffc20 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -197,6 +197,11 @@ export interface SyncRuleDocument { last_fatal_error: string | null; content: string; + + lock?: { + id: string; + expires_at: Date; + } | null; } export interface CheckpointEventDocument { From 3f216ce7fe73d248604925b78b00690cee5a6325 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 14:34:08 +0200 Subject: [PATCH 06/15] Lazy-instantiate checksum caches to reduce replication memory usage. 
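The caches were previously allocated eagerly in field initializers, even
when never used - e.g. in replication workers that do not serve checksum
queries. A minimal sketch of the lazy-getter idiom used here, with
hypothetical names:

    class Example {
      private _cache: Map<string, number> | undefined;

      private get cache(): Map<string, number> {
        // Allocated on first access; later accesses reuse the same instance.
        return (this._cache ??= new Map());
      }
    }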
---
 .../storage/implementation/MongoChecksums.ts | 20 ++++++++++++++-----
 .../src/storage/PostgresSyncRulesStorage.ts | 20 ++++++++++++++-----
 2 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts
index 9238a602..b4b8aac5 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts
@@ -42,11 +42,7 @@ const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
  * 4. computePartialChecksumsInternal() -> aggregate over 50_000 operations in bucket_data at a time
  */
 export class MongoChecksums {
-  private cache = new ChecksumCache({
-    fetchChecksums: (batch) => {
-      return this.computePartialChecksums(batch);
-    }
-  });
+  private _cache: ChecksumCache | undefined;
 
   constructor(
     private db: PowerSyncMongo,
@@ -54,6 +50,20 @@ export class MongoChecksums {
     private options?: MongoChecksumOptions
   ) {}
 
+  /**
+   * Lazy-instantiated cache.
+   *
+   * This means the cache only allocates memory once it is used for the first time.
+   */
+  private get cache(): ChecksumCache {
+    this._cache ??= new ChecksumCache({
+      fetchChecksums: (batch) => {
+        return this.computePartialChecksums(batch);
+      }
+    });
+    return this._cache;
+  }
+
   /**
    * Calculate checksums, utilizing the cache for partial checksums, and querying the remainder from
    * the database (bucket_state + bucket_data).
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 23498e2f..73600b0b 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -61,11 +61,7 @@ export class PostgresSyncRulesStorage
   // TODO we might be able to share this in an abstract class
   private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
-  private checksumCache = new storage.ChecksumCache({
-    fetchChecksums: (batch) => {
-      return this.getChecksumsInternal(batch);
-    }
-  });
+  private _checksumCache: storage.ChecksumCache | undefined;
 
   constructor(protected options: PostgresSyncRulesStorageOptions) {
     super();
@@ -81,6 +77,20 @@ export class PostgresSyncRulesStorage
     });
   }
 
+  /**
+   * Lazy-instantiated cache.
+   *
+   * This means the cache only allocates memory once it is used for the first time.
+   */
+  private get checksumCache(): storage.ChecksumCache {
+    this._checksumCache ??= new storage.ChecksumCache({
+      fetchChecksums: (batch) => {
+        return this.getChecksumsInternal(batch);
+      }
+    });
+    return this._checksumCache;
+  }
+
   get writeCheckpointMode(): storage.WriteCheckpointMode {
     return this.writeCheckpointAPI.writeCheckpointMode;
   }
From 15cb31e8e6aa51d19cad7bbdb5d52f288f1d3e36 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 11 Nov 2025 14:39:19 +0200
Subject: [PATCH 07/15] MongoDB: Remove replicateLoop - let the replicator handle retries.
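Control flow before and after, simplified (the "before" loop is the code
removed below; retries and delays are now the replicator's responsibility):

    // before: the job retried internally
    while (!this.isStopped) {
      await this.replicateOnce();
      if (!this.isStopped) {
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    }

    // after: a single attempt per replicate() call; on failure the
    // replicator schedules the next attempt via the rate limiter.
    await this.replicateOnce();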
--- .../replication/ChangeStreamReplicationJob.ts | 51 +++++-------------- 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index ff47c159..c8bdaa6b 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -27,19 +27,21 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ // Nothing needed here } - private get slotName() { - return this.options.storage.slot_name; - } - async replicate() { try { - await this.replicateLoop(); + await this.replicateOnce(); } catch (e) { - // Fatal exception - container.reporter.captureException(e, { - metadata: {} - }); - this.logger.error(`Replication failed`, e); + if (!this.abortController.signal.aborted) { + container.reporter.captureException(e, { + metadata: {} + }); + + this.logger.error(`Replication error`, e); + if (e.cause != null) { + // Without this additional log, the cause may not be visible in the logs. + this.logger.error(`cause`, e.cause); + } + } if (e instanceof ChangeStreamInvalidatedError) { // This stops replication and restarts with a new instance @@ -50,16 +52,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ } } - async replicateLoop() { - while (!this.isStopped) { - await this.replicateOnce(); - - if (!this.isStopped) { - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - } - } - async replicateOnce() { // New connections on every iteration (every error with retry), // otherwise we risk repeating errors related to the connection, @@ -79,25 +71,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ }); this.lastStream = stream; await stream.replicate(); - } catch (e) { - if (this.abortController.signal.aborted) { - return; - } - this.logger.error(`Replication error`, e); - if (e.cause != null) { - // Without this additional log, the cause may not be visible in the logs. - this.logger.error(`cause`, e.cause); - } - if (e instanceof ChangeStreamInvalidatedError) { - throw e; - } else { - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: {} - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - } } finally { await connectionManager.end(); } From ad7dc6a8e349f233eea0037232f212189832354d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 14:43:08 +0200 Subject: [PATCH 08/15] MySQL: Remove replicateLoop - let the replicator handle retries. 
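Same change as the MongoDB module above. With the loop gone, the retry
cadence is shaped entirely by the error rate limiter; a sketch of the calls
involved, as they appear elsewhere in this series:

    // at the start of an attempt, wait out any delay set by earlier errors:
    await this.rateLimiter?.waitUntilAllowed({ signal: this.abortController.signal });

    // on error, record it so the limiter can pick the next delay
    // (e.g. 15 minutes for auth failures, 2 minutes for ENOTFOUND):
    this.rateLimiter?.reportError(e);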
--- .../src/replication/BinLogReplicationJob.ts | 54 ++++++------------- 1 file changed, 16 insertions(+), 38 deletions(-) diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts index 06f5dfb9..ec8a0fc5 100644 --- a/modules/module-mysql/src/replication/BinLogReplicationJob.ts +++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts @@ -27,30 +27,29 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { async replicate() { try { - await this.replicateLoop(); + await this.replicateOnce(); } catch (e) { - // Fatal exception - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slot_name + if (!this.abortController.signal.aborted) { + this.logger.error(`Replication error`, e); + if (e.cause != null) { + this.logger.error(`cause`, e.cause); } - }); - this.logger.error(`Replication failed`, e); + // Report the error if relevant, before retrying + container.reporter.captureException(e, { + metadata: { + replication_slot: this.slot_name + } + }); + // This sets the retry delay + this.rateLimiter?.reportError(e); + } + + throw e; } finally { this.abortController.abort(); } } - async replicateLoop() { - while (!this.isStopped) { - await this.replicateOnce(); - - if (!this.isStopped) { - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - } - } - async replicateOnce() { // New connections on every iteration (every error with retry), // otherwise we risk repeating errors related to the connection, @@ -84,27 +83,6 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { }); this.lastStream = stream; await stream.replicate(); - } catch (e) { - if (this.abortController.signal.aborted) { - return; - } - this.logger.error(`Replication error`, e); - if (e.cause != null) { - this.logger.error(`cause`, e.cause); - } - - if (e instanceof BinlogConfigurationError) { - throw e; - } else { - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slot_name - } - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - } } finally { await connectionManager.end(); } From c21cf9a26a29f69c8c00becdc6a8e4c47e5d4408 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 14:47:25 +0200 Subject: [PATCH 09/15] Cleanup MySQLConnectionManager instances. 
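As with the Postgres and MongoDB managers, end() now notifies listeners via
onEnded, and the isClosed guard makes repeated calls safe. Sketch of the
resulting behaviour (not part of the diff):

    const manager = factory.create(poolOptions);
    await manager.end(); // closes the pool, emits onEnded, factory drops it
    await manager.end(); // no-op: the isClosed guard returns early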
---
 .../src/replication/MySQLConnectionManager.ts | 39 ++++++++++++-------
 .../MySQLConnectionManagerFactory.ts | 13 +++++--
 2 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
index b648ab26..badb55f2 100644
--- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts
+++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
@@ -2,10 +2,14 @@ import { NormalizedMySQLConnectionConfig } from '../types/types.js';
 import mysqlPromise from 'mysql2/promise';
 import mysql, { FieldPacket, RowDataPacket } from 'mysql2';
 import * as mysql_utils from '../utils/mysql-utils.js';
-import { logger } from '@powersync/lib-services-framework';
+import { BaseObserver, logger } from '@powersync/lib-services-framework';
 import { ZongJi } from '@powersync/mysql-zongji';
 
-export class MySQLConnectionManager {
+export interface MySQLConnectionManagerListener {
+  onEnded(): void;
+}
+
+export class MySQLConnectionManager extends BaseObserver<MySQLConnectionManagerListener> {
   /**
    * Pool that can create streamable connections
    */
@@ -23,6 +27,7 @@ export class MySQLConnectionManager {
     public options: NormalizedMySQLConnectionConfig,
     public poolOptions: mysqlPromise.PoolOptions
   ) {
+    super();
     // The pool is lazy - no connections are opened until a query is performed.
     this.pool = mysql_utils.createPool(options, poolOptions);
     this.promisePool = this.pool.promise();
@@ -98,18 +103,24 @@ export class MySQLConnectionManager {
   }
 
   async end(): Promise<void> {
-    if (!this.isClosed) {
-      for (const listener of this.binlogListeners) {
-        listener.stop();
-      }
-
-      try {
-        await this.promisePool.end();
-        this.isClosed = true;
-      } catch (error) {
-        // We don't particularly care if any errors are thrown when shutting down the pool
-        logger.warn('Error shutting down MySQL connection pool', error);
-      }
-    }
+    if (this.isClosed) {
+      return;
+    }
+
+    for (const listener of this.binlogListeners) {
+      listener.stop();
+    }
+
+    try {
+      await this.promisePool.end();
+    } catch (error) {
+      // We don't particularly care if any errors are thrown when shutting down the pool
+      logger.warn('Error shutting down MySQL connection pool', error);
+    } finally {
+      this.isClosed = true;
+      this.iterateListeners((listener) => {
+        listener.onEnded?.();
+      });
+    }
   }
 }
diff --git a/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts b/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts
index d817eb85..ca28a1a9 100644
--- a/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts
+++ b/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts
@@ -4,23 +4,28 @@ import { MySQLConnectionManager } from './MySQLConnectionManager.js';
 import { ResolvedConnectionConfig } from '../types/types.js';
 
 export class MySQLConnectionManagerFactory {
-  private readonly connectionManagers: MySQLConnectionManager[];
+  private readonly connectionManagers = new Set<MySQLConnectionManager>();
   public readonly connectionConfig: ResolvedConnectionConfig;
 
   constructor(connectionConfig: ResolvedConnectionConfig) {
     this.connectionConfig = connectionConfig;
-    this.connectionManagers = [];
  }
 
  create(poolOptions: mysql.PoolOptions) {
    const manager = new MySQLConnectionManager(this.connectionConfig, poolOptions);
-    this.connectionManagers.push(manager);
+    this.connectionManagers.add(manager);
+
+    manager.registerListener({
+      onEnded: () => {
+        this.connectionManagers.delete(manager);
+      }
+    });
    return manager;
  }
 
  async shutdown() {
    logger.info('Shutting 
down MySQL connection Managers...'); - for (const manager of this.connectionManagers) { + for (const manager of [...this.connectionManagers]) { await manager.end(); } logger.info('MySQL connection Managers shutdown completed.'); From 617af005e42be5adf895fb97eccf2f942108a04c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 14:56:28 +0200 Subject: [PATCH 10/15] More consistent error handling. --- .../replication/ChangeStreamReplicationJob.ts | 2 + .../src/replication/BinLogReplicationJob.ts | 2 +- .../replication/WalStreamReplicationJob.ts | 81 +++++++++---------- 3 files changed, 41 insertions(+), 44 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index c8bdaa6b..9883764a 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -47,6 +47,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ // This stops replication and restarts with a new instance await this.options.storage.factory.restartReplication(this.storage.group_id); } + + // No need to rethrow - the error is already logged, and retry behavior is the same on error } finally { this.abortController.abort(); } diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts index ec8a0fc5..bd6c07c5 100644 --- a/modules/module-mysql/src/replication/BinLogReplicationJob.ts +++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts @@ -44,7 +44,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { this.rateLimiter?.reportError(e); } - throw e; + // No need to rethrow - the error is already logged, and retry behavior is the same on error } finally { this.abortController.abort(); } diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index af0bfc7c..087ec59f 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -52,17 +52,50 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob await this.replicateOnce(); } catch (e) { // Fatal exception - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slotName + + if (!this.isStopped) { + // Ignore aborted errors + + this.logger.error(`Replication error`, e); + if (e.cause != null) { + // Example: + // PgError.conn_ended: Unable to do postgres query on ended connection + // at PgConnection.stream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:315:13) + // at stream.next () + // at PgResult.fromStream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:1174:22) + // at PgConnection.query (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:311:21) + // at WalStream.startInitialReplication (file:///.../powersync/powersync-service/lib/replication/WalStream.js:266:22) + // ... 
+ // cause: TypeError: match is not iterable + // at timestamptzToSqlite (file:///.../powersync/packages/jpgwire/dist/util.js:140:50) + // at PgType.decode (file:///.../powersync/packages/jpgwire/dist/pgwire_types.js:25:24) + // at PgConnection._recvDataRow (file:///.../powersync/packages/jpgwire/dist/util.js:88:22) + // at PgConnection._recvMessages (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:656:30) + // at PgConnection._ioloopAttempt (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:563:20) + // at process.processTicksAndRejections (node:internal/process/task_queues:95:5) + // at async PgConnection._ioloop (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:517:14), + // [Symbol(pg.ErrorCode)]: 'conn_ended', + // [Symbol(pg.ErrorResponse)]: undefined + // } + // Without this additional log, the cause would not be visible in the logs. + this.logger.error(`cause`, e.cause); } - }); - this.logger.error(`Replication failed`, e); + // Report the error if relevant, before retrying + container.reporter.captureException(e, { + metadata: { + replication_slot: this.slotName + } + }); + // This sets the retry delay + this.rateLimiter?.reportError(e); + } if (e instanceof MissingReplicationSlotError) { // This stops replication on this slot and restarts with a new slot await this.options.storage.factory.restartReplication(this.storage.group_id); } + + // No need to rethrow - the error is already logged, and retry behavior is the same on error } finally { this.abortController.abort(); } @@ -93,44 +126,6 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob }); this.lastStream = stream; await stream.replicate(); - } catch (e) { - if (this.isStopped && e instanceof ReplicationAbortedError) { - // Ignore aborted errors - return; - } - this.logger.error(`Replication error`, e); - if (e.cause != null) { - // Example: - // PgError.conn_ended: Unable to do postgres query on ended connection - // at PgConnection.stream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:315:13) - // at stream.next () - // at PgResult.fromStream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:1174:22) - // at PgConnection.query (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:311:21) - // at WalStream.startInitialReplication (file:///.../powersync/powersync-service/lib/replication/WalStream.js:266:22) - // ... 
- // cause: TypeError: match is not iterable - // at timestamptzToSqlite (file:///.../powersync/packages/jpgwire/dist/util.js:140:50) - // at PgType.decode (file:///.../powersync/packages/jpgwire/dist/pgwire_types.js:25:24) - // at PgConnection._recvDataRow (file:///.../powersync/packages/jpgwire/dist/util.js:88:22) - // at PgConnection._recvMessages (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:656:30) - // at PgConnection._ioloopAttempt (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:563:20) - // at process.processTicksAndRejections (node:internal/process/task_queues:95:5) - // at async PgConnection._ioloop (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:517:14), - // [Symbol(pg.ErrorCode)]: 'conn_ended', - // [Symbol(pg.ErrorResponse)]: undefined - // } - // Without this additional log, the cause would not be visible in the logs. - this.logger.error(`cause`, e.cause); - } - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slotName - } - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - throw e; } finally { this.connectionManager = null; await connectionManager.end(); From b62796fd6ffb8cd7aa27c1595aeeb47c72bb1f29 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 15:03:22 +0200 Subject: [PATCH 11/15] Use a single MongoErrorRateLimiter, and properly report errors. --- .../src/replication/ChangeStreamReplicationJob.ts | 2 ++ .../module-mongodb/src/replication/ChangeStreamReplicator.ts | 2 +- .../module-mongodb/src/replication/MongoErrorRateLimiter.ts | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index 9883764a..21613809 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -41,6 +41,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ // Without this additional log, the cause may not be visible in the logs. this.logger.error(`cause`, e.cause); } + + this.rateLimiter.reportError(e); } if (e instanceof ChangeStreamInvalidatedError) { diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts index 5140ae5d..2fca7aec 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts @@ -25,7 +25,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator { const delay = Math.max(0, this.nextAllowed - Date.now()); - // Minimum delay between connections, even without errors + // Minimum delay between connections, even without errors (for the next attempt) this.setDelay(500); await setTimeout(delay, undefined, { signal: options?.signal }); } @@ -18,7 +18,7 @@ export class MongoErrorRateLimiter implements ErrorRateLimiter { reportError(e: any): void { // FIXME: Check mongodb-specific requirements const message = (e.message as string) ?? 
''; - if (message.includes('password authentication failed')) { + if (message.includes('Authentication failed')) { // Wait 15 minutes, to avoid triggering Supabase's fail2ban this.setDelay(900_000); } else if (message.includes('ENOTFOUND')) { From 9059e8fb8b621c2b507573a9daf541042fb7f1e4 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Nov 2025 15:10:56 +0200 Subject: [PATCH 12/15] More error handling tweaks. --- .../src/replication/MongoErrorRateLimiter.ts | 10 +++++++--- .../src/replication/BinLogReplicationJob.ts | 2 +- .../src/replication/PostgresErrorRateLimiter.ts | 6 +++++- .../src/replication/WalStreamReplicationJob.ts | 2 +- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts index e1484d1a..86d26aae 100644 --- a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts +++ b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts @@ -1,5 +1,6 @@ import { ErrorRateLimiter } from '@powersync/service-core'; import { setTimeout } from 'timers/promises'; +import { ChangeStreamInvalidatedError } from './ChangeStream.js'; export class MongoErrorRateLimiter implements ErrorRateLimiter { nextAllowed: number = Date.now(); @@ -18,9 +19,12 @@ export class MongoErrorRateLimiter implements ErrorRateLimiter { reportError(e: any): void { // FIXME: Check mongodb-specific requirements const message = (e.message as string) ?? ''; - if (message.includes('Authentication failed')) { - // Wait 15 minutes, to avoid triggering Supabase's fail2ban - this.setDelay(900_000); + if (e instanceof ChangeStreamInvalidatedError) { + // Short delay + this.setDelay(2_000); + } else if (message.includes('Authentication failed')) { + // Wait 2 minutes, to avoid triggering too many authentication attempts + this.setDelay(120_000); } else if (message.includes('ENOTFOUND')) { // DNS lookup issue - incorrect URI or deleted instance this.setDelay(120_000); diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts index bd6c07c5..32c2371b 100644 --- a/modules/module-mysql/src/replication/BinLogReplicationJob.ts +++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts @@ -41,7 +41,7 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { } }); // This sets the retry delay - this.rateLimiter?.reportError(e); + this.rateLimiter.reportError(e); } // No need to rethrow - the error is already logged, and retry behavior is the same on error diff --git a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts index 9a86a070..a356ca6f 100644 --- a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts +++ b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts @@ -1,5 +1,6 @@ import { setTimeout } from 'timers/promises'; import { ErrorRateLimiter } from '@powersync/service-core'; +import { MissingReplicationSlotError } from './WalStream.js'; export class PostgresErrorRateLimiter implements ErrorRateLimiter { nextAllowed: number = Date.now(); @@ -17,7 +18,10 @@ export class PostgresErrorRateLimiter implements ErrorRateLimiter { reportError(e: any): void { const message = (e.message as string) ?? 
'';
-    if (message.includes('password authentication failed')) {
+    if (e instanceof MissingReplicationSlotError) {
+      // Short delay before retrying (re-creating the slot)
+      this.setDelay(2_000);
+    } else if (message.includes('password authentication failed')) {
       // Wait 15 minutes, to avoid triggering Supabase's fail2ban
       this.setDelay(900_000);
     } else if (message.includes('ENOTFOUND')) {
diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
index 087ec59f..8b6021cd 100644
--- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
+++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
@@ -87,7 +87,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
          }
        });
        // This sets the retry delay
-        this.rateLimiter?.reportError(e);
+        this.rateLimiter.reportError(e);
      }
 
      if (e instanceof MissingReplicationSlotError) {

From 0e0a6edf33cfe741d0a8228228c458a0b0e82e1b Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 11 Nov 2025 15:20:01 +0200
Subject: [PATCH 13/15] Fix test.

---
 modules/module-mongodb/test/src/change_stream_utils.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index b79d12fb..46801ff8 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -60,7 +60,7 @@ export class ChangeStreamTestContext {
   async dispose() {
     this.abortController.abort();
     await this.streamPromise?.catch((e) => e);
-    await this.connectionManager.destroy();
+    await this.connectionManager.end();
     await this.factory[Symbol.asyncDispose]();
   }
 
From 6143c0afd25f2325bb5e4bf1b1c62a7f57848e1b Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 11 Nov 2025 15:26:53 +0200
Subject: [PATCH 14/15] Changeset.

---
 .changeset/nervous-dots-behave.md | 11 +++++++++++
 1 file changed, 11 insertions(+)
 create mode 100644 .changeset/nervous-dots-behave.md

diff --git a/.changeset/nervous-dots-behave.md b/.changeset/nervous-dots-behave.md
new file mode 100644
index 00000000..878c268c
--- /dev/null
+++ b/.changeset/nervous-dots-behave.md
@@ -0,0 +1,11 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+'@powersync/service-image': patch
+---
+
+Fix memory leaks when retrying replication after errors.

From c3d8ca3177f28acd47caa6d6f1e85a4104c6fdf6 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 11 Nov 2025 15:59:39 +0200
Subject: [PATCH 15/15] Fix test issues after changing context dispose logic.

---
 .../test/src/change_stream_utils.ts | 11 +-
 .../test/src/resuming_snapshots.test.ts | 201 +++++++++---------
 2 files changed, 114 insertions(+), 98 deletions(-)

diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index 46801ff8..12243564 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -57,11 +57,18 @@ export class ChangeStreamTestContext {
     initializeCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
   }
 
-  async dispose() {
+  /**
+   * Abort snapshot and/or replication, without actively closing connections. 
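+   * dispose() calls this before closing the connections.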
+ */ + abort() { this.abortController.abort(); + } + + async dispose() { + this.abort(); await this.streamPromise?.catch((e) => e); - await this.connectionManager.end(); await this.factory[Symbol.asyncDispose](); + await this.connectionManager.end(); } async [Symbol.asyncDispose]() { diff --git a/modules/module-mongodb/test/src/resuming_snapshots.test.ts b/modules/module-mongodb/test/src/resuming_snapshots.test.ts index 589c2a55..ff06f6d3 100644 --- a/modules/module-mongodb/test/src/resuming_snapshots.test.ts +++ b/modules/module-mongodb/test/src/resuming_snapshots.test.ts @@ -1,11 +1,11 @@ -import { describe, expect, test } from 'vitest'; -import { env } from './env.js'; -import { describeWithStorage } from './util.js'; import { TestStorageFactory } from '@powersync/service-core'; import { METRICS_HELPER } from '@powersync/service-core-tests'; import { ReplicationMetric } from '@powersync/service-types'; import * as timers from 'node:timers/promises'; +import { describe, expect, test } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; +import { env } from './env.js'; +import { describeWithStorage } from './util.js'; describe.skipIf(!(env.CI || env.SLOW_TESTS))('batch replication', function () { describeWithStorage({ timeout: 240_000 }, function (factory) { @@ -32,107 +32,116 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n // have been / have not been replicated at that point is not deterministic. // We do allow for some variation in the test results to account for this. - await using context = await ChangeStreamTestContext.open(factory, { streamOptions: { snapshotChunkLength: 1000 } }); + let startRowCount: number; + + { + await using context = await ChangeStreamTestContext.open(factory, { streamOptions: { snapshotChunkLength: 1000 } }); - await context.updateSyncRules(`bucket_definitions: + await context.updateSyncRules(`bucket_definitions: global: data: - SELECT _id as id, description FROM test_data1 - SELECT _id as id, description FROM test_data2`); - const { db } = context; - - let batch = db.collection('test_data1').initializeUnorderedBulkOp(); - for (let i = 1; i <= 1000; i++) { - batch.insert({ _id: i, description: 'foo' }); - } - await batch.execute(); - batch = db.collection('test_data2').initializeUnorderedBulkOp(); - for (let i = 1; i <= 10000; i++) { - batch.insert({ _id: i, description: 'foo' }); - } - await batch.execute(); - - const p = context.replicateSnapshot(); - - let done = false; - - const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; - try { - (async () => { - while (!done) { - const count = - ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; - - if (count >= stopAfter) { - break; + const { db } = context; + + let batch = db.collection('test_data1').initializeUnorderedBulkOp(); + for (let i = 1; i <= 1000; i++) { + batch.insert({ _id: i, description: 'foo' }); + } + await batch.execute(); + batch = db.collection('test_data2').initializeUnorderedBulkOp(); + for (let i = 1; i <= 10000; i++) { + batch.insert({ _id: i, description: 'foo' }); + } + await batch.execute(); + + const p = context.replicateSnapshot().catch((e) => ({ error: e })); + + let done = false; + + startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 
0; + try { + (async () => { + while (!done) { + const count = + ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; + + if (count >= stopAfter) { + break; + } + await timers.setTimeout(1); } - await timers.setTimeout(1); - } - // This interrupts initial replication - await context.dispose(); - })(); - // This confirms that initial replication was interrupted - await expect(p).rejects.toThrowError(); - done = true; - } finally { - done = true; + // This interrupts initial replication + // We don't dispose the context here yet, since closing the database connection while in use + // results in unpredictable error conditions. + context.abort(); + })(); + // This confirms that initial replication was interrupted + await expect(await p).haveOwnProperty('error'); + } finally { + done = true; + } } - // Bypass the usual "clear db on factory open" step. - await using context2 = await ChangeStreamTestContext.open(factory, { - doNotClear: true, - streamOptions: { snapshotChunkLength: 1000 } - }); + { + // Bypass the usual "clear db on factory open" step. + await using context2 = await ChangeStreamTestContext.open(factory, { + doNotClear: true, + streamOptions: { snapshotChunkLength: 1000 } + }); - // This delete should be using one of the ids already replicated - await db.collection('test_data2').deleteOne({ _id: 1 as any }); - await db.collection('test_data2').updateOne({ _id: 2 as any }, { $set: { description: 'update1' } }); - await db.collection('test_data2').insertOne({ _id: 10001 as any, description: 'insert1' }); - - await context2.loadNextSyncRules(); - await context2.replicateSnapshot(); - - context2.startStreaming(); - const data = await context2.getBucketData('global[]', undefined, {}); - - const deletedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '1'); - const updatedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '2'); - const insertedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '10001'); - - if (deletedRowOps.length != 0) { - // The deleted row was part of the first replication batch, - // so it is removed by streaming replication. - expect(deletedRowOps.length).toEqual(2); - expect(deletedRowOps[1].op).toEqual('REMOVE'); - } else { - // The deleted row was not part of the first replication batch, - // so it's not in the resulting ops at all. + const { db } = context2; + + // This delete should be using one of the ids already replicated + await db.collection('test_data2').deleteOne({ _id: 1 as any }); + await db.collection('test_data2').updateOne({ _id: 2 as any }, { $set: { description: 'update1' } }); + await db.collection('test_data2').insertOne({ _id: 10001 as any, description: 'insert1' }); + + await context2.loadNextSyncRules(); + await context2.replicateSnapshot(); + + context2.startStreaming(); + const data = await context2.getBucketData('global[]', undefined, {}); + + const deletedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '1'); + const updatedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '2'); + const insertedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '10001'); + + if (deletedRowOps.length != 0) { + // The deleted row was part of the first replication batch, + // so it is removed by streaming replication. 
+ expect(deletedRowOps.length).toEqual(2); + expect(deletedRowOps[1].op).toEqual('REMOVE'); + } else { + // The deleted row was not part of the first replication batch, + // so it's not in the resulting ops at all. + } + + expect(updatedRowOps.length).toEqual(2); + // description for the first op could be 'foo' or 'update1'. + // We only test the final version. + expect(JSON.parse(updatedRowOps[1].data as string).description).toEqual('update1'); + + expect(insertedRowOps.length).toEqual(2); + expect(JSON.parse(insertedRowOps[0].data as string).description).toEqual('insert1'); + expect(JSON.parse(insertedRowOps[1].data as string).description).toEqual('insert1'); + + // 1000 of test_data1 during first replication attempt. + // N >= 1000 of test_data2 during first replication attempt. + // 10000 - N - 1 + 1 of test_data2 during second replication attempt. + // An additional update during streaming replication (2x total for this row). + // An additional insert during streaming replication (2x total for this row). + // If the deleted row was part of the first replication batch, it's removed by streaming replication. + // This adds 2 ops. + // We expect this to be 11002 for stopAfter: 2000, and 11004 for stopAfter: 8000. + // However, this is not deterministic. + const expectedCount = 11002 + deletedRowOps.length; + expect(data.length).toEqual(expectedCount); + + const replicatedCount = + ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; + + // With resumable replication, there should be no need to re-replicate anything. + expect(replicatedCount).toEqual(expectedCount); } - - expect(updatedRowOps.length).toEqual(2); - // description for the first op could be 'foo' or 'update1'. - // We only test the final version. - expect(JSON.parse(updatedRowOps[1].data as string).description).toEqual('update1'); - - expect(insertedRowOps.length).toEqual(2); - expect(JSON.parse(insertedRowOps[0].data as string).description).toEqual('insert1'); - expect(JSON.parse(insertedRowOps[1].data as string).description).toEqual('insert1'); - - // 1000 of test_data1 during first replication attempt. - // N >= 1000 of test_data2 during first replication attempt. - // 10000 - N - 1 + 1 of test_data2 during second replication attempt. - // An additional update during streaming replication (2x total for this row). - // An additional insert during streaming replication (2x total for this row). - // If the deleted row was part of the first replication batch, it's removed by streaming replication. - // This adds 2 ops. - // We expect this to be 11002 for stopAfter: 2000, and 11004 for stopAfter: 8000. - // However, this is not deterministic. - const expectedCount = 11002 + deletedRowOps.length; - expect(data.length).toEqual(expectedCount); - - const replicatedCount = - ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; - - // With resumable replication, there should be no need to re-replicate anything. - expect(replicatedCount).toEqual(expectedCount); }