.changeset/afraid-weeks-matter.md (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres': patch
---

Fix decoding arrays of enums, fix decoding `box[]` columns during initial replication.
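For context on the fix (not part of the changeset itself): in Postgres's text wire format, an enum array arrives as a plain array literal such as `{sad,happy}`. Decoding it requires recognizing the array's type OID, and user-defined enums get per-database OIDs that are absent from pgwire's builtin tables, so a builtin-only decoder passes the literal through undecoded. A minimal sketch of the shape involved (the `mood` type and the parser below are illustrative, not code from this PR):

```ts
// Hypothetical illustration: `CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')`
// gets a fresh OID per database, so a decoder keyed on well-known OIDs cannot
// tell that `{sad,happy}` is an array of enum values.
function parseTextArray(value: string): string[] {
  // Minimal parser: handles unquoted, comma-separated array literals only.
  if (value === '{}') return [];
  return value.slice(1, -1).split(',');
}

parseTextArray('{sad,happy}'); // ['sad', 'happy']
```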
modules/module-postgres/src/replication/WalStream.ts (29 additions, 28 deletions)
@@ -20,16 +20,19 @@ import {
} from '@powersync/service-core';
import * as pgwire from '@powersync/service-jpgwire';
import {
applyRowContext,
applyValueContext,
CompatibilityContext,
DatabaseInputRow,
SqliteInputRow,
SqliteInputValue,
SqliteRow,
SqliteValue,
SqlSyncRules,
TablePattern,
ToastableSqliteRow,
toSyncRulesRow
toSyncRulesRow,
toSyncRulesValue
} from '@powersync/service-sync-rules';

import { ReplicationMetric } from '@powersync/service-types';
@@ -44,6 +47,7 @@ import {
SimpleSnapshotQuery,
SnapshotQuery
} from './SnapshotQuery.js';
import { PostgresTypeResolver } from '../types/resolver.js';

export interface WalStreamOptions {
logger?: Logger;
@@ -462,11 +466,25 @@ WHERE oid = $1::regclass`,
}
}

static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
for (let row of results) {
yield toSyncRulesRow(row);
}
static decodeRow(row: pgwire.PgRow, types: PostgresTypeResolver): SqliteInputRow {
let result: SqliteInputRow = {};

row.raw.forEach((rawValue, i) => {
const column = row.columns[i];
let mappedValue: SqliteInputValue;

if (typeof rawValue == 'string') {
mappedValue = toSyncRulesValue(types.registry.decodeDatabaseValue(rawValue, column.typeOid), false, true);
} else {
// Binary format, expose as-is.
mappedValue = rawValue;
}

result[column.name] = mappedValue;
});
return result;
}

private async snapshotTableInTx(
batch: storage.BucketStorageBatch,
db: pgwire.PgConnection,
@@ -541,8 +559,6 @@ WHERE oid = $1::regclass`,
}
await q.initialize();

let columns: { i: number; name: string; typeOid: number }[] = [];
let columnMap: Record<string, number> = {};
let hasRemainingData = true;
while (hasRemainingData) {
// Fetch 10k at a time.
@@ -556,31 +572,16 @@
// There are typically 100-200 rows per chunk.
for await (let chunk of cursor) {
if (chunk.tag == 'RowDescription') {
// We get a RowDescription for each FETCH call, but they should
// all be the same.
let i = 0;
columns = chunk.payload.map((c) => {
return { i: i++, name: c.name, typeOid: c.typeOid };
});
for (let column of chunk.payload) {
columnMap[column.name] = column.typeOid;
}
continue;
}

const rows = chunk.rows.map((row) => {
let q: DatabaseInputRow = {};
for (let c of columns) {
q[c.name] = pgwire.PgType.decode(row.raw[c.i], c.typeOid);
}
return q;
});
if (rows.length > 0) {
if (chunk.rows.length > 0) {
hasRemainingData = true;
}

for (const inputRecord of WalStream.getQueryData(rows)) {
const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord));
for (const rawRow of chunk.rows) {
const record = this.sync_rules.applyRowContext<never>(WalStream.decodeRow(rawRow, this.connections.types));

// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: storage.SaveOperationTag.INSERT,
@@ -592,8 +593,8 @@ WHERE oid = $1::regclass`,
});
}

at += rows.length;
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length);
at += chunk.rows.length;
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(chunk.rows.length);

this.touch();
}
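To summarize the WalStream change above: the initial-snapshot path used to decode every value with pgwire's builtin, OID-keyed decoder; it now routes text-format values through the custom type registry resolved from `pg_type`, and exposes binary-format values as-is. A simplified before/after of the per-value step, using the names from the diff (batching and metrics elided; the helper functions are for illustration only):

```ts
import * as pgwire from '@powersync/service-jpgwire';
import { PostgresTypeResolver } from '../types/resolver.js';

// Before: builtin decoder only — fine for well-known OIDs, but enums,
// domains, composites and their arrays fell through without proper decoding.
function decodeValueOld(raw: string | Uint8Array, typeOid: number) {
  return pgwire.PgType.decode(raw, typeOid);
}

// After: text values go through the registry, which knows per-database OIDs;
// binary-format values are passed through unchanged.
function decodeValueNew(raw: string | Uint8Array, typeOid: number, types: PostgresTypeResolver) {
  return typeof raw == 'string' ? types.registry.decodeDatabaseValue(raw, typeOid) : raw;
}
```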
5 changes: 1 addition & 4 deletions modules/module-postgres/src/types/registry.ts
Original file line number Diff line number Diff line change
@@ -256,10 +256,7 @@ export class CustomTypeRegistry {
case 'unknown':
return true;
case 'array':
return (
type.separatorCharCode == pgwire.CHAR_CODE_COMMA &&
this.isParsedWithoutCustomTypesSupport(this.lookupType(type.innerId))
);
return type.separatorCharCode == pgwire.CHAR_CODE_COMMA && pgwire.ARRAY_TO_ELEM_OID.has(type.innerId);
default:
return false;
}
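This registry change is the enum-array fix: whether an array can be left to pgwire's builtin parser is now decided by pgwire's own OID coverage rather than by recursing into the element type. An enum element decodes fine as plain text on its own, so the old recursive check answered yes for something like `mood[]` even though pgwire has no builtin entry for that array type. A restatement of the fixed predicate with the reasoning spelled out (the type-record shape is simplified for the sketch):

```ts
import * as pgwire from '@powersync/service-jpgwire';

interface ArrayTypeRecord {
  separatorCharCode: number;
  innerId: number;
}

// An array parses without custom-type support only when its elements separate
// on commas AND its element OID is covered by pgwire's builtin array tables —
// not merely when the element type happens to decode as plain text.
function isBuiltinParseableArray(type: ArrayTypeRecord): boolean {
  return type.separatorCharCode == pgwire.CHAR_CODE_COMMA && pgwire.ARRAY_TO_ELEM_OID.has(type.innerId);
}
```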
7 changes: 1 addition & 6 deletions modules/module-postgres/src/types/resolver.ts
Original file line number Diff line number Diff line change
@@ -153,7 +153,7 @@ WHERE a.attnum > 0
AND cn.nspname not in ('information_schema', 'pg_catalog', 'pg_toast')
`;

const query = await this.pool.query({ statement: sql });
const query = await this.pool.query(sql);
let ids: number[] = [];
for (const row of pgwire.pgwireRows(query)) {
ids.push(Number(row.type_oid));
@@ -186,11 +186,6 @@ WHERE a.attnum > 0
return toSyncRulesRow(record);
}

constructRowRecord(columnMap: Record<string, number>, tupleRaw: Record<string, any>): SqliteInputRow {
const record = this.decodeTupleForTable(columnMap, tupleRaw);
return toSyncRulesRow(record);
}

/**
* We need a high level of control over how values are decoded, to make sure there is no loss
* of precision in the process.
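The doc comment retained in resolver.ts — needing "a high level of control over how values are decoded" to avoid loss of precision — is easy to gloss over. A concrete instance of what that control prevents (illustrative values, not from this PR):

```ts
// int8 values can exceed Number's safe integer range (2^53 - 1), so decoding
// them through JavaScript numbers silently corrupts them; such values must be
// kept as BigInt (or text) instead.
const raw = '9007199254740993'; // int8 column value in Postgres text format
Number(raw); // 9007199254740992 — off by one, precision lost
BigInt(raw); // 9007199254740993n — exact
```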
modules/module-postgres/test/src/pg_test.test.ts (93 additions, 45 deletions)
@@ -327,10 +327,7 @@ VALUES(10, ARRAY['null']::TEXT[]);

await insert(db);

const transformed = [
...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`)))
];

const transformed = await queryAll(db, `SELECT * FROM test_data ORDER BY id`);
checkResults(transformed);
} finally {
await db.end();
@@ -346,17 +343,11 @@ VALUES(10, ARRAY['null']::TEXT[]);

await insert(db);

const transformed = [
...WalStream.getQueryData(
pgwire.pgwireRows(
await db.query({
statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
params: [{ type: 'bool', value: true }]
})
)
)
];

const raw = await db.query({
statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
params: [{ type: 'bool', value: true }]
});
const transformed = await interpretResults(db, raw);
checkResults(transformed);
} finally {
await db.end();
@@ -370,9 +361,9 @@ VALUES(10, ARRAY['null']::TEXT[]);

await insertArrays(db);

const transformed = [
...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`)))
].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
const transformed = (await queryAll(db, `SELECT * FROM test_data_arrays ORDER BY id`)).map((e) =>
applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)
);

checkResultArrays(transformed);
} finally {
@@ -465,19 +456,15 @@ VALUES(10, ARRAY['null']::TEXT[]);
});

test('date formats', async () => {
const db = await connectPgWire();
const db = await connectPgPool();
try {
await setupTable(db);

await db.query(`
INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12', '2023-03-06 15:47:12.4', '2023-03-06 15:47+02');
`);

const [row] = [
...WalStream.getQueryData(
pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`))
)
];
const [row] = await queryAll(db, `SELECT time, timestamp, timestamptz FROM test_data`);

const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormat).toMatchObject({
@@ -515,17 +502,18 @@
try {
await clearTestDb(db);
await db.query(`CREATE DOMAIN rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5);`);
await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT);`);
await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
await db.query(`CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')`);
await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT, mood mood);`);
await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);

await db.query(`CREATE TABLE test_custom(
id serial primary key,
rating rating_value,
composite composite,
nested_composite nested_composite,
boxes box[],
mood mood
mood mood,
moods mood[]
);`);

const slotName = 'test_slot';
@@ -542,13 +530,14 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'

await db.query(`
INSERT INTO test_custom
(rating, composite, nested_composite, boxes, mood)
(rating, composite, nested_composite, boxes, mood, moods)
VALUES (
1,
(ARRAY[2,3], 'bar'),
(TRUE, (ARRAY[2,3], 'bar')),
(ARRAY[2,3], 'bar', 'sad'::mood),
(TRUE, (ARRAY[2,3], 'bar', 'sad'::mood)),
ARRAY[box(point '(1,2)', point '(3,4)'), box(point '(5, 6)', point '(7,8)')],
'happy'
'happy',
ARRAY['sad'::mood, 'happy'::mood]
);
`);

@@ -562,27 +551,53 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
});

const [transformed] = await getReplicationTx(db, replicationStream);
const [queried] = await queryAll(db, `SELECT * FROM test_custom`);
await pg.end();

const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormat).toMatchObject({
const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormatStreamed).toMatchObject({
rating: '1',
composite: '("{2,3}",bar)',
nested_composite: '(t,"(""{2,3}"",bar)")',
composite: '("{2,3}",bar,sad)',
nested_composite: '(t,"(""{2,3}"",bar,sad)")',
boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
mood: 'happy',
moods: '{sad,happy}'
});

const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormatQueried).toMatchObject({
rating: 1,
composite: '("{2,3}",bar,sad)',
nested_composite: '(t,"(""{2,3}"",bar,sad)")',
boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
mood: 'happy'
mood: 'happy',
moods: '{sad,happy}'
});

const newFormat = applyRowContext(
const newFormatStreamed = applyRowContext(
transformed,
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
);
expect(newFormat).toMatchObject({
expect(newFormatStreamed).toMatchObject({
rating: 1,
composite: '{"foo":[2.0,3.0],"bar":"bar"}',
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar"}}',
composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
mood: 'happy'
mood: 'happy',
moods: '["sad","happy"]'
});

const newFormatQueried = applyRowContext(
queried,
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
);
expect(newFormatQueried).toMatchObject({
rating: 1,
composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
mood: 'happy',
moods: '["sad","happy"]'
});
} finally {
await db.end();
@@ -635,18 +650,36 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
});

const [transformed] = await getReplicationTx(db, replicationStream);
const [queried] = await queryAll(db, `SELECT ranges FROM test_custom`);
await pg.end();

const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormat).toMatchObject({
const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormatStreamed).toMatchObject({
ranges: '{"{[2,4),[6,8)}"}'
});
const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormatQueried).toMatchObject({
ranges: '{"{[2,4),[6,8)}"}'
});

const newFormat = applyRowContext(
const newFormatStreamed = applyRowContext(
transformed,
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
);
expect(newFormat).toMatchObject({
expect(newFormatStreamed).toMatchObject({
ranges: JSON.stringify([
[
{ lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
{ lower: 6, upper: 8, lower_exclusive: 0, upper_exclusive: 1 }
]
])
});

const newFormatQueried = applyRowContext(
queried,
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
);
expect(newFormatQueried).toMatchObject({
ranges: JSON.stringify([
[
{ lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
@@ -679,3 +712,18 @@ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.R
}
return transformed;
}

/**
* Simulates what WalStream does for initial snapshots.
*/
async function queryAll(db: pgwire.PgClient, sql: string) {
const raw = await db.query(sql);
return await interpretResults(db, raw);
}

async function interpretResults(db: pgwire.PgClient, results: pgwire.PgResult) {
const typeCache = new PostgresTypeResolver(db);
await typeCache.fetchTypesForSchema();

return results.rows.map((row) => WalStream.decodeRow(row, typeCache));
}