6 changes: 6 additions & 0 deletions .changeset/fluffy-pandas-eat.md
@@ -0,0 +1,6 @@
---
'@powersync/service-core': patch
'powersync-open-service': patch
---

Fix replication issue with REPLICA IDENTITY FULL (#27).
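
For context: with REPLICA IDENTITY FULL, Postgres includes the complete old row (all columns) in the logical replication stream for UPDATE and DELETE, rather than just the primary key. The slow test below enables it the same way any table would; a one-line sketch against the pgwire pool used in the tests:

await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);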
5 changes: 5 additions & 0 deletions packages/service-core/src/storage/BucketStorage.ts
@@ -367,7 +367,12 @@ export interface SaveInsert {
export interface SaveUpdate {
tag: 'update';
sourceTable: SourceTable;

/**
* This is only present when the id has changed, and will only contain replica identity columns.
*/
before?: SqliteRow;

/**
* A null value means null column.
*
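The new optional `before` field is exercised by the tests added below; a minimal sketch of a save call carrying a before image (`sourceTable` comes from the makeTestTable test helper):

await batch.save({
  sourceTable,
  tag: 'update',
  // With REPLICA IDENTITY FULL, `before` holds the full old row whenever
  // the derived replication id changed.
  before: { id: 'test1', description: 'test1a' },
  after: { id: 'test1', description: 'test1b' }
});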
13 changes: 11 additions & 2 deletions packages/service-core/src/storage/mongo/MongoBucketBatch.ts
@@ -187,6 +187,7 @@ export class MongoBucketBatch implements BucketStorageBatch {
}
const currentData = current_data_lookup.get(op.internalBeforeKey) ?? null;
if (currentData != null) {
// If the entry is needed again later, it is set again from nextData below
current_data_lookup.delete(op.internalBeforeKey);
}
const nextData = this.saveOperation(persistedBatch!, op, currentData, op_seq);
@@ -242,6 +243,10 @@ export class MongoBucketBatch implements BucketStorageBatch {
// Not an error if we re-apply a transaction
existing_buckets = [];
existing_lookups = [];
// Log to help with debugging if there was a consistency issue
logger.warn(
`Cannot find previous record for update on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
);
} else {
const data = bson.deserialize((result.data as mongo.Binary).buffer, BSON_DESERIALIZE_OPTIONS) as SqliteRow;
existing_buckets = result.buckets;
@@ -254,6 +259,10 @@
// Not an error if we re-apply a transaction
existing_buckets = [];
existing_lookups = [];
// Log to help with debugging if there was a consistency issue
logger.warn(
`Cannot find previous record for delete on ${record.sourceTable.qualifiedName}: ${beforeId} / ${record.before?.id}`
);
} else {
existing_buckets = result.buckets;
existing_lookups = result.lookups;
@@ -292,7 +301,7 @@ export class MongoBucketBatch implements BucketStorageBatch {
}

// 2. Save bucket data
if (beforeId != null && beforeId != afterId) {
if (beforeId != null && (afterId == null || !beforeId.equals(afterId))) {
// Source ID updated
if (sourceTable.syncData) {
// Delete old record
@@ -422,7 +431,7 @@ export class MongoBucketBatch implements BucketStorageBatch {
};
}

if (beforeId != afterId) {
if (afterId == null || !beforeId.equals(afterId)) {
// Either a delete (afterId == null), or replaced the old replication id
batch.deleteCurrentData(before_key);
}
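The two condition changes above are the heart of the fix: `beforeId` and `afterId` are BSON values rather than primitives, so the old `beforeId != afterId` compared object references and could report equal ids as changed. A minimal sketch of the failure mode, assuming the ids are bson.UUID instances (the UUID-shaped subkeys in the tests suggest this, but the exact type is a service internal):

import * as bson from 'bson';

const a = new bson.UUID('740ba9f2-8b0f-53e3-bb17-5f38a9616f0e');
const b = new bson.UUID('740ba9f2-8b0f-53e3-bb17-5f38a9616f0e');

console.log(a != b);      // true: distinct objects, reference comparison
console.log(a.equals(b)); // true: value comparison, which the fix uses

The explicit `afterId == null ||` guard also matters: it handles deletes (no after image) before `.equals()` is ever called.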
177 changes: 177 additions & 0 deletions packages/service-core/test/src/data_storage.test.ts
@@ -897,6 +897,183 @@ bucket_definitions:
]);
});

test('changed data with replica identity full', async () => {
const sync_rules = SqlSyncRules.fromYaml(`
bucket_definitions:
global:
data:
- SELECT id, description FROM "test"
`);
const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });

const sourceTable = makeTestTable('test', ['id', 'description']);

// Pre-setup
const result1 = await storage.startBatch({}, async (batch) => {
await batch.save({
sourceTable,
tag: 'insert',
after: {
id: 'test1',
description: 'test1a'
}
});
});

const checkpoint1 = result1?.flushed_op ?? '0';

const result2 = await storage.startBatch({}, async (batch) => {
// Changed description, with the full before row present
await batch.save({
sourceTable,
tag: 'update',
before: {
id: 'test1',
description: 'test1a'
},
after: {
id: 'test1',
description: 'test1b'
}
});
});

const result3 = await storage.startBatch({}, async (batch) => {
// Delete
await batch.save({
sourceTable,
tag: 'delete',
before: {
id: 'test1',
description: 'test1b'
},
after: undefined
});
});

const checkpoint3 = result3!.flushed_op;

const batch = await fromAsync(storage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]])));
const data = batch[0].data.map((d) => {
return {
op: d.op,
object_id: d.object_id,
data: d.data,
subkey: d.subkey
};
});

// Operations must be in this order
expect(data).toEqual([
// 2
// The REMOVE is expected because the subkey changes
{
op: 'REMOVE',
object_id: 'test1',
data: null,
subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
},
{
op: 'PUT',
object_id: 'test1',
data: JSON.stringify({ id: 'test1', description: 'test1b' }),
subkey: '6544e3899293153fa7b38331/500e9b68-a2fd-51ff-9c00-313e2fb9f562'
},
// 3
{
op: 'REMOVE',
object_id: 'test1',
data: null,
subkey: '6544e3899293153fa7b38331/500e9b68-a2fd-51ff-9c00-313e2fb9f562'
}
]);
});

test('unchanged data with replica identity full', async () => {
const sync_rules = SqlSyncRules.fromYaml(`
bucket_definitions:
global:
data:
- SELECT id, description FROM "test"
`);
const storage = (await factory()).getInstance({ id: 1, sync_rules, slot_name: 'test' });

const sourceTable = makeTestTable('test', ['id', 'description']);

// Pre-setup
const result1 = await storage.startBatch({}, async (batch) => {
await batch.save({
sourceTable,
tag: 'insert',
after: {
id: 'test1',
description: 'test1a'
}
});
});

const checkpoint1 = result1?.flushed_op ?? '0';

const result2 = await storage.startBatch({}, async (batch) => {
// Unchanged, but has a before id
await batch.save({
sourceTable,
tag: 'update',
before: {
id: 'test1',
description: 'test1a'
},
after: {
id: 'test1',
description: 'test1a'
}
});
});

const result3 = await storage.startBatch({}, async (batch) => {
// Delete
await batch.save({
sourceTable,
tag: 'delete',
before: {
id: 'test1',
description: 'test1a'
},
after: undefined
});
});

const checkpoint3 = result3!.flushed_op;

const batch = await fromAsync(storage.getBucketDataBatch(checkpoint3, new Map([['global[]', checkpoint1]])));
const data = batch[0].data.map((d) => {
return {
op: d.op,
object_id: d.object_id,
data: d.data,
subkey: d.subkey
};
});

// Operations must be in this order
expect(data).toEqual([
// 2
{
op: 'PUT',
object_id: 'test1',
data: JSON.stringify({ id: 'test1', description: 'test1a' }),
subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
},
// 3
{
op: 'REMOVE',
object_id: 'test1',
data: null,
subkey: '6544e3899293153fa7b38331/740ba9f2-8b0f-53e3-bb17-5f38a9616f0e'
}
]);
});

test('large batch', async () => {
// Test syncing a batch of data that is small in count,
// but large enough in size to be split over multiple returned batches.
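Why the 'changed data' test above expects a REMOVE/PUT pair for the update while the 'unchanged data' test expects only a PUT: under REPLICA IDENTITY FULL the replication key is derived from every column, so changing any column produces a new key, and the entry under the old key must be removed. A rough illustration of the idea with a hypothetical key function (the service's actual derivation is internal and produces the UUID subkeys seen above):

import { createHash } from 'node:crypto';

// Hypothetical stand-in: key over the whole row, as REPLICA IDENTITY FULL implies.
function replicaKey(row: Record<string, string>): string {
  return createHash('sha256').update(JSON.stringify(row)).digest('hex');
}

const k1 = replicaKey({ id: 'test1', description: 'test1a' });
const k2 = replicaKey({ id: 'test1', description: 'test1b' });
console.log(k1 === k2); // false: description changed, so the key (and subkey) changes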
51 changes: 45 additions & 6 deletions packages/service-core/test/src/slow_tests.test.ts
@@ -62,7 +62,7 @@ function defineSlowTests(factory: StorageFactory) {
bucket_definitions:
global:
data:
- SELECT id, description FROM "test_data"
- SELECT * FROM "test_data"
`;
const syncRules = await f.updateSyncRules({ content: syncRuleContent });
const storage = f.getInstance(syncRules.parsed());
@@ -76,7 +76,10 @@ bucket_definitions:
walStream = new WalStream(options);

await pool.query(`DROP TABLE IF EXISTS test_data`);
await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`);
await pool.query(
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num decimal)`
);
await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);

await walStream.initReplication(replicationConnection);
await storage.autoActivate();
@@ -88,14 +91,17 @@

while (!abort && Date.now() - start < TEST_DURATION_MS) {
const bg = async () => {
for (let j = 0; j < 5 && !abort; j++) {
const n = Math.floor(Math.random() * 50);
for (let j = 0; j < 1 && !abort; j++) {
const n = 1;
let statements: pgwire.Statement[] = [];
for (let i = 0; i < n; i++) {
const description = `test${i}`;
statements.push({
statement: `INSERT INTO test_data(description) VALUES($1) returning id as test_id`,
params: [{ type: 'varchar', value: description }]
statement: `INSERT INTO test_data(description, num) VALUES($1, $2) returning id as test_id`,
params: [
{ type: 'varchar', value: description },
{ type: 'float8', value: Math.random() }
]
});
}
const results = await pool.query(...statements);
@@ -104,6 +110,24 @@ bucket_definitions:
});
await new Promise((resolve) => setTimeout(resolve, Math.random() * 30));

if (Math.random() > 0.5) {
const updateStatements: pgwire.Statement[] = ids.map((id) => {
return {
statement: `UPDATE test_data SET num = $2 WHERE id = $1`,
params: [
{ type: 'uuid', value: id },
{ type: 'float8', value: Math.random() }
]
};
});

await pool.query(...updateStatements);
if (Math.random() > 0.5) {
// Special case - an update that doesn't change data
await pool.query(...updateStatements);
}
}

const deleteStatements: pgwire.Statement[] = ids.map((id) => {
return {
statement: `DELETE FROM test_data WHERE id = $1`,
@@ -129,6 +153,21 @@ bucket_definitions:
return bson.deserialize((doc.data as mongo.Binary).buffer) as SqliteRow;
});
expect(transformed).toEqual([]);

// Check that each PUT has a REMOVE
const ops = await f.db.bucket_data.find().sort({ _id: 1 }).toArray();
let active = new Set<string>();
for (let op of ops) {
const key = op.source_key.toHexString();
if (op.op == 'PUT') {
active.add(key);
} else if (op.op == 'REMOVE') {
active.delete(key);
}
}
if (active.size > 0) {
throw new Error(`${active.size} rows not removed`);
}
}

abortController.abort();