diff --git a/.changeset/nervous-dots-behave.md b/.changeset/nervous-dots-behave.md new file mode 100644 index 000000000..878c268c8 --- /dev/null +++ b/.changeset/nervous-dots-behave.md @@ -0,0 +1,11 @@ +--- +'@powersync/service-module-postgres-storage': patch +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-postgres': patch +'@powersync/service-module-mongodb': patch +'@powersync/service-core': patch +'@powersync/service-module-mysql': patch +'@powersync/service-image': patch +--- + +Fix memory leaks when retrying replication after errors. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts index 9238a6022..b4b8aac53 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts @@ -42,11 +42,7 @@ const DEFAULT_OPERATION_BATCH_LIMIT = 50_000; * 4. computePartialChecksumsInternal() -> aggregate over 50_000 operations in bucket_data at a time */ export class MongoChecksums { - private cache = new ChecksumCache({ - fetchChecksums: (batch) => { - return this.computePartialChecksums(batch); - } - }); + private _cache: ChecksumCache | undefined; constructor( private db: PowerSyncMongo, @@ -54,6 +50,20 @@ export class MongoChecksums { private options?: MongoChecksumOptions ) {} + /** + * Lazy-instantiated cache. + * + * This means the cache only allocates memory once it is used for the first time. + */ + private get cache(): ChecksumCache { + this._cache ??= new ChecksumCache({ + fetchChecksums: (batch) => { + return this.computePartialChecksums(batch); + } + }); + return this._cache; + } + /** * Calculate checksums, utilizing the cache for partial checkums, and querying the remainder from * the database (bucket_state + bucket_data). diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts index 0c5b7b30e..ddd53cb47 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncRulesLock.ts @@ -33,10 +33,19 @@ export class MongoSyncRulesLock implements storage.ReplicationLock { ); if (doc == null) { - throw new ServiceError( - ErrorCode.PSYNC_S1003, - `Sync rules: ${sync_rules.id} have been locked by another process for replication.` - ); + // Query the existing lock to get the expiration time (best effort - it may have been released in the meantime). 
+ const heldLock = await db.sync_rules.findOne({ _id: sync_rules.id }, { projection: { lock: 1 } }); + if (heldLock?.lock?.expires_at) { + throw new ServiceError( + ErrorCode.PSYNC_S1003, + `Sync rules: ${sync_rules.id} have been locked by another process for replication, expiring at ${heldLock.lock.expires_at.toISOString()}.` + ); + } else { + throw new ServiceError( + ErrorCode.PSYNC_S1003, + `Sync rules: ${sync_rules.id} have been locked by another process for replication.` + ); + } } return new MongoSyncRulesLock(db, sync_rules.id, lockId); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 6e00d6f2d..2a6ffc20d 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -197,6 +197,11 @@ export interface SyncRuleDocument { last_fatal_error: string | null; content: string; + + lock?: { + id: string; + expires_at: Date; + } | null; } export interface CheckpointEventDocument { diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts index ff47c159b..216138096 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -27,39 +27,35 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ // Nothing needed here } - private get slotName() { - return this.options.storage.slot_name; - } - async replicate() { try { - await this.replicateLoop(); + await this.replicateOnce(); } catch (e) { - // Fatal exception - container.reporter.captureException(e, { - metadata: {} - }); - this.logger.error(`Replication failed`, e); + if (!this.abortController.signal.aborted) { + container.reporter.captureException(e, { + metadata: {} + }); + + this.logger.error(`Replication error`, e); + if (e.cause != null) { + // Without this additional log, the cause may not be visible in the logs. + this.logger.error(`cause`, e.cause); + } + + this.rateLimiter.reportError(e); + } if (e instanceof ChangeStreamInvalidatedError) { // This stops replication and restarts with a new instance await this.options.storage.factory.restartReplication(this.storage.group_id); } + + // No need to rethrow - the error is already logged, and retry behavior is the same on error } finally { this.abortController.abort(); } } - async replicateLoop() { - while (!this.isStopped) { - await this.replicateOnce(); - - if (!this.isStopped) { - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - } - } - async replicateOnce() { // New connections on every iteration (every error with retry), // otherwise we risk repeating errors related to the connection, @@ -79,25 +75,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ }); this.lastStream = stream; await stream.replicate(); - } catch (e) { - if (this.abortController.signal.aborted) { - return; - } - this.logger.error(`Replication error`, e); - if (e.cause != null) { - // Without this additional log, the cause may not be visible in the logs. 
- this.logger.error(`cause`, e.cause); - } - if (e instanceof ChangeStreamInvalidatedError) { - throw e; - } else { - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: {} - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - } } finally { await connectionManager.end(); } diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts index 5140ae5d7..2fca7aec3 100644 --- a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts @@ -25,7 +25,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator(); public readonly dbConnectionConfig: NormalizedMongoConnectionConfig; constructor(dbConnectionConfig: NormalizedMongoConnectionConfig) { this.dbConnectionConfig = dbConnectionConfig; - this.connectionManagers = []; } create() { const manager = new MongoManager(this.dbConnectionConfig); - this.connectionManagers.push(manager); + this.connectionManagers.add(manager); + + manager.registerListener({ + onEnded: () => { + this.connectionManagers.delete(manager); + } + }); return manager; } async shutdown() { logger.info('Shutting down MongoDB connection Managers...'); - for (const manager of this.connectionManagers) { + for (const manager of [...this.connectionManagers]) { await manager.end(); } logger.info('MongoDB connection Managers shutdown completed.'); diff --git a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts index 3c770fb9b..86d26aae2 100644 --- a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts +++ b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts @@ -1,12 +1,13 @@ import { ErrorRateLimiter } from '@powersync/service-core'; import { setTimeout } from 'timers/promises'; +import { ChangeStreamInvalidatedError } from './ChangeStream.js'; export class MongoErrorRateLimiter implements ErrorRateLimiter { nextAllowed: number = Date.now(); async waitUntilAllowed(options?: { signal?: AbortSignal | undefined } | undefined): Promise { const delay = Math.max(0, this.nextAllowed - Date.now()); - // Minimum delay between connections, even without errors + // Minimum delay between connections, even without errors (for the next attempt) this.setDelay(500); await setTimeout(delay, undefined, { signal: options?.signal }); } @@ -18,9 +19,12 @@ export class MongoErrorRateLimiter implements ErrorRateLimiter { reportError(e: any): void { // FIXME: Check mongodb-specific requirements const message = (e.message as string) ?? 
''; - if (message.includes('password authentication failed')) { - // Wait 15 minutes, to avoid triggering Supabase's fail2ban - this.setDelay(900_000); + if (e instanceof ChangeStreamInvalidatedError) { + // Short delay + this.setDelay(2_000); + } else if (message.includes('Authentication failed')) { + // Wait 2 minutes, to avoid triggering too many authentication attempts + this.setDelay(120_000); } else if (message.includes('ENOTFOUND')) { // DNS lookup issue - incorrect URI or deleted instance this.setDelay(120_000); diff --git a/modules/module-mongodb/src/replication/MongoManager.ts b/modules/module-mongodb/src/replication/MongoManager.ts index 3290c22c5..7583ae463 100644 --- a/modules/module-mongodb/src/replication/MongoManager.ts +++ b/modules/module-mongodb/src/replication/MongoManager.ts @@ -2,11 +2,16 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { NormalizedMongoConnectionConfig } from '../types/types.js'; import { BSON_DESERIALIZE_DATA_OPTIONS, POWERSYNC_VERSION } from '@powersync/service-core'; +import { BaseObserver } from '@powersync/lib-services-framework'; + +export interface MongoManagerListener { + onEnded(): void; +} /** * Manage a MongoDB source database connection. */ -export class MongoManager { +export class MongoManager extends BaseObserver { public readonly client: mongo.MongoClient; public readonly db: mongo.Db; @@ -14,6 +19,7 @@ export class MongoManager { public options: NormalizedMongoConnectionConfig, overrides?: mongo.MongoClientOptions ) { + super(); // The pool is lazy - no connections are opened until a query is performed. this.client = new mongo.MongoClient(options.uri, { auth: { @@ -59,9 +65,8 @@ export class MongoManager { async end(): Promise { await this.client.close(); - } - - async destroy() { - // TODO: Implement? + this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } } diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index b79d12fb4..122435640 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -57,11 +57,18 @@ export class ChangeStreamTestContext { initializeCoreReplicationMetrics(METRICS_HELPER.metricsEngine); } - async dispose() { + /** + * Abort snapshot and/or replication, without actively closing connections. 
+ */ + abort() { this.abortController.abort(); + } + + async dispose() { + this.abort(); await this.streamPromise?.catch((e) => e); - await this.connectionManager.destroy(); await this.factory[Symbol.asyncDispose](); + await this.connectionManager.end(); } async [Symbol.asyncDispose]() { diff --git a/modules/module-mongodb/test/src/resuming_snapshots.test.ts b/modules/module-mongodb/test/src/resuming_snapshots.test.ts index 589c2a55d..ff06f6d3f 100644 --- a/modules/module-mongodb/test/src/resuming_snapshots.test.ts +++ b/modules/module-mongodb/test/src/resuming_snapshots.test.ts @@ -1,11 +1,11 @@ -import { describe, expect, test } from 'vitest'; -import { env } from './env.js'; -import { describeWithStorage } from './util.js'; import { TestStorageFactory } from '@powersync/service-core'; import { METRICS_HELPER } from '@powersync/service-core-tests'; import { ReplicationMetric } from '@powersync/service-types'; import * as timers from 'node:timers/promises'; +import { describe, expect, test } from 'vitest'; import { ChangeStreamTestContext } from './change_stream_utils.js'; +import { env } from './env.js'; +import { describeWithStorage } from './util.js'; describe.skipIf(!(env.CI || env.SLOW_TESTS))('batch replication', function () { describeWithStorage({ timeout: 240_000 }, function (factory) { @@ -32,107 +32,116 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n // have been / have not been replicated at that point is not deterministic. // We do allow for some variation in the test results to account for this. - await using context = await ChangeStreamTestContext.open(factory, { streamOptions: { snapshotChunkLength: 1000 } }); + let startRowCount: number; + + { + await using context = await ChangeStreamTestContext.open(factory, { streamOptions: { snapshotChunkLength: 1000 } }); - await context.updateSyncRules(`bucket_definitions: + await context.updateSyncRules(`bucket_definitions: global: data: - SELECT _id as id, description FROM test_data1 - SELECT _id as id, description FROM test_data2`); - const { db } = context; - - let batch = db.collection('test_data1').initializeUnorderedBulkOp(); - for (let i = 1; i <= 1000; i++) { - batch.insert({ _id: i, description: 'foo' }); - } - await batch.execute(); - batch = db.collection('test_data2').initializeUnorderedBulkOp(); - for (let i = 1; i <= 10000; i++) { - batch.insert({ _id: i, description: 'foo' }); - } - await batch.execute(); - - const p = context.replicateSnapshot(); - - let done = false; - - const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; - try { - (async () => { - while (!done) { - const count = - ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; - - if (count >= stopAfter) { - break; + const { db } = context; + + let batch = db.collection('test_data1').initializeUnorderedBulkOp(); + for (let i = 1; i <= 1000; i++) { + batch.insert({ _id: i, description: 'foo' }); + } + await batch.execute(); + batch = db.collection('test_data2').initializeUnorderedBulkOp(); + for (let i = 1; i <= 10000; i++) { + batch.insert({ _id: i, description: 'foo' }); + } + await batch.execute(); + + const p = context.replicateSnapshot().catch((e) => ({ error: e })); + + let done = false; + + startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 
0; + try { + (async () => { + while (!done) { + const count = + ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; + + if (count >= stopAfter) { + break; + } + await timers.setTimeout(1); } - await timers.setTimeout(1); - } - // This interrupts initial replication - await context.dispose(); - })(); - // This confirms that initial replication was interrupted - await expect(p).rejects.toThrowError(); - done = true; - } finally { - done = true; + // This interrupts initial replication + // We don't dispose the context here yet, since closing the database connection while in use + // results in unpredictable error conditions. + context.abort(); + })(); + // This confirms that initial replication was interrupted + await expect(await p).haveOwnProperty('error'); + } finally { + done = true; + } } - // Bypass the usual "clear db on factory open" step. - await using context2 = await ChangeStreamTestContext.open(factory, { - doNotClear: true, - streamOptions: { snapshotChunkLength: 1000 } - }); + { + // Bypass the usual "clear db on factory open" step. + await using context2 = await ChangeStreamTestContext.open(factory, { + doNotClear: true, + streamOptions: { snapshotChunkLength: 1000 } + }); - // This delete should be using one of the ids already replicated - await db.collection('test_data2').deleteOne({ _id: 1 as any }); - await db.collection('test_data2').updateOne({ _id: 2 as any }, { $set: { description: 'update1' } }); - await db.collection('test_data2').insertOne({ _id: 10001 as any, description: 'insert1' }); - - await context2.loadNextSyncRules(); - await context2.replicateSnapshot(); - - context2.startStreaming(); - const data = await context2.getBucketData('global[]', undefined, {}); - - const deletedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '1'); - const updatedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '2'); - const insertedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '10001'); - - if (deletedRowOps.length != 0) { - // The deleted row was part of the first replication batch, - // so it is removed by streaming replication. - expect(deletedRowOps.length).toEqual(2); - expect(deletedRowOps[1].op).toEqual('REMOVE'); - } else { - // The deleted row was not part of the first replication batch, - // so it's not in the resulting ops at all. + const { db } = context2; + + // This delete should be using one of the ids already replicated + await db.collection('test_data2').deleteOne({ _id: 1 as any }); + await db.collection('test_data2').updateOne({ _id: 2 as any }, { $set: { description: 'update1' } }); + await db.collection('test_data2').insertOne({ _id: 10001 as any, description: 'insert1' }); + + await context2.loadNextSyncRules(); + await context2.replicateSnapshot(); + + context2.startStreaming(); + const data = await context2.getBucketData('global[]', undefined, {}); + + const deletedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '1'); + const updatedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '2'); + const insertedRowOps = data.filter((row) => row.object_type == 'test_data2' && row.object_id === '10001'); + + if (deletedRowOps.length != 0) { + // The deleted row was part of the first replication batch, + // so it is removed by streaming replication. 
+ expect(deletedRowOps.length).toEqual(2); + expect(deletedRowOps[1].op).toEqual('REMOVE'); + } else { + // The deleted row was not part of the first replication batch, + // so it's not in the resulting ops at all. + } + + expect(updatedRowOps.length).toEqual(2); + // description for the first op could be 'foo' or 'update1'. + // We only test the final version. + expect(JSON.parse(updatedRowOps[1].data as string).description).toEqual('update1'); + + expect(insertedRowOps.length).toEqual(2); + expect(JSON.parse(insertedRowOps[0].data as string).description).toEqual('insert1'); + expect(JSON.parse(insertedRowOps[1].data as string).description).toEqual('insert1'); + + // 1000 of test_data1 during first replication attempt. + // N >= 1000 of test_data2 during first replication attempt. + // 10000 - N - 1 + 1 of test_data2 during second replication attempt. + // An additional update during streaming replication (2x total for this row). + // An additional insert during streaming replication (2x total for this row). + // If the deleted row was part of the first replication batch, it's removed by streaming replication. + // This adds 2 ops. + // We expect this to be 11002 for stopAfter: 2000, and 11004 for stopAfter: 8000. + // However, this is not deterministic. + const expectedCount = 11002 + deletedRowOps.length; + expect(data.length).toEqual(expectedCount); + + const replicatedCount = + ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; + + // With resumable replication, there should be no need to re-replicate anything. + expect(replicatedCount).toEqual(expectedCount); } - - expect(updatedRowOps.length).toEqual(2); - // description for the first op could be 'foo' or 'update1'. - // We only test the final version. - expect(JSON.parse(updatedRowOps[1].data as string).description).toEqual('update1'); - - expect(insertedRowOps.length).toEqual(2); - expect(JSON.parse(insertedRowOps[0].data as string).description).toEqual('insert1'); - expect(JSON.parse(insertedRowOps[1].data as string).description).toEqual('insert1'); - - // 1000 of test_data1 during first replication attempt. - // N >= 1000 of test_data2 during first replication attempt. - // 10000 - N - 1 + 1 of test_data2 during second replication attempt. - // An additional update during streaming replication (2x total for this row). - // An additional insert during streaming replication (2x total for this row). - // If the deleted row was part of the first replication batch, it's removed by streaming replication. - // This adds 2 ops. - // We expect this to be 11002 for stopAfter: 2000, and 11004 for stopAfter: 8000. - // However, this is not deterministic. - const expectedCount = 11002 + deletedRowOps.length; - expect(data.length).toEqual(expectedCount); - - const replicatedCount = - ((await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0) - startRowCount; - - // With resumable replication, there should be no need to re-replicate anything. 
- expect(replicatedCount).toEqual(expectedCount); } diff --git a/modules/module-mysql/src/replication/BinLogReplicationJob.ts b/modules/module-mysql/src/replication/BinLogReplicationJob.ts index 06f5dfb9f..32c2371b7 100644 --- a/modules/module-mysql/src/replication/BinLogReplicationJob.ts +++ b/modules/module-mysql/src/replication/BinLogReplicationJob.ts @@ -27,30 +27,29 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { async replicate() { try { - await this.replicateLoop(); + await this.replicateOnce(); } catch (e) { - // Fatal exception - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slot_name + if (!this.abortController.signal.aborted) { + this.logger.error(`Replication error`, e); + if (e.cause != null) { + this.logger.error(`cause`, e.cause); } - }); - this.logger.error(`Replication failed`, e); + // Report the error if relevant, before retrying + container.reporter.captureException(e, { + metadata: { + replication_slot: this.slot_name + } + }); + // This sets the retry delay + this.rateLimiter.reportError(e); + } + + // No need to rethrow - the error is already logged, and retry behavior is the same on error } finally { this.abortController.abort(); } } - async replicateLoop() { - while (!this.isStopped) { - await this.replicateOnce(); - - if (!this.isStopped) { - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - } - } - async replicateOnce() { // New connections on every iteration (every error with retry), // otherwise we risk repeating errors related to the connection, @@ -84,27 +83,6 @@ export class BinLogReplicationJob extends replication.AbstractReplicationJob { }); this.lastStream = stream; await stream.replicate(); - } catch (e) { - if (this.abortController.signal.aborted) { - return; - } - this.logger.error(`Replication error`, e); - if (e.cause != null) { - this.logger.error(`cause`, e.cause); - } - - if (e instanceof BinlogConfigurationError) { - throw e; - } else { - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slot_name - } - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - } } finally { await connectionManager.end(); } diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts index b648ab262..badb55f22 100644 --- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts +++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts @@ -2,10 +2,14 @@ import { NormalizedMySQLConnectionConfig } from '../types/types.js'; import mysqlPromise from 'mysql2/promise'; import mysql, { FieldPacket, RowDataPacket } from 'mysql2'; import * as mysql_utils from '../utils/mysql-utils.js'; -import { logger } from '@powersync/lib-services-framework'; +import { BaseObserver, logger } from '@powersync/lib-services-framework'; import { ZongJi } from '@powersync/mysql-zongji'; -export class MySQLConnectionManager { +export interface MySQLConnectionManagerListener { + onEnded(): void; +} + +export class MySQLConnectionManager extends BaseObserver { /** * Pool that can create streamable connections */ @@ -23,6 +27,7 @@ export class MySQLConnectionManager { public options: NormalizedMySQLConnectionConfig, public poolOptions: mysqlPromise.PoolOptions ) { + super(); // The pool is lazy - no connections are opened until a query is performed. 
this.pool = mysql_utils.createPool(options, poolOptions); this.promisePool = this.pool.promise(); @@ -98,18 +103,24 @@ export class MySQLConnectionManager { } async end(): Promise { - if (!this.isClosed) { - for (const listener of this.binlogListeners) { - listener.stop(); - } - - try { - await this.promisePool.end(); - this.isClosed = true; - } catch (error) { - // We don't particularly care if any errors are thrown when shutting down the pool - logger.warn('Error shutting down MySQL connection pool', error); - } + if (this.isClosed) { + return; + } + + for (const listener of this.binlogListeners) { + listener.stop(); + } + + try { + await this.promisePool.end(); + } catch (error) { + // We don't particularly care if any errors are thrown when shutting down the pool + logger.warn('Error shutting down MySQL connection pool', error); + } finally { + this.isClosed = true; + this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } } } diff --git a/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts b/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts index d817eb850..ca28a1a94 100644 --- a/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts +++ b/modules/module-mysql/src/replication/MySQLConnectionManagerFactory.ts @@ -4,23 +4,28 @@ import { MySQLConnectionManager } from './MySQLConnectionManager.js'; import { ResolvedConnectionConfig } from '../types/types.js'; export class MySQLConnectionManagerFactory { - private readonly connectionManagers: MySQLConnectionManager[]; + private readonly connectionManagers = new Set(); public readonly connectionConfig: ResolvedConnectionConfig; constructor(connectionConfig: ResolvedConnectionConfig) { this.connectionConfig = connectionConfig; - this.connectionManagers = []; } create(poolOptions: mysql.PoolOptions) { const manager = new MySQLConnectionManager(this.connectionConfig, poolOptions); - this.connectionManagers.push(manager); + this.connectionManagers.add(manager); + + manager.registerListener({ + onEnded: () => { + this.connectionManagers.delete(manager); + } + }); return manager; } async shutdown() { logger.info('Shutting down MySQL connection Managers...'); - for (const manager of this.connectionManagers) { + for (const manager of [...this.connectionManagers]) { await manager.end(); } logger.info('MySQL connection Managers shutdown completed.'); diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index 23498e2fc..73600b0b8 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -61,11 +61,7 @@ export class PostgresSyncRulesStorage // TODO we might be able to share this in an abstract class private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; - private checksumCache = new storage.ChecksumCache({ - fetchChecksums: (batch) => { - return this.getChecksumsInternal(batch); - } - }); + private _checksumCache: storage.ChecksumCache | undefined; constructor(protected options: PostgresSyncRulesStorageOptions) { super(); @@ -81,6 +77,20 @@ export class PostgresSyncRulesStorage }); } + /** + * Lazy-instantiated cache. + * + * This means the cache only allocates memory once it is used for the first time. 
+ */ + private get checksumCache(): storage.ChecksumCache { + this._checksumCache ??= new storage.ChecksumCache({ + fetchChecksums: (batch) => { + return this.getChecksumsInternal(batch); + } + }); + return this._checksumCache; + } + get writeCheckpointMode(): storage.WriteCheckpointMode { return this.writeCheckpointAPI.writeCheckpointMode; } diff --git a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts index 094ae339f..fb089751d 100644 --- a/modules/module-postgres/src/replication/ConnectionManagerFactory.ts +++ b/modules/module-postgres/src/replication/ConnectionManagerFactory.ts @@ -5,7 +5,7 @@ import { logger } from '@powersync/lib-services-framework'; import { CustomTypeRegistry } from '../types/registry.js'; export class ConnectionManagerFactory { - private readonly connectionManagers: PgManager[]; + private readonly connectionManagers = new Set(); public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig; constructor( @@ -13,18 +13,23 @@ export class ConnectionManagerFactory { private readonly registry: CustomTypeRegistry ) { this.dbConnectionConfig = dbConnectionConfig; - this.connectionManagers = []; } create(poolOptions: PgPoolOptions) { const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry }); - this.connectionManagers.push(manager); + this.connectionManagers.add(manager); + + manager.registerListener({ + onEnded: () => { + this.connectionManagers.delete(manager); + } + }); return manager; } async shutdown() { logger.info('Shutting down Postgres connection Managers...'); - for (const manager of this.connectionManagers) { + for (const manager of [...this.connectionManagers]) { await manager.end(); } logger.info('Postgres connection Managers shutdown completed.'); diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts index 3d034d326..8bcd7ab4f 100644 --- a/modules/module-postgres/src/replication/PgManager.ts +++ b/modules/module-postgres/src/replication/PgManager.ts @@ -5,6 +5,7 @@ import { getApplicationName } from '../utils/application-name.js'; import { PostgresTypeResolver } from '../types/resolver.js'; import { getServerVersion } from '../utils/postgres_version.js'; import { CustomTypeRegistry } from '../types/registry.js'; +import { BaseObserver } from '@powersync/lib-services-framework'; export interface PgManagerOptions extends pgwire.PgPoolOptions { registry: CustomTypeRegistry; @@ -15,7 +16,11 @@ export interface PgManagerOptions extends pgwire.PgPoolOptions { */ const SNAPSHOT_SOCKET_TIMEOUT = 30_000; -export class PgManager { +export interface PgManagerListener { + onEnded(): void; +} + +export class PgManager extends BaseObserver { /** * Do not use this for any transactions. */ @@ -29,6 +34,7 @@ export class PgManager { public options: NormalizedPostgresConnectionConfig, public poolOptions: PgManagerOptions ) { + super(); // The pool is lazy - no connections are opened until a query is performed. 
this.pool = pgwire.connectPgWirePool(this.options, poolOptions); this.types = new PostgresTypeResolver(poolOptions.registry, this.pool); @@ -83,8 +89,9 @@ export class PgManager { for (let result of await Promise.allSettled([ this.pool.end(), ...this.connectionPromises.map(async (promise) => { - const connection = await promise; - return await connection.end(); + // Wait for connection attempts to finish, but do not throw connection errors here + const connection = await promise.catch((_) => {}); + return await connection?.end(); }) ])) { // Throw the first error, if any @@ -92,14 +99,18 @@ throw result.reason; } } + this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } async destroy() { this.pool.destroy(); for (let result of await Promise.allSettled([ ...this.connectionPromises.map(async (promise) => { - const connection = await promise; - return connection.destroy(); + // Wait for connection attempts to finish, but do not throw connection errors here + const connection = await promise.catch((_) => {}); + return connection?.destroy(); }) ])) { // Throw the first error, if any @@ -107,5 +118,8 @@ throw result.reason; } } + this.iterateListeners((listener) => { + listener.onEnded?.(); + }); } } diff --git a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts index 9a86a0704..a356ca6f2 100644 --- a/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts +++ b/modules/module-postgres/src/replication/PostgresErrorRateLimiter.ts @@ -1,5 +1,6 @@ import { setTimeout } from 'timers/promises'; import { ErrorRateLimiter } from '@powersync/service-core'; +import { MissingReplicationSlotError } from './WalStream.js'; export class PostgresErrorRateLimiter implements ErrorRateLimiter { nextAllowed: number = Date.now(); @@ -17,7 +18,10 @@ export class PostgresErrorRateLimiter implements ErrorRateLimiter { reportError(e: any): void { const message = (e.message as string) ?? ''; - if (message.includes('password authentication failed')) { + if (e instanceof MissingReplicationSlotError) { + // Short delay before retrying (re-creating the slot) + this.setDelay(2_000); + } else if (message.includes('password authentication failed')) { // Wait 15 minutes, to avoid triggering Supabase's fail2ban this.setDelay(900_000); } else if (message.includes('ENOTFOUND')) { diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index a9d38b5b5..8b6021cd6 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -12,19 +12,13 @@ export interface WalStreamReplicationJobOptions extends replication.AbstractRepl export class WalStreamReplicationJob extends replication.AbstractReplicationJob { private connectionFactory: ConnectionManagerFactory; - private readonly connectionManager: PgManager; + private connectionManager: PgManager | null = null; private lastStream: WalStream | null = null; constructor(options: WalStreamReplicationJobOptions) { super(options); this.logger = logger.child({ prefix: `[${this.slotName}] ` }); this.connectionFactory = options.connectionFactory; - this.connectionManager = this.connectionFactory.create({ - // Pool connections are only used intermittently.
- idleTimeout: 30_000, - maxSize: 2, - applicationName: getApplicationName() - }); } /** @@ -40,10 +34,12 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob * **This may be a bug in pgwire or how we're using it. */ async keepAlive() { - try { - await sendKeepAlive(this.connectionManager.pool); - } catch (e) { - this.logger.warn(`KeepAlive failed, unable to post to WAL`, e); + if (this.connectionManager) { + try { + await sendKeepAlive(this.connectionManager.pool); + } catch (e) { + this.logger.warn(`KeepAlive failed, unable to post to WAL`, e); + } } } @@ -53,35 +49,58 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob async replicate() { try { - await this.replicateLoop(); + await this.replicateOnce(); } catch (e) { // Fatal exception - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slotName + + if (!this.isStopped) { + // Ignore aborted errors + + this.logger.error(`Replication error`, e); + if (e.cause != null) { + // Example: + // PgError.conn_ended: Unable to do postgres query on ended connection + // at PgConnection.stream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:315:13) + // at stream.next () + // at PgResult.fromStream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:1174:22) + // at PgConnection.query (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:311:21) + // at WalStream.startInitialReplication (file:///.../powersync/powersync-service/lib/replication/WalStream.js:266:22) + // ... + // cause: TypeError: match is not iterable + // at timestamptzToSqlite (file:///.../powersync/packages/jpgwire/dist/util.js:140:50) + // at PgType.decode (file:///.../powersync/packages/jpgwire/dist/pgwire_types.js:25:24) + // at PgConnection._recvDataRow (file:///.../powersync/packages/jpgwire/dist/util.js:88:22) + // at PgConnection._recvMessages (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:656:30) + // at PgConnection._ioloopAttempt (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:563:20) + // at process.processTicksAndRejections (node:internal/process/task_queues:95:5) + // at async PgConnection._ioloop (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:517:14), + // [Symbol(pg.ErrorCode)]: 'conn_ended', + // [Symbol(pg.ErrorResponse)]: undefined + // } + // Without this additional log, the cause would not be visible in the logs. 
+ this.logger.error(`cause`, e.cause); } - }); - this.logger.error(`Replication failed`, e); + // Report the error if relevant, before retrying + container.reporter.captureException(e, { + metadata: { + replication_slot: this.slotName + } + }); + // This sets the retry delay + this.rateLimiter.reportError(e); + } if (e instanceof MissingReplicationSlotError) { // This stops replication on this slot and restarts with a new slot await this.options.storage.factory.restartReplication(this.storage.group_id); } + + // No need to rethrow - the error is already logged, and retry behavior is the same on error } finally { this.abortController.abort(); } } - async replicateLoop() { - while (!this.isStopped) { - await this.replicateOnce(); - - if (!this.isStopped) { - await new Promise((resolve) => setTimeout(resolve, 5000)); - } - } - } - async replicateOnce() { // New connections on every iteration (every error with retry), // otherwise we risk repeating errors related to the connection, @@ -92,6 +111,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob maxSize: 2, applicationName: getApplicationName() }); + this.connectionManager = connectionManager; try { await this.rateLimiter?.waitUntilAllowed({ signal: this.abortController.signal }); if (this.isStopped) { @@ -106,48 +126,8 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob }); this.lastStream = stream; await stream.replicate(); - } catch (e) { - if (this.isStopped && e instanceof ReplicationAbortedError) { - // Ignore aborted errors - return; - } - this.logger.error(`Replication error`, e); - if (e.cause != null) { - // Example: - // PgError.conn_ended: Unable to do postgres query on ended connection - // at PgConnection.stream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:315:13) - // at stream.next () - // at PgResult.fromStream (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:1174:22) - // at PgConnection.query (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:311:21) - // at WalStream.startInitialReplication (file:///.../powersync/powersync-service/lib/replication/WalStream.js:266:22) - // ... - // cause: TypeError: match is not iterable - // at timestamptzToSqlite (file:///.../powersync/packages/jpgwire/dist/util.js:140:50) - // at PgType.decode (file:///.../powersync/packages/jpgwire/dist/pgwire_types.js:25:24) - // at PgConnection._recvDataRow (file:///.../powersync/packages/jpgwire/dist/util.js:88:22) - // at PgConnection._recvMessages (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:656:30) - // at PgConnection._ioloopAttempt (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:563:20) - // at process.processTicksAndRejections (node:internal/process/task_queues:95:5) - // at async PgConnection._ioloop (file:///.../powersync/node_modules/.pnpm/github.com+kagis+pgwire@f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87/node_modules/pgwire/mod.js:517:14), - // [Symbol(pg.ErrorCode)]: 'conn_ended', - // [Symbol(pg.ErrorResponse)]: undefined - // } - // Without this additional log, the cause would not be visible in the logs. 
- this.logger.error(`cause`, e.cause); - } - if (e instanceof MissingReplicationSlotError) { - throw e; - } else { - // Report the error if relevant, before retrying - container.reporter.captureException(e, { - metadata: { - replication_slot: this.slotName - } - }); - // This sets the retry delay - this.rateLimiter?.reportError(e); - } } finally { + this.connectionManager = null; await connectionManager.end(); } }
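Note on the connection-manager changes above (ConnectionManagerFactory for MongoDB and Postgres, MySQLConnectionManagerFactory for MySQL): the factories previously appended every manager to an array and never removed it, so each retry after a replication error kept a reference to a closed manager. The patched factories keep managers in a Set and drop each entry when the manager signals that it has ended. The following self-contained TypeScript sketch illustrates the shape of that pattern; the names SimpleManager, SimpleManagerFactory and EndedListener are illustrative only and do not exist in this repository, which routes the notification through its own BaseObserver/registerListener API.

interface EndedListener {
  onEnded(): void;
}

class SimpleManager {
  private listeners = new Set<EndedListener>();

  registerListener(listener: EndedListener): void {
    this.listeners.add(listener);
  }

  async end(): Promise<void> {
    // Close the underlying connections here, then notify listeners once.
    for (const listener of this.listeners) {
      listener.onEnded();
    }
    this.listeners.clear();
  }
}

class SimpleManagerFactory {
  private managers = new Set<SimpleManager>();

  create(): SimpleManager {
    const manager = new SimpleManager();
    this.managers.add(manager);
    // Drop the reference as soon as the manager ends, instead of holding it
    // for the lifetime of the factory - the source of the original leak.
    manager.registerListener({ onEnded: () => this.managers.delete(manager) });
    return manager;
  }

  async shutdown(): Promise<void> {
    // Copy before iterating: ending a manager removes it from the Set via its
    // onEnded listener, which would otherwise mutate the Set mid-iteration.
    for (const manager of [...this.managers]) {
      await manager.end();
    }
  }
}

The copy in shutdown() mirrors the [...this.connectionManagers] spread introduced in the patch, for the same reason.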
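The checksum-cache changes in MongoChecksums and PostgresSyncRulesStorage apply a related idea: the cache is now created by a lazy getter, so storage instances that never serve checksum queries (for example, ones created and discarded while replication retries) never allocate it. A minimal sketch of that lazy-getter shape, using a plain Map as a stand-in for ChecksumCache:

class LazilyCachedChecksums {
  // Stand-in for ChecksumCache; only the lazy allocation is being illustrated.
  private _cache: Map<string, number> | undefined;

  // Allocated on first access rather than in the constructor.
  private get cache(): Map<string, number> {
    this._cache ??= new Map<string, number>();
    return this._cache;
  }

  getCached(bucket: string): number | undefined {
    return this.cache.get(bucket);
  }

  putCached(bucket: string, checksum: number): void {
    this.cache.set(bucket, checksum);
  }
}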
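Finally, the replicate() rewrites in ChangeStreamReplicationJob, BinLogReplicationJob and WalStreamReplicationJob remove the jobs' internal replicateLoop() so that each call performs a single attempt, reporting errors to the rate limiter instead of rethrowing. The sketch below shows how such a single-attempt job can be driven by an outer retry loop; it is an illustration under stated assumptions (superviseReplication and RetryRateLimiter are hypothetical names), not the repository's AbstractReplicationJob implementation.

interface RetryRateLimiter {
  reportError(e: unknown): void;
  waitUntilAllowed(): Promise<void>;
}

async function superviseReplication(
  runOnce: () => Promise<void>,
  rateLimiter: RetryRateLimiter,
  isStopped: () => boolean
): Promise<void> {
  while (!isStopped()) {
    // Short delay normally; much longer after known-bad errors (auth, DNS).
    await rateLimiter.waitUntilAllowed();
    if (isStopped()) {
      break;
    }
    try {
      // One attempt with fresh connections, matching replicateOnce() above.
      await runOnce();
    } catch (e) {
      // Lengthen the delay for the next attempt instead of crashing the loop.
      rateLimiter.reportError(e);
    }
  }
}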