From 4cde1a78557e7335e4a5ab5e8af3cbd4788f6146 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 8 Dec 2025 11:41:20 +0200
Subject: [PATCH 1/4] Add failing tests for some postgres type combinations.

---
 .../module-postgres/test/src/pg_test.test.ts | 99 +++++++++++++++----
 1 file changed, 78 insertions(+), 21 deletions(-)

diff --git a/modules/module-postgres/test/src/pg_test.test.ts b/modules/module-postgres/test/src/pg_test.test.ts
index 2cf20f6b..4c5e207c 100644
--- a/modules/module-postgres/test/src/pg_test.test.ts
+++ b/modules/module-postgres/test/src/pg_test.test.ts
@@ -515,9 +515,9 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
   try {
     await clearTestDb(db);
     await db.query(`CREATE DOMAIN rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5);`);
-    await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT);`);
-    await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
     await db.query(`CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')`);
+    await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT, mood mood);`);
+    await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
 
     await db.query(`CREATE TABLE test_custom(
       id serial primary key,
@@ -525,7 +525,8 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
       composite composite,
       nested_composite nested_composite,
       boxes box[],
-      mood mood
+      mood mood,
+      moods mood[]
     );`);
 
     const slotName = 'test_slot';
@@ -542,13 +543,14 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
 
     await db.query(`
     INSERT INTO test_custom
-      (rating, composite, nested_composite, boxes, mood)
+      (rating, composite, nested_composite, boxes, mood, moods)
     VALUES (
       1,
-      (ARRAY[2,3], 'bar'),
-      (TRUE, (ARRAY[2,3], 'bar')),
+      (ARRAY[2,3], 'bar', 'sad'::mood),
+      (TRUE, (ARRAY[2,3], 'bar', 'sad'::mood)),
       ARRAY[box(point '(1,2)', point '(3,4)'), box(point '(5, 6)', point '(7,8)')],
-      'happy'
+      'happy',
+      ARRAY['sad'::mood, 'happy'::mood]
     );
     `);
 
@@ -562,27 +564,53 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
     });
 
     const [transformed] = await getReplicationTx(db, replicationStream);
+    const [queried] = await queryAll(db, `SELECT * FROM test_custom`);
     await pg.end();
 
-    const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
-    expect(oldFormat).toMatchObject({
+    const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
+    expect(oldFormatStreamed).toMatchObject({
       rating: '1',
-      composite: '("{2,3}",bar)',
-      nested_composite: '(t,"(""{2,3}"",bar)")',
+      composite: '("{2,3}",bar,sad)',
+      nested_composite: '(t,"(""{2,3}"",bar,sad)")',
       boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
-      mood: 'happy'
+      mood: 'happy',
+      moods: '{sad,happy}'
     });
 
+    const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
+    expect(oldFormatQueried).toMatchObject({
+      rating: '1',
+      composite: '("{2,3}",bar,sad)',
+      nested_composite: '(t,"(""{2,3}"",bar,sad)")',
+      boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
+      mood: 'happy',
+      moods: '{sad,happy}'
+    });
+
-    const newFormat = applyRowContext(
+    const newFormatStreamed = applyRowContext(
       transformed,
       new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
     );
-    expect(newFormat).toMatchObject({
+    expect(newFormatStreamed).toMatchObject({
       rating: 1,
-      composite: '{"foo":[2.0,3.0],"bar":"bar"}',
-      nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar"}}',
+      composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
+      nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
       boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
-      mood: 'happy'
+      mood: 'happy',
+      moods: '["sad","happy"]'
     });
+
+    const newFormatQueried = applyRowContext(
+      queried,
+      new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
+    );
+    expect(newFormatQueried).toMatchObject({
+      rating: 1,
+      composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
+      nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
+      boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
+      mood: 'happy',
+      moods: '["sad","happy"]'
+    });
   } finally {
     await db.end();
@@ -635,18 +663,36 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
     const [transformed] = await getReplicationTx(db, replicationStream);
+    const [queried] = await queryAll(db, `SELECT ranges FROM test_custom`);
     await pg.end();
-    const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
-    expect(oldFormat).toMatchObject({
+    const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
+    expect(oldFormatStreamed).toMatchObject({
+      ranges: '{"{[2,4),[6,8)}"}'
+    });
+    const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
+    expect(oldFormatQueried).toMatchObject({
       ranges: '{"{[2,4),[6,8)}"}'
     });
-    const newFormat = applyRowContext(
+    const newFormatStreamed = applyRowContext(
       transformed,
       new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
     );
-    expect(newFormat).toMatchObject({
+    expect(newFormatStreamed).toMatchObject({
+      ranges: JSON.stringify([
+        [
+          { lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
+          { lower: 6, upper: 8, lower_exclusive: 0, upper_exclusive: 1 }
+        ]
+      ])
+    });
+
+    const newFormatQueried = applyRowContext(
+      queried,
+      new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
+    );
+    expect(newFormatQueried).toMatchObject({
       ranges: JSON.stringify([
         [
           { lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
           { lower: 6, upper: 8, lower_exclusive: 0, upper_exclusive: 1 }
         ]
       ])
     });
@@ -679,3 +725,14 @@ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.R
   }
   return transformed;
 }
+
+/**
+ * Simulates what WalStream does for initial snapshots.
+ */
+async function queryAll(db: pgwire.PgClient, sql: string) {
+  const raw = await db.query(sql);
+  const typeCache = new PostgresTypeResolver(db);
+  await typeCache.fetchTypesForSchema();
+  const columns = Object.fromEntries(raw.columns.map((col) => [col.name, col.typeOid]));
+  return [...WalStream.getQueryData(pgwire.pgwireRows(raw))].map((row) => typeCache.constructRowRecord(columns, row));
+}
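[Editor's note] The `composite` and `nested_composite` expectations in the patch above assert on PostgreSQL's composite text encoding: fields are wrapped in `(...)`, separated by commas, and quoted fields escape embedded quotes by doubling them, which is how `'(t,"(""{2,3}"",bar,sad)")'` arises for the nested value. As an illustration of that encoding only — a minimal sketch, not the module's actual decoder (which lives behind `CustomTypeRegistry.decodeDatabaseValue`) — the following hypothetical helper splits a composite literal into its raw fields:

```ts
// Hypothetical helper for illustration; handles quoting and '""' escapes,
// but not every edge case the real decoder covers.
function splitCompositeText(input: string): (string | null)[] {
  if (!input.startsWith('(') || !input.endsWith(')')) {
    throw new Error(`Not a composite literal: ${input}`);
  }
  const fields: (string | null)[] = [];
  let i = 1;
  while (i < input.length) {
    if (input[i] === ',' || input[i] === ')') {
      // Nothing between delimiters encodes a NULL field.
      fields.push(null);
      i++;
      continue;
    }
    let value = '';
    if (input[i] === '"') {
      // Quoted field: a doubled '""' is an escaped quote character.
      i++;
      while (i < input.length) {
        if (input[i] === '"' && input[i + 1] === '"') {
          value += '"';
          i += 2;
        } else if (input[i] === '"') {
          i++;
          break;
        } else {
          value += input[i++];
        }
      }
    } else {
      // Unquoted field: runs until the next delimiter.
      while (input[i] !== ',' && input[i] !== ')') {
        value += input[i++];
      }
    }
    fields.push(value);
    i++; // Skip the ',' or ')' that ends the field.
  }
  return fields;
}

// splitCompositeText('("{2,3}",bar,sad)')         => ['{2,3}', 'bar', 'sad']
// splitCompositeText('(t,"(""{2,3}"",bar,sad)")') => ['t', '("{2,3}",bar,sad)']
```

Applying the helper a second time to the second field of the nested example recovers the inner composite's fields, mirroring the recursive structure the tests exercise.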
From ed0e683021cb9fd93d2745a963ce2a4a610c4cbb Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Fri, 12 Dec 2025 09:22:53 +0100
Subject: [PATCH 2/4] Fix decoding enum arrays

---
 modules/module-postgres/src/types/registry.ts    | 5 +----
 modules/module-postgres/test/src/pg_test.test.ts | 2 +-
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/modules/module-postgres/src/types/registry.ts b/modules/module-postgres/src/types/registry.ts
index 362350cd..3a670879 100644
--- a/modules/module-postgres/src/types/registry.ts
+++ b/modules/module-postgres/src/types/registry.ts
@@ -256,10 +256,7 @@ export class CustomTypeRegistry {
       case 'unknown':
         return true;
       case 'array':
-        return (
-          type.separatorCharCode == pgwire.CHAR_CODE_COMMA &&
-          this.isParsedWithoutCustomTypesSupport(this.lookupType(type.innerId))
-        );
+        return type.separatorCharCode == pgwire.CHAR_CODE_COMMA && pgwire.ARRAY_TO_ELEM_OID.has(type.innerId);
       default:
         return false;
     }
diff --git a/modules/module-postgres/test/src/pg_test.test.ts b/modules/module-postgres/test/src/pg_test.test.ts
index 4c5e207c..13c23222 100644
--- a/modules/module-postgres/test/src/pg_test.test.ts
+++ b/modules/module-postgres/test/src/pg_test.test.ts
@@ -579,7 +579,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
 
     const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
     expect(oldFormatQueried).toMatchObject({
-      rating: '1',
+      rating: 1,
       composite: '("{2,3}",bar,sad)',
       nested_composite: '(t,"(""{2,3}"",bar,sad)")',
       boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',

From 8beab92c47da29a1a320d0d1f4345649a727e1de Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Fri, 12 Dec 2025 10:09:37 +0100
Subject: [PATCH 3/4] Fix remaining tests

---
 .../src/replication/WalStream.ts              | 46 ++++++++++++------
 modules/module-postgres/src/types/resolver.ts |  7 +--
 .../module-postgres/test/src/pg_test.test.ts  | 43 +++++++----------
 3 files changed, 47 insertions(+), 49 deletions(-)

diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index f944a96c..3ea2a35b 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -20,16 +20,19 @@ import {
 } from '@powersync/service-core';
 import * as pgwire from '@powersync/service-jpgwire';
 import {
+  applyRowContext,
   applyValueContext,
   CompatibilityContext,
   DatabaseInputRow,
   SqliteInputRow,
   SqliteInputValue,
   SqliteRow,
+  SqliteValue,
   SqlSyncRules,
   TablePattern,
   ToastableSqliteRow,
-  toSyncRulesRow
+  toSyncRulesRow,
+  toSyncRulesValue
 } from '@powersync/service-sync-rules';
 import { ReplicationMetric } from '@powersync/service-types';
 
@@ -44,6 +47,7 @@
   SimpleSnapshotQuery,
   SnapshotQuery
 } from './SnapshotQuery.js';
+import { PostgresTypeResolver } from '../types/resolver.js';
 
 export interface WalStreamOptions {
   logger?: Logger;
@@ -462,11 +466,25 @@ WHERE oid = $1::regclass`,
     }
   }
 
-  static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
-    for (let row of results) {
-      yield toSyncRulesRow(row);
-    }
+  static decodeRow(row: pgwire.PgRow, types: PostgresTypeResolver): SqliteInputRow {
+    let result: SqliteInputRow = {};
+
+    row.raw.forEach((rawValue, i) => {
+      const column = row.columns[i];
+      let mappedValue: SqliteInputValue;
+
+      if (typeof rawValue == 'string') {
+        mappedValue = toSyncRulesValue(types.registry.decodeDatabaseValue(rawValue, column.typeOid), false, true);
+      } else {
+        // Binary format, expose as-is.
+        mappedValue = rawValue;
+      }
+
+      result[column.name] = mappedValue;
+    });
+    return result;
   }
+
   private async snapshotTableInTx(
     batch: storage.BucketStorageBatch,
     db: pgwire.PgConnection,
@@ -568,19 +586,13 @@ WHERE oid = $1::regclass`,
           continue;
         }
 
-        const rows = chunk.rows.map((row) => {
-          let q: DatabaseInputRow = {};
-          for (let c of columns) {
-            q[c.name] = pgwire.PgType.decode(row.raw[c.i], c.typeOid);
-          }
-          return q;
-        });
-        if (rows.length > 0) {
+        if (chunk.rows.length > 0) {
          hasRemainingData = true;
        }
 
-        for (const inputRecord of WalStream.getQueryData(rows)) {
-          const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord));
+        for (const rawRow of chunk.rows) {
+          const record = this.sync_rules.applyRowContext(WalStream.decodeRow(rawRow, this.connections.types));
+
          // This auto-flushes when the batch reaches its size limit
          await batch.save({
            tag: storage.SaveOperationTag.INSERT,
@@ -592,8 +604,8 @@ WHERE oid = $1::regclass`,
          });
        }
 
-        at += rows.length;
-        this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length);
+        at += chunk.rows.length;
+        this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(chunk.rows.length);
 
        this.touch();
      }
diff --git a/modules/module-postgres/src/types/resolver.ts b/modules/module-postgres/src/types/resolver.ts
index 1ee3323c..7115ec6c 100644
--- a/modules/module-postgres/src/types/resolver.ts
+++ b/modules/module-postgres/src/types/resolver.ts
@@ -153,7 +153,7 @@ WHERE a.attnum > 0
   AND cn.nspname not in ('information_schema', 'pg_catalog', 'pg_toast')
 `;
-    const query = await this.pool.query({ statement: sql });
+    const query = await this.pool.query(sql);
     let ids: number[] = [];
     for (const row of pgwire.pgwireRows(query)) {
       ids.push(Number(row.type_oid));
     }
@@ -186,11 +186,6 @@
     return toSyncRulesRow(record);
   }
 
-  constructRowRecord(columnMap: Record<string, number>, tupleRaw: Record<string, any>): SqliteInputRow {
-    const record = this.decodeTupleForTable(columnMap, tupleRaw);
-    return toSyncRulesRow(record);
-  }
-
   /**
    * We need a high level of control over how values are decoded, to make sure there is no loss
    * of precision in the process.
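[Editor's note] For reviewers following the `moods` expectations in these patches: Postgres serializes a `mood[]` value as the array literal `{sad,happy}`, which the backwards-compatibility edition passes through verbatim, while the sync-streams edition re-encodes it as the JSON `["sad","happy"]`. A minimal sketch of that re-encoding for the unquoted case only — the real decoder also handles quoted elements, `NULL`s, and non-comma separators (see `CustomTypeRegistry` and `ARRAY_TO_ELEM_OID` above):

```ts
// Hypothetical illustration, not the module's parser: convert a simple
// Postgres array literal like '{sad,happy}' into its JSON representation.
function simplePgArrayToJson(text: string): string {
  if (!text.startsWith('{') || !text.endsWith('}')) {
    throw new Error(`Not an array literal: ${text}`);
  }
  const inner = text.slice(1, -1);
  // Split on the comma separator; empty braces mean an empty array.
  const elements = inner.length === 0 ? [] : inner.split(',');
  return JSON.stringify(elements);
}

// simplePgArrayToJson('{sad,happy}') === '["sad","happy"]'
```

The registry fix in PATCH 2 matters here because pgwire's generic array parser only knows builtin element OIDs; an enum array like `mood[]` must instead flow through the custom-type decoder.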
diff --git a/modules/module-postgres/test/src/pg_test.test.ts b/modules/module-postgres/test/src/pg_test.test.ts
index 13c23222..48c678b8 100644
--- a/modules/module-postgres/test/src/pg_test.test.ts
+++ b/modules/module-postgres/test/src/pg_test.test.ts
@@ -327,10 +327,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
 
       await insert(db);
 
-      const transformed = [
-        ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`)))
-      ];
-
+      const transformed = await queryAll(db, `SELECT * FROM test_data ORDER BY id`);
       checkResults(transformed);
     } finally {
       await db.end();
@@ -346,17 +343,11 @@ VALUES(10, ARRAY['null']::TEXT[]);
 
       await insert(db);
 
-      const transformed = [
-        ...WalStream.getQueryData(
-          pgwire.pgwireRows(
-            await db.query({
-              statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
-              params: [{ type: 'bool', value: true }]
-            })
-          )
-        )
-      ];
-
+      const raw = await db.query({
+        statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
+        params: [{ type: 'bool', value: true }]
+      });
+      const transformed = await interpretResults(db, raw);
       checkResults(transformed);
     } finally {
       await db.end();
@@ -370,9 +361,9 @@ VALUES(10, ARRAY['null']::TEXT[]);
 
       await insertArrays(db);
 
-      const transformed = [
-        ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`)))
-      ].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
+      const transformed = (await queryAll(db, `SELECT * FROM test_data_arrays ORDER BY id`)).map((e) =>
+        applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)
+      );
 
       checkResultArrays(transformed);
     } finally {
@@ -465,7 +456,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
 });
 
 test('date formats', async () => {
-  const db = await connectPgWire();
+  const db = await connectPgPool();
   try {
     await setupTable(db);
 
@@ -473,11 +464,7 @@
 INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12', '2023-03-06 15:47:12.4', '2023-03-06 15:47+02');
 `);
 
-    const [row] = [
-      ...WalStream.getQueryData(
-        pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`))
-      )
-    ];
+    const [row] = await queryAll(db, `SELECT time, timestamp, timestamptz FROM test_data`);
 
     const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
     expect(oldFormat).toMatchObject({
@@ -731,8 +718,12 @@ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.R
  */
 async function queryAll(db: pgwire.PgClient, sql: string) {
   const raw = await db.query(sql);
+  return await interpretResults(db, raw);
+}
+
+async function interpretResults(db: pgwire.PgClient, results: pgwire.PgResult) {
   const typeCache = new PostgresTypeResolver(db);
   await typeCache.fetchTypesForSchema();
-  const columns = Object.fromEntries(raw.columns.map((col) => [col.name, col.typeOid]));
-  return [...WalStream.getQueryData(pgwire.pgwireRows(raw))].map((row) => typeCache.constructRowRecord(columns, row));
+
+  return results.rows.map((row) => WalStream.decodeRow(row, typeCache));
 }

From a402a0223d050b17ec3a77f6906ba891793624da Mon Sep 17 00:00:00 2001
From: Simon Binder
Date: Fri, 12 Dec 2025 10:21:57 +0100
Subject: [PATCH 4/4] Avoid custom RowDescription parsing

---
 .changeset/afraid-weeks-matter.md                    |  5 +++++
 modules/module-postgres/src/replication/WalStream.ts | 11 -----------
 2 files changed, 5 insertions(+), 11 deletions(-)
 create mode 100644 .changeset/afraid-weeks-matter.md

diff --git a/.changeset/afraid-weeks-matter.md b/.changeset/afraid-weeks-matter.md
new file mode 100644
index 00000000..ba3f98bf
--- /dev/null
+++ b/.changeset/afraid-weeks-matter.md
@@ -0,0 +1,5 @@
+---
+'@powersync/service-module-postgres': patch
+---
+
+Fix decoding arrays of enums, fix decoding `box[]` columns during initial replication.
diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index 3ea2a35b..b56dad78 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -559,8 +559,6 @@ WHERE oid = $1::regclass`,
     }
 
     await q.initialize();
-    let columns: { i: number; name: string; typeOid: number }[] = [];
-    let columnMap: Record<string, number> = {};
     let hasRemainingData = true;
     while (hasRemainingData) {
       // Fetch 10k at a time.
@@ -574,15 +572,6 @@ WHERE oid = $1::regclass`,
       // There are typically 100-200 rows per chunk.
       for await (let chunk of cursor) {
         if (chunk.tag == 'RowDescription') {
-          // We get a RowDescription for each FETCH call, but they should
-          // all be the same.
-          let i = 0;
-          columns = chunk.payload.map((c) => {
-            return { i: i++, name: c.name, typeOid: c.typeOid };
-          });
-          for (let column of chunk.payload) {
-            columnMap[column.name] = column.typeOid;
-          }
           continue;
         }
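[Editor's note] Taken together, the series moves initial-snapshot decoding onto a single path: resolve custom types once via `PostgresTypeResolver`, then map each raw row through `WalStream.decodeRow`, with no per-chunk `RowDescription` bookkeeping. A condensed usage sketch using only names introduced in this series — the import paths and table name are placeholders, and error handling is omitted:

```ts
import * as pgwire from '@powersync/service-jpgwire';
import { WalStream } from './replication/WalStream.js';
import { PostgresTypeResolver } from './types/resolver.js';

// Decode a snapshot query the way the updated WalStream does.
async function decodeSnapshot(db: pgwire.PgClient) {
  const types = new PostgresTypeResolver(db);
  // Prime the custom-type cache (enums, composites, domains) once up front.
  await types.fetchTypesForSchema();

  const result = await db.query(`SELECT * FROM test_custom`);
  // Text-format values go through the custom-type registry;
  // binary-format values are passed through as-is.
  return result.rows.map((row) => WalStream.decodeRow(row, types));
}
```

This mirrors the `queryAll`/`interpretResults` test helpers from PATCH 3, which exist precisely so the tests exercise the same decoding path as replication.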