Skip to content

Commit fff0024

Browse files
authored
[Postgres] Fix custom type parsing for initial snapshot queries (#403)
* Fix custom type parsing for initial snapshot queries. * Add changeset. * Update test.
1 parent 90aac49 commit fff0024

File tree

13 files changed

+79
-52
lines changed

13 files changed

+79
-52
lines changed

.changeset/shiny-panthers-ring.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
'@powersync/service-sync-rules': patch
4+
'@powersync/service-core': patch
5+
'@powersync/service-image': patch
6+
---
7+
8+
[Postgres] Fix custom type parsing on initial replication

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
3434
connectionTag?: string,
3535
private config?: types.ResolvedConnectionConfig
3636
) {
37-
this.typeCache = new PostgresTypeResolver(config?.typeRegistry ?? new CustomTypeRegistry(), pool);
37+
this.typeCache = new PostgresTypeResolver(pool);
3838
this.connectionTag = connectionTag ?? sync_rules.DEFAULT_TAG;
3939
}
4040

modules/module-postgres/src/module/PostgresModule.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import { getApplicationName } from '../utils/application-name.js';
2222
import { CustomTypeRegistry } from '../types/registry.js';
2323

2424
export class PostgresModule extends replication.ReplicationModule<types.PostgresConnectionConfig> {
25-
private customTypes: CustomTypeRegistry = new CustomTypeRegistry();
26-
2725
constructor() {
2826
super({
2927
name: 'Postgres',
@@ -51,7 +49,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
5149
protected createReplicator(context: system.ServiceContext): replication.AbstractReplicator {
5250
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
5351
const syncRuleProvider = new ConfigurationFileSyncRulesProvider(context.configuration.sync_rules);
54-
const connectionFactory = new ConnectionManagerFactory(normalisedConfig, this.customTypes);
52+
const connectionFactory = new ConnectionManagerFactory(normalisedConfig);
5553

5654
return new WalStreamReplicator({
5755
id: this.getDefaultId(normalisedConfig.database),
@@ -69,8 +67,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
6967
private resolveConfig(config: types.PostgresConnectionConfig): types.ResolvedConnectionConfig {
7068
return {
7169
...config,
72-
...types.normalizeConnectionConfig(config),
73-
typeRegistry: this.customTypes
70+
...types.normalizeConnectionConfig(config)
7471
};
7572
}
7673

@@ -79,8 +76,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
7976
const connectionManager = new PgManager(normalisedConfig, {
8077
idleTimeout: 30_000,
8178
maxSize: 1,
82-
applicationName: getApplicationName(),
83-
registry: this.customTypes
79+
applicationName: getApplicationName()
8480
});
8581

8682
try {
@@ -111,8 +107,7 @@ export class PostgresModule extends replication.ReplicationModule<types.Postgres
111107
const connectionManager = new PgManager(normalizedConfig, {
112108
idleTimeout: 30_000,
113109
maxSize: 1,
114-
applicationName: getApplicationName(),
115-
registry: new CustomTypeRegistry()
110+
applicationName: getApplicationName()
116111
});
117112
const connection = await connectionManager.snapshotConnection();
118113
try {

modules/module-postgres/src/replication/ConnectionManagerFactory.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
1-
import { PgManager } from './PgManager.js';
2-
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
3-
import { PgPoolOptions } from '@powersync/service-jpgwire';
41
import { logger } from '@powersync/lib-services-framework';
5-
import { CustomTypeRegistry } from '../types/registry.js';
2+
import { PgPoolOptions } from '@powersync/service-jpgwire';
3+
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
4+
import { PgManager } from './PgManager.js';
65

76
export class ConnectionManagerFactory {
87
private readonly connectionManagers = new Set<PgManager>();
98
public readonly dbConnectionConfig: NormalizedPostgresConnectionConfig;
109

11-
constructor(
12-
dbConnectionConfig: NormalizedPostgresConnectionConfig,
13-
private readonly registry: CustomTypeRegistry
14-
) {
10+
constructor(dbConnectionConfig: NormalizedPostgresConnectionConfig) {
1511
this.dbConnectionConfig = dbConnectionConfig;
1612
}
1713

1814
create(poolOptions: PgPoolOptions) {
19-
const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions, registry: this.registry });
15+
const manager = new PgManager(this.dbConnectionConfig, { ...poolOptions });
2016
this.connectionManagers.add(manager);
2117

2218
manager.registerListener({

modules/module-postgres/src/replication/PgManager.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1+
import { BaseObserver } from '@powersync/lib-services-framework';
12
import * as pgwire from '@powersync/service-jpgwire';
23
import semver from 'semver';
4+
import { PostgresTypeResolver } from '../types/resolver.js';
35
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
46
import { getApplicationName } from '../utils/application-name.js';
5-
import { PostgresTypeResolver } from '../types/resolver.js';
67
import { getServerVersion } from '../utils/postgres_version.js';
7-
import { CustomTypeRegistry } from '../types/registry.js';
8-
import { BaseObserver } from '@powersync/lib-services-framework';
98

10-
export interface PgManagerOptions extends pgwire.PgPoolOptions {
11-
registry: CustomTypeRegistry;
12-
}
9+
export interface PgManagerOptions extends pgwire.PgPoolOptions {}
1310

1411
/**
1512
* Shorter timeout for snapshot connections than for replication connections.
@@ -37,7 +34,7 @@ export class PgManager extends BaseObserver<PgManagerListener> {
3734
super();
3835
// The pool is lazy - no connections are opened until a query is performed.
3936
this.pool = pgwire.connectPgWirePool(this.options, poolOptions);
40-
this.types = new PostgresTypeResolver(poolOptions.registry, this.pool);
37+
this.types = new PostgresTypeResolver(this.pool);
4138
}
4239

4340
public get connectionTag() {

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,7 @@ WHERE oid = $1::regclass`,
546546
await q.initialize();
547547

548548
let columns: { i: number; name: string }[] = [];
549+
let columnMap: Record<string, number> = {};
549550
let hasRemainingData = true;
550551
while (hasRemainingData) {
551552
// Fetch 10k at a time.
@@ -565,6 +566,9 @@ WHERE oid = $1::regclass`,
565566
columns = chunk.payload.map((c) => {
566567
return { i: i++, name: c.name };
567568
});
569+
for (let column of chunk.payload) {
570+
columnMap[column.name] = column.typeOid;
571+
}
568572
continue;
569573
}
570574

@@ -580,7 +584,7 @@ WHERE oid = $1::regclass`,
580584
}
581585

582586
for (const inputRecord of WalStream.getQueryData(rows)) {
583-
const record = this.syncRulesRecord(inputRecord);
587+
const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord));
584588
// This auto-flushes when the batch reaches its size limit
585589
await batch.save({
586590
tag: storage.SaveOperationTag.INSERT,

modules/module-postgres/src/types/resolver.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules';
21
import * as pgwire from '@powersync/service-jpgwire';
3-
import { CustomTypeRegistry } from './registry.js';
2+
import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules';
43
import semver from 'semver';
54
import { getServerVersion } from '../utils/postgres_version.js';
5+
import { CustomTypeRegistry } from './registry.js';
66

77
/**
88
* Resolves descriptions used to decode values for custom postgres types.
@@ -11,11 +11,9 @@ import { getServerVersion } from '../utils/postgres_version.js';
1111
*/
1212
export class PostgresTypeResolver {
1313
private cachedVersion: semver.SemVer | null = null;
14+
readonly registry: CustomTypeRegistry;
1415

15-
constructor(
16-
readonly registry: CustomTypeRegistry,
17-
private readonly pool: pgwire.PgClient
18-
) {
16+
constructor(private readonly pool: pgwire.PgClient) {
1917
this.registry = new CustomTypeRegistry();
2018
}
2119

@@ -188,6 +186,11 @@ WHERE a.attnum > 0
188186
return toSyncRulesRow(record);
189187
}
190188

189+
constructRowRecord(columnMap: Record<string, number>, tupleRaw: Record<string, any>): SqliteInputRow {
190+
const record = this.decodeTupleForTable(columnMap, tupleRaw);
191+
return toSyncRulesRow(record);
192+
}
193+
191194
/**
192195
* We need a high level of control over how values are decoded, to make sure there is no loss
193196
* of precision in the process.
@@ -206,5 +209,23 @@ WHERE a.attnum > 0
206209
return result;
207210
}
208211

212+
/**
213+
* We need a high level of control over how values are decoded, to make sure there is no loss
214+
* of precision in the process.
215+
*/
216+
private decodeTupleForTable(columnMap: Record<string, number>, tupleRaw: Record<string, any>): DatabaseInputRow {
217+
let result: Record<string, any> = {};
218+
for (let columnName in tupleRaw) {
219+
const rawval = tupleRaw[columnName];
220+
const typeOid = columnMap[columnName];
221+
if (typeof rawval == 'string' && typeOid) {
222+
result[columnName] = this.registry.decodeDatabaseValue(rawval, typeOid);
223+
} else {
224+
result[columnName] = rawval;
225+
}
226+
}
227+
return result;
228+
}
229+
209230
private static minVersionForMultirange: semver.SemVer = semver.parse('14.0.0')!;
210231
}

modules/module-postgres/src/types/types.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
22
import * as service_types from '@powersync/service-types';
33
import * as t from 'ts-codec';
4-
import { CustomTypeRegistry } from './registry.js';
54

65
// Maintain backwards compatibility by exporting these
76
export const validatePort = lib_postgres.validatePort;
@@ -25,10 +24,7 @@ export type PostgresConnectionConfig = t.Decoded<typeof PostgresConnectionConfig
2524
/**
2625
* Resolved version of {@link PostgresConnectionConfig}
2726
*/
28-
export type ResolvedConnectionConfig = PostgresConnectionConfig &
29-
NormalizedPostgresConnectionConfig & {
30-
typeRegistry: CustomTypeRegistry;
31-
};
27+
export type ResolvedConnectionConfig = PostgresConnectionConfig & NormalizedPostgresConnectionConfig;
3228

3329
export function isPostgresConfig(
3430
config: service_types.configFile.DataSourceConfig

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
551551
test('test replication - multiranges', async () => {
552552
const db = await connectPgPool();
553553

554-
if (!(await new PostgresTypeResolver(new CustomTypeRegistry(), db).supportsMultiRanges())) {
554+
if (!(await new PostgresTypeResolver(db).supportsMultiRanges())) {
555555
// This test requires Postgres 14 or later.
556556
return;
557557
}
@@ -620,7 +620,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
620620
* Return all the inserts from the first transaction in the replication stream.
621621
*/
622622
async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.ReplicationStream) {
623-
const typeCache = new PostgresTypeResolver(new CustomTypeRegistry(), db);
623+
const typeCache = new PostgresTypeResolver(db);
624624
await typeCache.fetchTypesForSchema();
625625

626626
let transformed: SqliteInputRow[] = [];

modules/module-postgres/test/src/slow_tests.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ function defineSlowTests(factory: storage.TestStorageFactory) {
6969
});
7070

7171
async function testRepeatedReplication(testOptions: { compact: boolean; maxBatchSize: number; numBatches: number }) {
72-
const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
72+
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
7373
const replicationConnection = await connections.replicationConnection();
7474
const pool = connections.pool;
7575
await clearTestDb(pool);
@@ -330,7 +330,7 @@ bucket_definitions:
330330
await pool.query(`SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE active = FALSE`);
331331
i += 1;
332332

333-
const connections = new PgManager(TEST_CONNECTION_OPTIONS, { registry: new CustomTypeRegistry() });
333+
const connections = new PgManager(TEST_CONNECTION_OPTIONS, {});
334334
const replicationConnection = await connections.replicationConnection();
335335

336336
abortController = new AbortController();

0 commit comments

Comments
 (0)