Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/shiny-panthers-ring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-module-postgres': patch
'@powersync/service-sync-rules': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[Postgres] Fix custom type parsing on initial replication
2 changes: 1 addition & 1 deletion modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
connectionTag?: string,
private config?: types.ResolvedConnectionConfig
) {
this.typeCache = new PostgresTypeResolver(config?.typeRegistry ?? new CustomTypeRegistry(), pool);
this.typeCache = new PostgresTypeResolver(pool);
this.connectionTag = connectionTag ?? sync_rules.DEFAULT_TAG;
}

Expand Down
13 changes: 4 additions & 9 deletions modules/module-postgres/src/module/PostgresModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import { getApplicationName } from '../utils/application-name.js';
import { CustomTypeRegistry } from '../types/registry.js';

export class PostgresModule extends replication.ReplicationModule<types.PostgresConnectionConfig> {
private customTypes: CustomTypeRegistry = new CustomTypeRegistry();

constructor() {
super({
name: 'Postgres',
Expand Down Expand Up @@ -51,7 +49,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
protected createReplicator(context: system.ServiceContext): replication.AbstractReplicator {
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
const syncRuleProvider = new ConfigurationFileSyncRulesProvider(context.configuration.sync_rules);
const connectionFactory = new ConnectionManagerFactory(normalisedConfig, this.customTypes);
const connectionFactory = new ConnectionManagerFactory(normalisedConfig);

return new WalStreamReplicator({
id: this.getDefaultId(normalisedConfig.database),
Expand All @@ -69,8 +67,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
private resolveConfig(config: types.PostgresConnectionConfig): types.ResolvedConnectionConfig {
return {
...config,
...types.normalizeConnectionConfig(config),
typeRegistry: this.customTypes
...types.normalizeConnectionConfig(config)
};
}

Expand All @@ -79,8 +76,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
const connectionManager = new PgManager(normalisedConfig, {
idleTimeout: 30_000,
maxSize: 1,
applicationName: getApplicationName(),
registry: this.customTypes
applicationName: getApplicationName()
});

try {
Expand Down Expand Up @@ -111,8 +107,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
const connectionManager = new PgManager(normalizedConfig, {
idleTimeout: 30_000,
maxSize: 1,
applicationName: getApplicationName(),
registry: new CustomTypeRegistry()
applicationName: getApplicationName()
});
const connection = await connectionManager.snapshotConnection();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import { PgManager } from './PgManager.js';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
import { PgPoolOptions } from '@powersync/service-jpgwire';
import { logger } from '@powersync/lib-services-framework';
import { CustomTypeRegistry } from '../types/registry.js';
import { PgPoolOptions } from '@powersync/service-jpgwire';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
import { PgManager } from './PgManager.js';

export class ConnectionManagerFactory {
private readonly connectionManagers = new Set<PgManager>();
public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig;

constructor(
dbConnectionConfig: NormalizedPostgresConnectionConfig,
private readonly registry: CustomTypeRegistry
) {
constructor(dbConnectionConfig: NormalizedPostgresConnectionConfig) {
this.dbConnectionConfig = dbConnectionConfig;
}

create(poolOptions: PgPoolOptions) {
const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry });
const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions });
this.connectionManagers.add(manager);

manager.registerListener({
Expand Down
11 changes: 4 additions & 7 deletions modules/module-postgres/src/replication/PgManager.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { BaseObserver } from '@powersync/lib-services-framework';
import * as pgwire from '@powersync/service-jpgwire';
import semver from 'semver';
import { PostgresTypeResolver } from '../types/resolver.js';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
import { getApplicationName } from '../utils/application-name.js';
import { PostgresTypeResolver } from '../types/resolver.js';
import { getServerVersion } from '../utils/postgres_version.js';
import { CustomTypeRegistry } from '../types/registry.js';
import { BaseObserver } from '@powersync/lib-services-framework';

export interface PgManagerOptions extends pgwire.PgPoolOptions {
registry: CustomTypeRegistry;
}
export interface PgManagerOptions extends pgwire.PgPoolOptions {}

/**
* Shorter timeout for snapshot connections than for replication connections.
Expand Down Expand Up @@ -37,7 +34,7 @@ export class PgManager extends BaseObserver<PgManagerListener> {
super();
// The pool is lazy - no connections are opened until a query is performed.
this.pool = pgwire.connectPgWirePool(this.options, poolOptions);
this.types = new PostgresTypeResolver(poolOptions.registry, this.pool);
this.types = new PostgresTypeResolver(this.pool);
}

public get connectionTag() {
Expand Down
6 changes: 5 additions & 1 deletion modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ WHERE oid = $1::regclass`,
await q.initialize();

let columns: { i: number; name: string }[] = [];
let columnMap: Record<string, number> = {};
let hasRemainingData = true;
while (hasRemainingData) {
// Fetch 10k at a time.
Expand All @@ -565,6 +566,9 @@ WHERE oid = $1::regclass`,
columns = chunk.payload.map((c) => {
return { i: i++, name: c.name };
});
for (let column of chunk.payload) {
columnMap[column.name] = column.typeOid;
}
continue;
}

Expand All @@ -580,7 +584,7 @@ WHERE oid = $1::regclass`,
}

for (const inputRecord of WalStream.getQueryData(rows)) {
const record = this.syncRulesRecord(inputRecord);
const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord));
// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
Expand Down
33 changes: 27 additions & 6 deletions modules/module-postgres/src/types/resolver.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules';
import * as pgwire from '@powersync/service-jpgwire';
import { CustomTypeRegistry } from './registry.js';
import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules';
import semver from 'semver';
import { getServerVersion } from '../utils/postgres_version.js';
import { CustomTypeRegistry } from './registry.js';

/**
* Resolves descriptions used to decode values for custom postgres types.
Expand All @@ -11,11 +11,9 @@ import { getServerVersion } from '../utils/postgres_version.js';
*/
export class PostgresTypeResolver {
private cachedVersion: semver.SemVer | null = null;
readonly registry: CustomTypeRegistry;

constructor(
readonly registry: CustomTypeRegistry,
private readonly pool: pgwire.PgClient
) {
constructor(private readonly pool: pgwire.PgClient) {
this.registry = new CustomTypeRegistry();
}

Expand Down Expand Up @@ -188,6 +186,11 @@ WHERE a.attnum > 0
return toSyncRulesRow(record);
}

constructRowRecord(columnMap: Record<string, number>, tupleRaw: Record<string, any>): SqliteInputRow {
const record = this.decodeTupleForTable(columnMap, tupleRaw);
return toSyncRulesRow(record);
}

/**
* We need a high level of control over how values are decoded, to make sure there is no loss
* of precision in the process.
Expand All @@ -206,5 +209,23 @@ WHERE a.attnum > 0
return result;
}

/**
* We need a high level of control over how values are decoded, to make sure there is no loss
* of precision in the process.
*/
private decodeTupleForTable(columnMap: Record<string, number>, tupleRaw: Record<string, any>): DatabaseInputRow {
let result: Record<string, any> = {};
for (let columnName in tupleRaw) {
const rawval = tupleRaw[columnName];
const typeOid = columnMap[columnName];
if (typeof rawval == 'string' && typeOid) {
result[columnName] = this.registry.decodeDatabaseValue(rawval, typeOid);
} else {
result[columnName] = rawval;
}
}
return result;
}

private static minVersionForMultirange: semver.SemVer = semver.parse('14.0.0')!;
}
6 changes: 1 addition & 5 deletions modules/module-postgres/src/types/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as lib_postgres from '@powersync/lib-service-postgres';
import * as service_types from '@powersync/service-types';
import * as t from 'ts-codec';
import { CustomTypeRegistry } from './registry.js';

// Maintain backwards compatibility by exporting these
export const validatePort = lib_postgres.validatePort;
Expand All @@ -25,10 +24,7 @@ export type PostgresConnectionConfig = t.Decoded<typeof PostgresConnectionConfig
/**
* Resolved version of {@link PostgresConnectionConfig}
*/
export type ResolvedConnectionConfig = PostgresConnectionConfig &
NormalizedPostgresConnectionConfig & {
typeRegistry: CustomTypeRegistry;
};
export type ResolvedConnectionConfig = PostgresConnectionConfig & NormalizedPostgresConnectionConfig;

export function isPostgresConfig(
config: service_types.configFile.DataSourceConfig
Expand Down
4 changes: 2 additions & 2 deletions modules/module-postgres/test/src/pg_test.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
test('test replication - multiranges', async () => {
const db = await connectPgPool();

if (!(await new PostgresTypeResolver(new CustomTypeRegistry(), db).supportsMultiRanges())) {
if (!(await new PostgresTypeResolver(db).supportsMultiRanges())) {
// This test requires Postgres 14 or later.
return;
}
Expand Down Expand Up @@ -620,7 +620,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
* Return all the inserts from the first transaction in the replication stream.
*/
async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.ReplicationStream) {
const typeCache = new PostgresTypeResolver(new CustomTypeRegistry(), db);
const typeCache = new PostgresTypeResolver(db);
await typeCache.fetchTypesForSchema();

let transformed: SqliteInputRow[] = [];
Expand Down
4 changes: 2 additions & 2 deletions modules/module-postgres/test/src/slow_tests.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ function defineSlowTests(factory: storage.TestStorageFactory) {
});

async function testRepeatedReplication(testOptions: { compact: boolean; maxBatchSize: number; numBatches: number }) {
const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
const replicationConnection = await connections.replicationConnection();
const pool = connections.pool;
await clearTestDb(pool);
Expand Down Expand Up @@ -330,7 +330,7 @@ bucket_definitions:
await pool.query(`SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE active = FALSE`);
i += 1;

const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
const replicationConnection = await connections.replicationConnection();

abortController = new AbortController();
Expand Down
17 changes: 14 additions & 3 deletions modules/module-postgres/test/src/wal_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,13 +529,24 @@ config:
const { pool } = context;
await pool.query(`DROP TABLE IF EXISTS test_data`);
await pool.query(`CREATE TYPE composite AS (foo bool, bar int4);`);
await pool.query(`CREATE TABLE test_data(id text primary key, description composite);`);
await pool.query(`CREATE TABLE test_data(id text primary key, description composite, ts timestamptz);`);

// Covered by initial replication
await pool.query(
`INSERT INTO test_data(id, description, ts) VALUES ('t1', ROW(TRUE, 1)::composite, '2025-11-17T09:11:00Z')`
);

await context.initializeReplication();
await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', ROW(TRUE, 2)::composite)`);
// Covered by streaming replication
await pool.query(
`INSERT INTO test_data(id, description, ts) VALUES ('t2', ROW(TRUE, 2)::composite, '2025-11-17T09:12:00Z')`
);

const data = await context.getBucketData('1#stream|0[]');
expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '{"foo":1,"bar":2}' })]);
expect(data).toMatchObject([
putOp('test_data', { id: 't1', description: '{"foo":1,"bar":1}', ts: '2025-11-17T09:11:00.000000Z' }),
putOp('test_data', { id: 't2', description: '{"foo":1,"bar":2}', ts: '2025-11-17T09:12:00.000000Z' })
]);
});

test('custom types in primary key', async () => {
Expand Down
2 changes: 1 addition & 1 deletion modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class WalStreamTestContext implements AsyncDisposable {
options?: { doNotClear?: boolean; walStreamOptions?: Partial<WalStreamOptions> }
) {
const f = await factory({ doNotClear: options?.doNotClear });
const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {});

if (!options?.doNotClear) {
await clearTestDb(connectionManager.pool);
Expand Down
11 changes: 5 additions & 6 deletions packages/sync-rules/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { JSONBig, JsonContainer, Replacer, stringifyRaw } from '@powersync/service-jsonbig';
import { SelectFromStatement, Statement } from 'pgsql-ast-parser';
import { CompatibilityContext } from './compatibility.js';
import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js';
import { SQLITE_FALSE, SQLITE_TRUE } from './sql_support.js';
import {
BucketIdTransformer,
DatabaseInputRow,
DatabaseInputValue,
SqliteInputValue,
SqliteInputRow,
SqliteInputValue,
SqliteJsonRow,
SqliteJsonValue,
SqliteRow,
SqliteValue,
BucketIdTransformer,
ToastableSqliteRow
SqliteValue
} from './types.js';
import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js';
import { CustomArray, CustomObject, CustomSqliteValue } from './types/custom_sqlite_value.js';
import { CompatibilityContext } from './compatibility.js';

export function isSelectStatement(q: Statement): q is SelectFromStatement {
return q.type == 'select';
Expand Down