Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/nervous-dots-behave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-module-postgres': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-module-mysql': patch
'@powersync/service-image': patch
---

Fix memory leaks when retrying replication after errors.
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,28 @@ const DEFAULT_OPERATION_BATCH_LIMIT = 50_000;
* 4. computePartialChecksumsInternal() -> aggregate over 50_000 operations in bucket_data at a time
*/
export class MongoChecksums {
private cache = new ChecksumCache({
fetchChecksums: (batch) => {
return this.computePartialChecksums(batch);
}
});
private _cache: ChecksumCache | undefined;

constructor(
private db: PowerSyncMongo,
private group_id: number,
private options?: MongoChecksumOptions
) {}

/**
* Lazy-instantiated cache.
*
* This means the cache only allocates memory once it is used for the first time.
*/
private get cache(): ChecksumCache {
this._cache ??= new ChecksumCache({
fetchChecksums: (batch) => {
return this.computePartialChecksums(batch);
}
});
return this._cache;
}

/**
 * Calculate checksums, utilizing the cache for partial checksums, and querying the remainder from
* the database (bucket_state + bucket_data).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,19 @@ export class MongoSyncRulesLock implements storage.ReplicationLock {
);

if (doc == null) {
throw new ServiceError(
ErrorCode.PSYNC_S1003,
`Sync rules: ${sync_rules.id} have been locked by another process for replication.`
);
// Query the existing lock to get the expiration time (best effort - it may have been released in the meantime).
const heldLock = await db.sync_rules.findOne({ _id: sync_rules.id }, { projection: { lock: 1 } });
if (heldLock?.lock?.expires_at) {
throw new ServiceError(
ErrorCode.PSYNC_S1003,
`Sync rules: ${sync_rules.id} have been locked by another process for replication, expiring at ${heldLock.lock.expires_at.toISOString()}.`
);
} else {
throw new ServiceError(
ErrorCode.PSYNC_S1003,
`Sync rules: ${sync_rules.id} have been locked by another process for replication.`
);
}
}
return new MongoSyncRulesLock(db, sync_rules.id, lockId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ export interface SyncRuleDocument {
last_fatal_error: string | null;

content: string;

lock?: {
id: string;
expires_at: Date;
} | null;
}

export interface CheckpointEventDocument {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,35 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
// Nothing needed here
}

private get slotName() {
return this.options.storage.slot_name;
}

async replicate() {
try {
await this.replicateLoop();
await this.replicateOnce();
} catch (e) {
// Fatal exception
container.reporter.captureException(e, {
metadata: {}
});
this.logger.error(`Replication failed`, e);
if (!this.abortController.signal.aborted) {
container.reporter.captureException(e, {
metadata: {}
});

this.logger.error(`Replication error`, e);
if (e.cause != null) {
// Without this additional log, the cause may not be visible in the logs.
this.logger.error(`cause`, e.cause);
}

this.rateLimiter.reportError(e);
}

if (e instanceof ChangeStreamInvalidatedError) {
// This stops replication and restarts with a new instance
await this.options.storage.factory.restartReplication(this.storage.group_id);
}

// No need to rethrow - the error is already logged, and retry behavior is the same on error
} finally {
this.abortController.abort();
}
}

async replicateLoop() {
while (!this.isStopped) {
await this.replicateOnce();

if (!this.isStopped) {
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
}

async replicateOnce() {
// New connections on every iteration (every error with retry),
// otherwise we risk repeating errors related to the connection,
Expand All @@ -79,25 +75,6 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
});
this.lastStream = stream;
await stream.replicate();
} catch (e) {
if (this.abortController.signal.aborted) {
return;
}
this.logger.error(`Replication error`, e);
if (e.cause != null) {
// Without this additional log, the cause may not be visible in the logs.
this.logger.error(`cause`, e.cause);
}
if (e instanceof ChangeStreamInvalidatedError) {
throw e;
} else {
// Report the error if relevant, before retrying
container.reporter.captureException(e, {
metadata: {}
});
// This sets the retry delay
this.rateLimiter?.reportError(e);
}
} finally {
await connectionManager.end();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class ChangeStreamReplicator extends replication.AbstractReplicator<Chang
metrics: this.metrics,
connectionFactory: this.connectionFactory,
lock: options.lock,
rateLimiter: new MongoErrorRateLimiter()
rateLimiter: this.rateLimiter
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@ import { NormalizedMongoConnectionConfig } from '../types/types.js';
import { MongoManager } from './MongoManager.js';

export class ConnectionManagerFactory {
private readonly connectionManagers: MongoManager[];
private readonly connectionManagers = new Set<MongoManager>();
public readonly dbConnectionConfig: NormalizedMongoConnectionConfig;

constructor(dbConnectionConfig: NormalizedMongoConnectionConfig) {
this.dbConnectionConfig = dbConnectionConfig;
this.connectionManagers = [];
}

create() {
const manager = new MongoManager(this.dbConnectionConfig);
this.connectionManagers.push(manager);
this.connectionManagers.add(manager);

manager.registerListener({
onEnded: () => {
this.connectionManagers.delete(manager);
}
});
return manager;
}

async shutdown() {
logger.info('Shutting down MongoDB connection Managers...');
for (const manager of this.connectionManagers) {
for (const manager of [...this.connectionManagers]) {
await manager.end();
}
logger.info('MongoDB connection Managers shutdown completed.');
Expand Down
12 changes: 8 additions & 4 deletions modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { ErrorRateLimiter } from '@powersync/service-core';
import { setTimeout } from 'timers/promises';
import { ChangeStreamInvalidatedError } from './ChangeStream.js';

export class MongoErrorRateLimiter implements ErrorRateLimiter {
nextAllowed: number = Date.now();

async waitUntilAllowed(options?: { signal?: AbortSignal | undefined } | undefined): Promise<void> {
const delay = Math.max(0, this.nextAllowed - Date.now());
// Minimum delay between connections, even without errors
// Minimum delay between connections, even without errors (for the next attempt)
this.setDelay(500);
await setTimeout(delay, undefined, { signal: options?.signal });
}
Expand All @@ -18,9 +19,12 @@ export class MongoErrorRateLimiter implements ErrorRateLimiter {
reportError(e: any): void {
// FIXME: Check mongodb-specific requirements
const message = (e.message as string) ?? '';
if (message.includes('password authentication failed')) {
// Wait 15 minutes, to avoid triggering Supabase's fail2ban
this.setDelay(900_000);
if (e instanceof ChangeStreamInvalidatedError) {
// Short delay
this.setDelay(2_000);
} else if (message.includes('Authentication failed')) {
// Wait 2 minutes, to avoid triggering too many authentication attempts
this.setDelay(120_000);
} else if (message.includes('ENOTFOUND')) {
// DNS lookup issue - incorrect URI or deleted instance
this.setDelay(120_000);
Expand Down
15 changes: 10 additions & 5 deletions modules/module-mongodb/src/replication/MongoManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ import { mongo } from '@powersync/lib-service-mongodb';

import { NormalizedMongoConnectionConfig } from '../types/types.js';
import { BSON_DESERIALIZE_DATA_OPTIONS, POWERSYNC_VERSION } from '@powersync/service-core';
import { BaseObserver } from '@powersync/lib-services-framework';

export interface MongoManagerListener {
  /**
   * Called after `MongoManager.end()` has closed the underlying client.
   *
   * Allows owners (e.g. a connection manager factory) to drop their reference
   * to the ended manager.
   */
  onEnded(): void;
}

/**
* Manage a MongoDB source database connection.
*/
export class MongoManager {
export class MongoManager extends BaseObserver<MongoManagerListener> {
public readonly client: mongo.MongoClient;
public readonly db: mongo.Db;

constructor(
public options: NormalizedMongoConnectionConfig,
overrides?: mongo.MongoClientOptions
) {
super();
// The pool is lazy - no connections are opened until a query is performed.
this.client = new mongo.MongoClient(options.uri, {
auth: {
Expand Down Expand Up @@ -59,9 +65,8 @@ export class MongoManager {

async end(): Promise<void> {
await this.client.close();
}

async destroy() {
// TODO: Implement?
this.iterateListeners((listener) => {
listener.onEnded?.();
});
}
}
11 changes: 9 additions & 2 deletions modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,18 @@ export class ChangeStreamTestContext {
initializeCoreReplicationMetrics(METRICS_HELPER.metricsEngine);
}

async dispose() {
  /**
   * Abort snapshot and/or replication, without actively closing connections.
   *
   * Signals the shared AbortController so in-flight work observing the signal
   * stops on its own; use dispose() when connections must also be closed.
   */
  abort() {
    this.abortController.abort();
  }

async dispose() {
this.abort();
await this.streamPromise?.catch((e) => e);
await this.connectionManager.destroy();
await this.factory[Symbol.asyncDispose]();
await this.connectionManager.end();
}

async [Symbol.asyncDispose]() {
Expand Down
Loading