diff --git a/.changeset/shiny-panthers-ring.md b/.changeset/shiny-panthers-ring.md new file mode 100644 index 000000000..ce15592f9 --- /dev/null +++ b/.changeset/shiny-panthers-ring.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-postgres': patch +'@powersync/service-sync-rules': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +[Postgres] Fix custom type parsing on initial replication diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index c40eddc7a..6275676ba 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -34,7 +34,7 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI { connectionTag?: string, private config?: types.ResolvedConnectionConfig ) { - this.typeCache = new PostgresTypeResolver(config?.typeRegistry ?? new CustomTypeRegistry(), pool); + this.typeCache = new PostgresTypeResolver(pool); this.connectionTag = connectionTag ?? sync_rules.DEFAULT_TAG; } diff --git a/modules/module-postgres/src/module/PostgresModule.ts b/modules/module-postgres/src/module/PostgresModule.ts index 48aefd9f5..6065dacc8 100644 --- a/modules/module-postgres/src/module/PostgresModule.ts +++ b/modules/module-postgres/src/module/PostgresModule.ts @@ -22,8 +22,6 @@ import { getApplicationName } from '../utils/application-name.js'; import { CustomTypeRegistry } from '../types/registry.js'; export class PostgresModule extends replication.ReplicationModule { - private customTypes: CustomTypeRegistry = new CustomTypeRegistry(); - constructor() { super({ name: 'Postgres', @@ -51,7 +49,7 @@ export class PostgresModule extends replication.ReplicationModule(); public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig; - constructor( - dbConnectionConfig: NormalizedPostgresConnectionConfig, - private readonly registry: CustomTypeRegistry - ) { + constructor(dbConnectionConfig: NormalizedPostgresConnectionConfig) { this.dbConnectionConfig = dbConnectionConfig; } create(poolOptions: PgPoolOptions) { - const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry }); + const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions }); this.connectionManagers.add(manager); manager.registerListener({ diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts index 8bcd7ab4f..79d820092 100644 --- a/modules/module-postgres/src/replication/PgManager.ts +++ b/modules/module-postgres/src/replication/PgManager.ts @@ -1,15 +1,12 @@ +import { BaseObserver } from '@powersync/lib-services-framework'; import * as pgwire from '@powersync/service-jpgwire'; import semver from 'semver'; +import { PostgresTypeResolver } from '../types/resolver.js'; import { NormalizedPostgresConnectionConfig } from '../types/types.js'; import { getApplicationName } from '../utils/application-name.js'; -import { PostgresTypeResolver } from '../types/resolver.js'; import { getServerVersion } from '../utils/postgres_version.js'; -import { CustomTypeRegistry } from '../types/registry.js'; -import { BaseObserver } from '@powersync/lib-services-framework'; -export interface PgManagerOptions extends pgwire.PgPoolOptions { - registry: CustomTypeRegistry; -} +export interface PgManagerOptions extends pgwire.PgPoolOptions {} /** * Shorter timeout for snapshot connections than for replication connections. @@ -37,7 +34,7 @@ export class PgManager extends BaseObserver { super(); // The pool is lazy - no connections are opened until a query is performed. this.pool = pgwire.connectPgWirePool(this.options, poolOptions); - this.types = new PostgresTypeResolver(poolOptions.registry, this.pool); + this.types = new PostgresTypeResolver(this.pool); } public get connectionTag() { diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 5dca68d91..a68cf820d 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -546,6 +546,7 @@ WHERE oid = $1::regclass`, await q.initialize(); let columns: { i: number; name: string }[] = []; + let columnMap: Record = {}; let hasRemainingData = true; while (hasRemainingData) { // Fetch 10k at a time. @@ -565,6 +566,9 @@ WHERE oid = $1::regclass`, columns = chunk.payload.map((c) => { return { i: i++, name: c.name }; }); + for (let column of chunk.payload) { + columnMap[column.name] = column.typeOid; + } continue; } @@ -580,7 +584,7 @@ WHERE oid = $1::regclass`, } for (const inputRecord of WalStream.getQueryData(rows)) { - const record = this.syncRulesRecord(inputRecord); + const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord)); // This auto-flushes when the batch reaches its size limit await batch.save({ tag: storage.SaveOperationTag.INSERT, diff --git a/modules/module-postgres/src/types/resolver.ts b/modules/module-postgres/src/types/resolver.ts index 694b53537..eb8ba59e8 100644 --- a/modules/module-postgres/src/types/resolver.ts +++ b/modules/module-postgres/src/types/resolver.ts @@ -1,8 +1,8 @@ -import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules'; import * as pgwire from '@powersync/service-jpgwire'; -import { CustomTypeRegistry } from './registry.js'; +import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules'; import semver from 'semver'; import { getServerVersion } from '../utils/postgres_version.js'; +import { CustomTypeRegistry } from './registry.js'; /** * Resolves descriptions used to decode values for custom postgres types. @@ -11,11 +11,9 @@ import { getServerVersion } from '../utils/postgres_version.js'; */ export class PostgresTypeResolver { private cachedVersion: semver.SemVer | null = null; + readonly registry: CustomTypeRegistry; - constructor( - readonly registry: CustomTypeRegistry, - private readonly pool: pgwire.PgClient - ) { + constructor(private readonly pool: pgwire.PgClient) { this.registry = new CustomTypeRegistry(); } @@ -188,6 +186,11 @@ WHERE a.attnum > 0 return toSyncRulesRow(record); } + constructRowRecord(columnMap: Record, tupleRaw: Record): SqliteInputRow { + const record = this.decodeTupleForTable(columnMap, tupleRaw); + return toSyncRulesRow(record); + } + /** * We need a high level of control over how values are decoded, to make sure there is no loss * of precision in the process. @@ -206,5 +209,23 @@ WHERE a.attnum > 0 return result; } + /** + * We need a high level of control over how values are decoded, to make sure there is no loss + * of precision in the process. + */ + private decodeTupleForTable(columnMap: Record, tupleRaw: Record): DatabaseInputRow { + let result: Record = {}; + for (let columnName in tupleRaw) { + const rawval = tupleRaw[columnName]; + const typeOid = columnMap[columnName]; + if (typeof rawval == 'string' && typeOid) { + result[columnName] = this.registry.decodeDatabaseValue(rawval, typeOid); + } else { + result[columnName] = rawval; + } + } + return result; + } + private static minVersionForMultirange: semver.SemVer = semver.parse('14.0.0')!; } diff --git a/modules/module-postgres/src/types/types.ts b/modules/module-postgres/src/types/types.ts index 3fd3c9fbb..4de2ac0ed 100644 --- a/modules/module-postgres/src/types/types.ts +++ b/modules/module-postgres/src/types/types.ts @@ -1,7 +1,6 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import * as service_types from '@powersync/service-types'; import * as t from 'ts-codec'; -import { CustomTypeRegistry } from './registry.js'; // Maintain backwards compatibility by exporting these export const validatePort = lib_postgres.validatePort; @@ -25,10 +24,7 @@ export type PostgresConnectionConfig = t.Decoded { const db = await connectPgPool(); - if (!(await new PostgresTypeResolver(new CustomTypeRegistry(), db).supportsMultiRanges())) { + if (!(await new PostgresTypeResolver(db).supportsMultiRanges())) { // This test requires Postgres 14 or later. return; } @@ -620,7 +620,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12' * Return all the inserts from the first transaction in the replication stream. */ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.ReplicationStream) { - const typeCache = new PostgresTypeResolver(new CustomTypeRegistry(), db); + const typeCache = new PostgresTypeResolver(db); await typeCache.fetchTypesForSchema(); let transformed: SqliteInputRow[] = []; diff --git a/modules/module-postgres/test/src/slow_tests.test.ts b/modules/module-postgres/test/src/slow_tests.test.ts index 49dd23952..e792de8be 100644 --- a/modules/module-postgres/test/src/slow_tests.test.ts +++ b/modules/module-postgres/test/src/slow_tests.test.ts @@ -69,7 +69,7 @@ function defineSlowTests(factory: storage.TestStorageFactory) { }); async function testRepeatedReplication(testOptions: { compact: boolean; maxBatchSize: number; numBatches: number }) { - const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() }); + const connections = new PgManager(TEST_CONNECTION_OPTIONS, {}); const replicationConnection = await connections.replicationConnection(); const pool = connections.pool; await clearTestDb(pool); @@ -330,7 +330,7 @@ bucket_definitions: await pool.query(`SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE active = FALSE`); i += 1; - const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() }); + const connections = new PgManager(TEST_CONNECTION_OPTIONS, {}); const replicationConnection = await connections.replicationConnection(); abortController = new AbortController(); diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 0eb0bdcf8..bd7440809 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -529,13 +529,24 @@ config: const { pool } = context; await pool.query(`DROP TABLE IF EXISTS test_data`); await pool.query(`CREATE TYPE composite AS (foo bool, bar int4);`); - await pool.query(`CREATE TABLE test_data(id text primary key, description composite);`); + await pool.query(`CREATE TABLE test_data(id text primary key, description composite, ts timestamptz);`); + + // Covered by initial replication + await pool.query( + `INSERT INTO test_data(id, description, ts) VALUES ('t1', ROW(TRUE, 1)::composite, '2025-11-17T09:11:00Z')` + ); await context.initializeReplication(); - await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', ROW(TRUE, 2)::composite)`); + // Covered by streaming replication + await pool.query( + `INSERT INTO test_data(id, description, ts) VALUES ('t2', ROW(TRUE, 2)::composite, '2025-11-17T09:12:00Z')` + ); const data = await context.getBucketData('1#stream|0[]'); - expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '{"foo":1,"bar":2}' })]); + expect(data).toMatchObject([ + putOp('test_data', { id: 't1', description: '{"foo":1,"bar":1}', ts: '2025-11-17T09:11:00.000000Z' }), + putOp('test_data', { id: 't2', description: '{"foo":1,"bar":2}', ts: '2025-11-17T09:12:00.000000Z' }) + ]); }); test('custom types in primary key', async () => { diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index fd680918e..33ebecee8 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -33,7 +33,7 @@ export class WalStreamTestContext implements AsyncDisposable { options?: { doNotClear?: boolean; walStreamOptions?: Partial } ) { const f = await factory({ doNotClear: options?.doNotClear }); - const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() }); + const connectionManager = new PgManager(TEST_CONNECTION_OPTIONS, {}); if (!options?.doNotClear) { await clearTestDb(connectionManager.pool); diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts index 06d933ea2..04f849e08 100644 --- a/packages/sync-rules/src/utils.ts +++ b/packages/sync-rules/src/utils.ts @@ -1,21 +1,20 @@ import { JSONBig, JsonContainer, Replacer, stringifyRaw } from '@powersync/service-jsonbig'; import { SelectFromStatement, Statement } from 'pgsql-ast-parser'; +import { CompatibilityContext } from './compatibility.js'; +import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js'; import { SQLITE_FALSE, SQLITE_TRUE } from './sql_support.js'; import { + BucketIdTransformer, DatabaseInputRow, DatabaseInputValue, - SqliteInputValue, SqliteInputRow, + SqliteInputValue, SqliteJsonRow, SqliteJsonValue, SqliteRow, - SqliteValue, - BucketIdTransformer, - ToastableSqliteRow + SqliteValue } from './types.js'; -import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js'; import { CustomArray, CustomObject, CustomSqliteValue } from './types/custom_sqlite_value.js'; -import { CompatibilityContext } from './compatibility.js'; export function isSelectStatement(q: Statement): q is SelectFromStatement { return q.type == 'select';