Merged
23 commits
b1344e7 cleanup persisted queries (stevensJourney, Jan 20, 2025)
7cea06b minor cleanup (stevensJourney, Jan 22, 2025)
5aa55e9 Add support for same postgres cluster for replication and sync storage (stevensJourney, Jan 28, 2025)
c95608d Merge remote-tracking branch 'origin/main' into pg-storage-2 (stevensJourney, Jan 28, 2025)
b3894c9 add error code (stevensJourney, Jan 28, 2025)
37e9228 add error cause (stevensJourney, Jan 28, 2025)
e63a4e1 update to use replica set name (stevensJourney, Jan 28, 2025)
4f66e95 add unit test for streaming errors (stevensJourney, Jan 29, 2025)
8cda6ff throw an error on initial replication if storage is incompatible (stevensJourney, Jan 29, 2025)
cc27649 update readme (stevensJourney, Jan 29, 2025)
bfe54fb fix test (stevensJourney, Jan 29, 2025)
e9c5f3d code cleanup (stevensJourney, Jan 29, 2025)
aad0fa1 wording update (stevensJourney, Jan 29, 2025)
aae2ae3 Merge remote-tracking branch 'origin/main' into pg-storage-2 (stevensJourney, Jan 29, 2025)
031d7ac cleanup (stevensJourney, Jan 29, 2025)
c7329bb added changeset (stevensJourney, Jan 29, 2025)
ea5c3f8 fix notification bug (stevensJourney, Jan 30, 2025)
4c41510 cleanup keepAlive variables (stevensJourney, Jan 30, 2025)
5d6da80 allow using the same database for replication source and storage (stevensJourney, Jan 30, 2025)
a65f03c cleanup comments (stevensJourney, Jan 30, 2025)
4c9b61f Merge remote-tracking branch 'origin/main' into pg-storage-2 (stevensJourney, Jan 30, 2025)
f076c44 acknowledge more often when out of tx (stevensJourney, Jan 30, 2025)
834b990 share keepalive statement for write checkpoints (stevensJourney, Jan 30, 2025)
6 changes: 6 additions & 0 deletions .changeset/pretty-countries-cover.md
@@ -0,0 +1,6 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-postgres': minor
---

Allowed using the same Postgres server for the replication source and sync bucket storage. This is only supported on Postgres versions 14 and above.
7 changes: 7 additions & 0 deletions .changeset/smart-chairs-smoke.md
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core': minor
---

Added the ability to skip creating empty sync checkpoints if no changes were present in a batch.
5 changes: 5 additions & 0 deletions .changeset/strong-tables-rescue.md
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres-storage': patch
---

Fix bug where listening to active checkpoint notifications on an ended connection could cause a crash.
27 changes: 6 additions & 21 deletions libs/lib-postgres/src/db/connection/ConnectionSlot.ts
@@ -50,9 +50,12 @@ export class ConnectionSlot extends framework.DisposableObserver<ConnectionSlotL
const connection = await this.connectingPromise;
this.connectingPromise = null;
await this.iterateAsyncListeners(async (l) => l.connectionCreated?.(connection));
if (this.hasNotificationListener()) {
await this.configureConnectionNotifications(connection);
}

/**
* Configure the Postgres connection to listen to notifications.
* Subscribing to notifications, even without a registered listener, should not add much overhead.
*/
await this.configureConnectionNotifications(connection);
return connection;
}

@@ -64,11 +67,6 @@ export class ConnectionSlot extends framework.DisposableObserver<ConnectionSlotL
}

protected async configureConnectionNotifications(connection: pgwire.PgConnection) {
if (connection.onnotification == this.handleNotification || this.closed == true) {
// Already configured
return;
}

connection.onnotification = this.handleNotification;

for (const channelName of this.options.notificationChannels ?? []) {
@@ -78,19 +76,6 @@ export class ConnectionSlot extends framework.DisposableObserver<ConnectionSlotL
}
}

registerListener(listener: Partial<ConnectionSlotListener>): () => void {
const dispose = super.registerListener(listener);
if (this.connection && this.hasNotificationListener()) {
this.configureConnectionNotifications(this.connection);
}
return () => {
dispose();
if (this.connection && !this.hasNotificationListener()) {
this.connection.onnotification = () => {};
}
};
}

protected handleNotification = (payload: pgwire.PgNotification) => {
if (!this.options.notificationChannels?.includes(payload.channel)) {
return;
9 changes: 6 additions & 3 deletions libs/lib-postgres/src/db/connection/DatabaseClient.ts
@@ -32,7 +32,8 @@ export const TRANSACTION_CONNECTION_COUNT = 5;
export class DatabaseClient extends AbstractPostgresConnection<DatabaseClientListener> {
closed: boolean;

protected pool: pgwire.PgClient;
pool: pgwire.PgClient;

protected connections: ConnectionSlot[];

protected initialized: Promise<void>;
@@ -42,8 +43,10 @@ export class DatabaseClient extends AbstractPostgresConnection<DatabaseClientLis
super();
this.closed = false;
this.pool = pgwire.connectPgWirePool(options.config);
this.connections = Array.from({ length: TRANSACTION_CONNECTION_COUNT }, () => {
const slot = new ConnectionSlot({ config: options.config, notificationChannels: options.notificationChannels });
this.connections = Array.from({ length: TRANSACTION_CONNECTION_COUNT }, (v, index) => {
// Only listen to notifications on a single (the first) connection
const notificationChannels = index == 0 ? options.notificationChannels : [];
const slot = new ConnectionSlot({ config: options.config, notificationChannels });
slot.registerListener({
connectionAvailable: () => this.processConnectionQueue(),
connectionError: (ex) => this.handleConnectionError(ex),
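
Read together, the two changes above move notification handling onto a single dedicated slot. Below is a minimal sketch of that pattern; `MiniSlot` and `SlotOptions` are hypothetical stand-ins for the real `ConnectionSlot` types, and only the channel-routing logic mirrors the diff.

```ts
// Simplified stand-ins for ConnectionSlot and its options.
interface SlotOptions {
  config: { connectionString: string };
  notificationChannels?: string[];
}

class MiniSlot {
  constructor(readonly options: SlotOptions) {}
}

const TRANSACTION_CONNECTION_COUNT = 5;

function createSlots(config: SlotOptions['config'], channels: string[] = []): MiniSlot[] {
  return Array.from({ length: TRANSACTION_CONNECTION_COUNT }, (_value, index) => {
    // Only the first slot subscribes to LISTEN/NOTIFY channels; the rest
    // serve plain transactions, so each notification is delivered once
    // rather than once per pooled connection.
    const notificationChannels = index == 0 ? channels : [];
    return new MiniSlot({ config, notificationChannels });
  });
}
```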
38 changes: 38 additions & 0 deletions libs/lib-postgres/src/utils/identifier-utils.ts
@@ -0,0 +1,38 @@
import * as pgwire from '@powersync/service-jpgwire';
import { retriedQuery } from './pgwire_utils.js';

export interface DecodedPostgresIdentifier {
server_id: string;
database_name: string;
}

export const decodePostgresSystemIdentifier = (identifier: string): DecodedPostgresIdentifier => {
const [server_id, database_name] = identifier.split('.');
return { server_id, database_name };
};

export const encodePostgresSystemIdentifier = (decoded: DecodedPostgresIdentifier): string => {
return `${decoded.server_id}.${decoded.database_name}`;
};

export const queryPostgresSystemIdentifier = async (
connection: pgwire.PgClient
): Promise<DecodedPostgresIdentifier> => {
const result = pgwire.pgwireRows(
await retriedQuery(
connection,
/* sql */ `
SELECT
current_database() AS database_name,
system_identifier
FROM
pg_control_system();
`
)
) as Array<{ database_name: string; system_identifier: bigint }>;

return {
database_name: result[0].database_name,
server_id: result[0].system_identifier.toString()
};
};
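
A possible usage of these helpers (not part of the diff): compare the identifiers of two connections to decide whether they point at the same cluster and database. `isSameDatabase` is a hypothetical name; the imported functions are the ones defined above.

```ts
import * as pgwire from '@powersync/service-jpgwire';
import {
  encodePostgresSystemIdentifier,
  queryPostgresSystemIdentifier
} from './identifier-utils.js';

// Hypothetical helper: true when both connections resolve to the same
// Postgres cluster (system_identifier) and the same database.
async function isSameDatabase(a: pgwire.PgClient, b: pgwire.PgClient): Promise<boolean> {
  const [idA, idB] = await Promise.all([
    queryPostgresSystemIdentifier(a),
    queryPostgresSystemIdentifier(b)
  ]);
  // The encoded form is `${server_id}.${database_name}`, so string equality
  // compares both the cluster and the database in one step.
  return encodePostgresSystemIdentifier(idA) == encodePostgresSystemIdentifier(idB);
}
```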
1 change: 1 addition & 0 deletions libs/lib-postgres/src/utils/utils-index.ts
@@ -1 +1,2 @@
export * from './identifier-utils.js';
export * from './pgwire_utils.js';
17 changes: 17 additions & 0 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
@@ -84,6 +84,23 @@ export class MongoBucketStorage
return storage;
}

async getSystemIdentifier(): Promise<storage.BucketStorageSystemIdentifier> {
const { setName: id } = await this.db.db.command({
hello: 1
});
if (id == null) {
throw new ServiceError(
ErrorCode.PSYNC_S1342,
'Standalone MongoDB instances are not supported - use a replicaset.'
);
}

return {
id,
type: lib_mongo.MONGO_CONNECTION_TYPE
};
}

async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) {
const next = await this.getNextSyncRulesContent();
const active = await this.getActiveSyncRulesContent();
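
One way a replicator could consume `getSystemIdentifier()` is sketched below, under the assumption that compatibility is decided by comparing identifiers; the PR's actual check is not shown in this diff, and `sharesCluster` is a hypothetical helper.

```ts
import { storage } from '@powersync/service-core';

// Hypothetical caller-side check: the storage factory and the replication
// source share a cluster when both the connection type and the identifier
// match (replica set name for MongoDB, server id + database for Postgres).
async function sharesCluster(
  factory: { getSystemIdentifier(): Promise<storage.BucketStorageSystemIdentifier> },
  sourceIdentifier: storage.BucketStorageSystemIdentifier
): Promise<boolean> {
  const storageIdentifier = await factory.getSystemIdentifier();
  return (
    storageIdentifier.type == sourceIdentifier.type &&
    storageIdentifier.id == sourceIdentifier.id
  );
}
```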
@@ -9,7 +9,6 @@ import {
errors,
logger,
ReplicationAssertionError,
ServiceAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import { SaveOperationTag, storage, utils } from '@powersync/service-core';
@@ -616,7 +615,9 @@ export class MongoBucketBatch

private lastWaitingLogThottled = 0;

async commit(lsn: string): Promise<boolean> {
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<boolean> {
const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options };

await this.flush();

if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
@@ -654,6 +655,10 @@
return false;
}

if (!createEmptyCheckpoints && this.persisted_op == null) {
return false;
}

const now = new Date();
const update: Partial<SyncRuleDocument> = {
last_checkpoint_lsn: lsn,
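
Caller-side, the new option would be used roughly as follows. This is a sketch; the structural type stands in for `MongoBucketBatch` (or `PostgresBucketBatch`), and `acknowledge` is a hypothetical name.

```ts
// Acknowledge replication progress frequently, but only create a checkpoint
// document when the batch actually persisted operations.
type CommittableBatch = {
  commit(lsn: string, options?: { createEmptyCheckpoints?: boolean }): Promise<boolean>;
};

async function acknowledge(batch: CommittableBatch, lsn: string): Promise<boolean> {
  // With createEmptyCheckpoints: false, commit() returns false instead of
  // writing an empty checkpoint when no operations were persisted.
  return batch.commit(lsn, { createEmptyCheckpoints: false });
}
```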
8 changes: 7 additions & 1 deletion modules/module-postgres-storage/README.md
@@ -31,7 +31,13 @@ storage:
```

**IMPORTANT**:
A separate Postgres server is currently required for replication connections (if using Postgres for replication) and storage. Using the same server might cause unexpected results.

Separate Postgres servers are required for replication connections **if using PostgreSQL versions below 14**.

| PostgreSQL Version | Server configuration |
| ------------------ | -------------------- |
| Below 14 | Separate servers are required for the source and sync bucket storage. Replication will be blocked if the same server is detected. |
| 14 and above | The source database and sync bucket storage database can be on the same server. Using the same database (with separate schemas) is supported but may lead to higher CPU usage. Using separate servers remains an option. |
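
A sketch of how the version gate in this table could be enforced; the query, the `pgwire.PgClient#query` call, and the error text are assumptions, with `server_version_num` being 140000 for Postgres 14.0.

```ts
import * as pgwire from '@powersync/service-jpgwire';

// Hypothetical guard: reject shared-server setups on Postgres < 14.
async function assertSharedServerSupported(connection: pgwire.PgClient): Promise<void> {
  const rows = pgwire.pgwireRows(
    await connection.query(/* sql */ `SELECT current_setting('server_version_num')::int4 AS version`)
  ) as Array<{ version: number }>;
  if (rows[0].version < 140000) {
    throw new Error(
      'Separate Postgres servers are required for the replication source and sync bucket storage on versions below 14.'
    );
  }
}
```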

### Connection credentials

@@ -157,6 +157,17 @@ export class PostgresBucketStorageFactory
return newInstanceRow!.id;
}

async getSystemIdentifier(): Promise<storage.BucketStorageSystemIdentifier> {
const id = lib_postgres.utils.encodePostgresSystemIdentifier(
await lib_postgres.utils.queryPostgresSystemIdentifier(this.db.pool)
);

return {
id,
type: lib_postgres.POSTGRES_CONNECTION_TYPE
};
}

// TODO possibly share implementation in abstract class
async configureSyncRules(
sync_rules: string,
@@ -89,7 +89,7 @@ export class PostgresBucketBatch

async save(record: storage.SaveOptions): Promise<storage.FlushedResult | null> {
// TODO maybe share with abstract class
const { after, afterReplicaId, before, beforeReplicaId, sourceTable, tag } = record;
const { after, before, sourceTable, tag } = record;
for (const event of this.getTableEvents(sourceTable)) {
this.iterateListeners((cb) =>
cb.replicationEvent?.({
@@ -245,7 +245,10 @@

private async flushInner(): Promise<storage.FlushedResult | null> {
const batch = this.batch;
if (batch == null) {
// Don't flush empty batches
// This helps prevent feedback loops when using the same database for
// the source data and sync bucket storage
if (batch == null || batch.length == 0) {
return null;
}

@@ -275,7 +278,9 @@
return { flushed_op: String(lastOp) };
}

async commit(lsn: string): Promise<boolean> {
async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise<boolean> {
const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options };

await this.flush();

if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) {
@@ -309,6 +314,12 @@

return false;
}

// Don't create a checkpoint if there were no changes
if (!createEmptyCheckpoints && this.persisted_op == null) {
return false;
}

const now = new Date().toISOString();
const update: Partial<models.SyncRules> = {
last_checkpoint_lsn: lsn,
@@ -488,7 +499,7 @@
jsonb_array_elements(${{ type: 'jsonb', value: sizeLookups }}::jsonb) AS FILTER
)
SELECT
pg_column_size(c.data) AS data_size,
octet_length(c.data) AS data_size,
c.source_table,
c.source_key
FROM
@@ -529,31 +540,28 @@ export class PostgresBucketBatch
const current_data_lookup = new Map<string, CurrentDataDecoded>();
for await (const currentDataRows of db.streamRows<CurrentData>({
statement: /* sql */ `
WITH
filter_data AS (
SELECT
decode(FILTER ->> 'source_key', 'hex') AS source_key, -- Decoding from hex to bytea
(FILTER ->> 'source_table') AS source_table_id
FROM
jsonb_array_elements($1::jsonb) AS FILTER
)
SELECT
--- With skipExistingRows, we only need to know whether or not the row exists.
${this.options.skip_existing_rows ? `c.source_table, c.source_key` : 'c.*'}
FROM
current_data c
JOIN filter_data f ON c.source_table = f.source_table_id
JOIN (
SELECT
decode(FILTER ->> 'source_key', 'hex') AS source_key,
FILTER ->> 'source_table' AS source_table_id
FROM
jsonb_array_elements($1::jsonb) AS FILTER
) f ON c.source_table = f.source_table_id
AND c.source_key = f.source_key
WHERE
c.group_id = $2
c.group_id = $2;
`,
params: [
{
type: 'jsonb',
value: lookups
},
{
type: 'int8',
type: 'int4',
value: this.group_id
}
]
@@ -610,7 +618,12 @@
await persistedBatch.flush(db);
}
}
return resumeBatch;

// Don't return empty batches
if (resumeBatch?.batch.length) {
return resumeBatch;
}
return null;
}

protected async saveOperation(
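
The empty-batch guards in this file (in `flushInner` and at the end of the replace loop above) follow a single principle. A minimal sketch with simplified stand-in types:

```ts
// Simplified stand-in for the internal batch shape.
interface OpBatch {
  batch: unknown[];
}

// Skipping empty flushes avoids a feedback loop when the replication source
// and sync bucket storage share one database: storage writes would otherwise
// reappear in the replication stream and trigger further empty flushes.
function shouldFlush(batch: OpBatch | null): boolean {
  return batch != null && batch.batch.length > 0;
}

// Never hand an empty resume batch back to the caller; null means "done".
function normalizeResume(resume: OpBatch | null): OpBatch | null {
  return resume?.batch.length ? resume : null;
}
```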