9 changes: 9 additions & 0 deletions .changeset/selfish-carpets-bathe.md
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core': patch
'@powersync/service-types': patch
'@powersync/service-image': patch
---

Clear replication errors when any replication progress has been made.
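
The diffs below implement this in both storage modules: each `flush()` now reports whether it actually wrote anything, and the batch clears the persisted fatal error at most once per batch. A minimal sketch of the pattern, using hypothetical `ErrorStore`/`FlushResult` names rather than the real storage interfaces:

```ts
// Minimal sketch of the error-clearing pattern; ErrorStore and FlushResult
// are hypothetical stand-ins for the real storage interfaces.
interface FlushResult {
  flushedAny: boolean;
}

interface ErrorStore {
  clearFatalError(groupId: number): Promise<void>;
}

class BatchErrorClearer {
  private clearedError = false;

  constructor(
    private readonly store: ErrorStore,
    private readonly groupId: number
  ) {}

  // Clears the stored error once per batch: an error would restart the
  // batch anyway, so repeated clears would just be redundant writes.
  async onFlush(result: FlushResult): Promise<void> {
    if (!result.flushedAny || this.clearedError) {
      return;
    }
    await this.store.clearFatalError(this.groupId);
    this.clearedError = true;
  }
}
```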
@@ -218,6 +218,7 @@ export class MongoBucketStorage
slot_name: slot_name,
last_checkpoint_ts: null,
last_fatal_error: null,
last_fatal_error_ts: null,
last_keepalive_ts: null
};
await this.db.sync_rules.insertOne(doc);
@@ -82,6 +82,7 @@ export class MongoBucketBatch
private batch: OperationBatch | null = null;
private write_checkpoint_batch: storage.CustomWriteCheckpointOptions[] = [];
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
private clearedError = false;

/**
* Last LSN received associated with a checkpoint.
@@ -243,6 +244,8 @@ export class MongoBucketBatch
let resumeBatch: OperationBatch | null = null;
let transactionSize = 0;

let didFlush = false;

// Now batch according to the sizes
// This is a single batch if storeCurrentData == false
for await (let b of batch.batched(sizes)) {
@@ -292,7 +295,8 @@
if (persistedBatch!.shouldFlushTransaction()) {
// Transaction is getting big.
// Flush, and resume in a new transaction.
await persistedBatch!.flush(this.db, this.session, options);
const { flushedAny } = await persistedBatch!.flush(this.db, this.session, options);
didFlush ||= flushedAny;
persistedBatch = null;
// Computing our current progress is a little tricky here, since
// we're stopping in the middle of a batch.
@@ -303,10 +307,15 @@

if (persistedBatch) {
transactionSize = persistedBatch.currentSize;
await persistedBatch.flush(this.db, this.session, options);
const { flushedAny } = await persistedBatch.flush(this.db, this.session, options);
didFlush ||= flushedAny;
}
}

if (didFlush) {
await this.clearError();
}

return resumeBatch?.hasData() ? resumeBatch : null;
}

@@ -714,6 +723,7 @@ export class MongoBucketBatch
last_keepalive_ts: now,
snapshot_done: true,
last_fatal_error: null,
last_fatal_error_ts: null,
keepalive_op: null
};

@@ -848,6 +858,7 @@ export class MongoBucketBatch
last_checkpoint_lsn: lsn,
snapshot_done: true,
last_fatal_error: null,
last_fatal_error_ts: null,
last_keepalive_ts: new Date()
},
$unset: { snapshot_lsn: 1 }
@@ -1075,6 +1086,26 @@ export class MongoBucketBatch
});
}

protected async clearError(): Promise<void> {
// No need to clear an error more than once per batch, since an error would always result in restarting the batch.
if (this.clearedError) {
return;
}

await this.db.sync_rules.updateOne(
{
_id: this.group_id
},
{
$set: {
last_fatal_error: null,
last_fatal_error_ts: null
}
}
);
this.clearedError = true;
}

/**
* Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable}
*/
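
For reference, `clearError` above boils down to a single `updateOne` on the `sync_rules` collection. A standalone sketch with the official `mongodb` driver, where the URI and database name are assumptions and the document type is narrowed to just the fields touched here:

```ts
import { MongoClient } from 'mongodb';

// Standalone sketch; URI and database name are assumptions.
interface SyncRuleErrorFields {
  _id: number;
  last_fatal_error: string | null;
  last_fatal_error_ts: Date | null;
}

async function clearFatalError(groupId: number): Promise<void> {
  const client = new MongoClient('mongodb://localhost:27017');
  try {
    const syncRules = client.db('powersync').collection<SyncRuleErrorFields>('sync_rules');
    // Reset both the error message and its timestamp in one update.
    await syncRules.updateOne(
      { _id: groupId },
      { $set: { last_fatal_error: null, last_fatal_error_ts: null } }
    );
  } finally {
    await client.close();
  }
}
```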
@@ -13,6 +13,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
public readonly sync_rules_content: string;
public readonly last_checkpoint_lsn: string | null;
public readonly last_fatal_error: string | null;
public readonly last_fatal_error_ts: Date | null;
public readonly last_keepalive_ts: Date | null;
public readonly last_checkpoint_ts: Date | null;
public readonly active: boolean;
@@ -29,6 +30,7 @@ export class MongoPersistedSyncRulesContent implements storage.PersistedSyncRule
// Handle legacy values
this.slot_name = doc.slot_name ?? `powersync_${this.id}`;
this.last_fatal_error = doc.last_fatal_error;
this.last_fatal_error_ts = doc.last_fatal_error_ts;
this.last_checkpoint_ts = doc.last_checkpoint_ts;
this.last_keepalive_ts = doc.last_keepalive_ts;
this.active = doc.state == 'ACTIVE';
@@ -39,7 +39,6 @@ import { MongoParameterCompactor } from './MongoParameterCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';


export interface MongoSyncBucketStorageOptions {
checksumOptions?: MongoChecksumOptions;
}
@@ -648,11 +647,11 @@ export class MongoSyncBucketStorage
},
{
$set: {
last_fatal_error: message
last_fatal_error: message,
last_fatal_error_ts: new Date()
}
}
);
await this.db.notifyCheckpoint();
}

async compact(options?: storage.CompactOptions) {
@@ -351,12 +351,21 @@ export class PersistedBatch {
}
}

const stats = {
bucketDataCount: this.bucketData.length,
parameterDataCount: this.bucketParameters.length,
currentDataCount: this.currentData.length,
flushedAny: flushedSomething
};

this.bucketData = [];
this.bucketParameters = [];
this.currentData = [];
this.bucketStates.clear();
this.currentSize = 0;
this.debugLastOpId = null;

return stats;
}

private getBucketStateUpdates(): mongo.AnyBulkWriteOperation<BucketStateDocument>[] {
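
The ordering in `flush()` above matters: the counters must be captured before the buffers are cleared, or the stats would always report zero. A condensed sketch of that capture-then-reset pattern (the class and fields here are illustrative, not the real `PersistedBatch`):

```ts
// Illustrative capture-then-reset pattern; not the real PersistedBatch.
class BufferedBatch {
  private bucketData: unknown[] = [];
  private bucketParameters: unknown[] = [];
  private currentData: unknown[] = [];

  flush() {
    // Snapshot the counts first: the buffers are emptied below, so
    // computing the stats afterwards would always yield zeros.
    const stats = {
      bucketDataCount: this.bucketData.length,
      parameterDataCount: this.bucketParameters.length,
      currentDataCount: this.currentData.length,
      flushedAny:
        this.bucketData.length > 0 || this.bucketParameters.length > 0 || this.currentData.length > 0
    };

    this.bucketData = [];
    this.bucketParameters = [];
    this.currentData = [];

    return stats;
  }
}
```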
@@ -196,6 +196,8 @@ export interface SyncRuleDocument {
*/
last_fatal_error: string | null;

last_fatal_error_ts: Date | null;

content: string;

lock?: {
@@ -77,6 +77,7 @@ export class PostgresBucketBatch
private lastWaitingLogThrottled = 0;
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
private needsActivation = true;
private clearedError = false;

constructor(protected options: PostgresBucketBatchOptions) {
super();
@@ -213,7 +214,10 @@ export class PostgresBucketBatch
});
}
}
await persistedBatch.flush(db);
const { flushedAny } = await persistedBatch.flush(db);
if (flushedAny) {
await this.clearError(db);
}
});
}
if (processedCount == 0) {
@@ -570,6 +574,7 @@

// If set, we need to start a new transaction with this batch.
let resumeBatch: OperationBatch | null = null;
let didFlush = false;

// Now batch according to the sizes
// This is a single batch if storeCurrentData == false
@@ -654,7 +659,8 @@
}

if (persistedBatch!.shouldFlushTransaction()) {
await persistedBatch!.flush(db);
const { flushedAny } = await persistedBatch!.flush(db);
didFlush ||= flushedAny;
// The operations stored in this batch will be processed in the `resumeBatch`
persistedBatch = null;
// Return the remaining entries for the next resume transaction
@@ -667,10 +673,15 @@
* If we reach this point, the remaining operations were smaller than the max size. Flush now.
* `persistedBatch` will be `null` if the operations should be flushed in a new transaction.
*/
await persistedBatch.flush(db);
const { flushedAny } = await persistedBatch.flush(db);
didFlush ||= flushedAny;
}
}

if (didFlush) {
await this.clearError(db);
}

// Don't return empty batches
if (resumeBatch?.batch.length) {
return resumeBatch;
@@ -1006,6 +1017,24 @@
}
}

protected async clearError(
db: lib_postgres.AbstractPostgresConnection | lib_postgres.DatabaseClient = this.db
): Promise<void> {
// No need to clear an error more than once per batch, since an error would always result in restarting the batch.
if (this.clearedError) {
return;
}

await db.sql`
UPDATE sync_rules
SET
last_fatal_error = ${{ type: 'varchar', value: null }}
WHERE
id = ${{ type: 'int4', value: this.group_id }}
`.execute();
this.clearedError = true;
}

private async getLastOpIdSequence(db: lib_postgres.AbstractPostgresConnection) {
// When no op_id has been generated, last_value = 1 and nextval() will be 1.
// To cater for this case, we check is_called, and default to 0 if no value has been generated.
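
The `${{ type: 'varchar', value: null }}` interpolation in `PostgresBucketBatch.clearError` above comes from the project's tagged-template SQL helper. As a rough illustration of the idea only (this is not the real `lib_postgres` API), such a tag can translate interpolations into numbered placeholders plus an ordered parameter list:

```ts
// Toy illustration of a typed-parameter SQL tagged template; not the real
// lib_postgres API.
interface TypedParam {
  type: string;
  value: unknown;
}

function sql(strings: TemplateStringsArray, ...params: TypedParam[]) {
  // Interpolations become $1, $2, ... so values travel out-of-band as
  // bind parameters instead of being spliced into the SQL text.
  const text = strings.reduce(
    (acc, part, i) => acc + part + (i < params.length ? `$${i + 1}` : ''),
    ''
  );
  return {
    text,
    values: params.map((p) => p.value),
    types: params.map((p) => p.type)
  };
}

const groupId = 1;
const query = sql`
  UPDATE sync_rules
  SET last_fatal_error = ${{ type: 'varchar', value: null }}
  WHERE id = ${{ type: 'int4', value: groupId }}
`;
// query.text contains "$1" and "$2"; query.values is [null, 1].
```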
@@ -236,6 +236,13 @@ export class PostgresPersistedBatch {
}

async flush(db: lib_postgres.WrappedConnection) {
const stats = {
bucketDataCount: this.bucketDataInserts.length,
parameterDataCount: this.parameterDataInserts.length,
currentDataCount: this.currentDataInserts.size + this.currentDataDeletes.length
};
const flushedAny = stats.bucketDataCount > 0 || stats.parameterDataCount > 0 || stats.currentDataCount > 0;

logger.info(
`powersync_${this.group_id} Flushed ${this.bucketDataInserts.length} + ${this.parameterDataInserts.length} + ${
this.currentDataInserts.size + this.currentDataDeletes.length
@@ -251,6 +258,11 @@
this.currentDataDeletes = [];
this.currentDataInserts = new Map();
this.currentSize = 0;

return {
...stats,
flushedAny
};
}

protected async flushBucketData(db: lib_postgres.WrappedConnection) {
31 changes: 20 additions & 11 deletions packages/service-core/src/api/diagnostics.ts
@@ -1,6 +1,6 @@
import { logger } from '@powersync/lib-services-framework';
import { DEFAULT_TAG, SourceTableInterface, SqlSyncRules } from '@powersync/service-sync-rules';
import { SyncRulesStatus, TableInfo } from '@powersync/service-types';
import { ReplicationError, SyncRulesStatus, TableInfo } from '@powersync/service-types';

import * as storage from '../storage/storage-index.js';
import { RouteAPI } from './RouteAPI.js';
@@ -39,6 +39,7 @@
const include_content = options.include_content ?? false;
const live_status = options.live_status ?? false;
const check_connection = options.check_connection ?? false;
const now = new Date().toISOString();

let rules: SqlSyncRules;
let persisted: storage.PersistedSyncRules;
@@ -49,7 +50,7 @@
return {
content: include_content ? sync_rules.sync_rules_content : undefined,
connections: [],
errors: [{ level: 'fatal', message: e.message }]
errors: [{ level: 'fatal', message: e.message, ts: now }]
};
}

Expand Down Expand Up @@ -99,7 +100,7 @@ export async function getSyncRulesStatus(
data_queries: false,
parameter_queries: false,
replication_id: [],
errors: [{ level: 'fatal', message: 'connection failed' }]
errors: [{ level: 'fatal', message: 'connection failed', ts: now }]
};
} else {
const source: SourceTableInterface = {
@@ -115,21 +116,26 @@
data_queries: syncData,
parameter_queries: syncParameters,
replication_id: [],
errors: [{ level: 'fatal', message: 'connection failed' }]
errors: [{ level: 'fatal', message: 'connection failed', ts: now }]
};
}
});
}

const errors = tables_flat.flatMap((info) => info.errors);
if (sync_rules.last_fatal_error) {
errors.push({ level: 'fatal', message: sync_rules.last_fatal_error });
errors.push({
level: 'fatal',
message: sync_rules.last_fatal_error,
ts: sync_rules.last_fatal_error_ts?.toISOString()
});
}
errors.push(
...rules.errors.map((e) => {
return {
level: e.type,
message: e.message
message: e.message,
ts: now
};
})
);
@@ -140,7 +146,8 @@
if (sync_rules.last_checkpoint_ts == null && sync_rules.last_keepalive_ts == null) {
errors.push({
level: 'warning',
message: 'No checkpoint found, cannot calculate replication lag'
message: 'No checkpoint found, cannot calculate replication lag',
ts: now
});
} else {
const lastTime = Math.max(
@@ -155,12 +162,14 @@
if (lagSeconds > 15 * 60) {
errors.push({
level: 'fatal',
message: `No replicated commit in more than ${lagSeconds}s`
message: `No replicated commit in more than ${lagSeconds}s`,
ts: now
});
} else if (lagSeconds > 5 * 60) {
errors.push({
level: 'warning',
message: `No replicated commit in more than ${lagSeconds}s`
message: `No replicated commit in more than ${lagSeconds}s`,
ts: now
});
}
}
@@ -186,9 +195,9 @@
};
}

function deduplicate(errors: { level: 'warning' | 'fatal'; message: string }[]) {
function deduplicate(errors: ReplicationError[]): ReplicationError[] {
let seen = new Set<string>();
let result: { level: 'warning' | 'fatal'; message: string }[] = [];
let result: ReplicationError[] = [];
for (let error of errors) {
const key = JSON.stringify(error);
if (seen.has(key)) {
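
Since `deduplicate` keys on the full JSON encoding of each error, two errors that differ only in `ts` count as distinct; within a single `getSyncRulesStatus` call this is harmless because every synthesized error shares the same `now` timestamp. A quick usage sketch, with the `ReplicationError` shape assumed from this diff:

```ts
// Usage sketch; the ReplicationError shape is assumed from this diff.
type ReplicationError = { level: 'warning' | 'fatal'; message: string; ts?: string };

function deduplicate(errors: ReplicationError[]): ReplicationError[] {
  const seen = new Set<string>();
  const result: ReplicationError[] = [];
  for (const error of errors) {
    // The full JSON encoding is the dedup key, so level, message and ts
    // must all match for two entries to collapse into one.
    const key = JSON.stringify(error);
    if (seen.has(key)) {
      continue;
    }
    seen.add(key);
    result.push(error);
  }
  return result;
}

const now = '2024-01-01T00:00:00.000Z';
const errors: ReplicationError[] = [
  { level: 'fatal', message: 'connection failed', ts: now },
  { level: 'fatal', message: 'connection failed', ts: now },
  { level: 'warning', message: 'No checkpoint found, cannot calculate replication lag', ts: now }
];
console.log(deduplicate(errors).length); // 2
```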