Skip to content

Commit

Permalink
core: handle case when updating item while a push is ongoing
Browse files Browse the repository at this point in the history
  • Loading branch information
thecodrr committed May 6, 2024
1 parent 63f44d6 commit ac96354
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
60 changes: 60 additions & 0 deletions packages/core/__e2e__/sync.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,66 @@ test(
TEST_TIMEOUT
);

test(
"edge case 1: items updated while push is running",
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB")
]);

t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});

const id = await deviceA.notes.add({ title: "hello" });
for (let i = 0; i < 10; ++i) {
await Promise.all([
deviceA.sync({ type: "send" }),
new Promise((resolve) => setTimeout(resolve), 100).then(() =>
deviceA.notes.add({ id, title: `edit ${i}` })
)
]);

expect((await deviceA.notes.note(id))?.synced).toBe(false);
await deviceA.sync({ type: "send" });
await deviceB.sync({ type: "fetch" });
expect((await deviceB.notes.note(id))?.title).toBe(`edit ${i}`);
}
},
TEST_TIMEOUT * 10
);

test(
"edge case 2: new items added while push is running",
async (t) => {
const [deviceA, deviceB] = await Promise.all([
initializeDevice("deviceA"),
initializeDevice("deviceB")
]);

t.onTestFinished(async () => {
console.log(`${t.task.name} log out`);
await cleanup(deviceA, deviceB);
});

for (let i = 0; i < 10; ++i) {
await Promise.all([
deviceA.sync({ type: "send" }),
new Promise((resolve) => setTimeout(resolve), 100).then(() =>
deviceA.notes.add({ title: `note ${i}` })
)
]);
expect(await deviceB.notes.all.count()).toBe(i);
await deviceA.sync({ type: "send" });
await deviceB.sync({ type: "fetch" });
expect(await deviceB.notes.all.count()).toBe(i + 1);
}
},
TEST_TIMEOUT * 10
);

// test(
// "case 4: Device A's sync is interrupted halfway and Device B makes some changes afterwards and syncs.",
// async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/__mocks__/fs.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async function writeEncryptedBase64(
hashType,
iv: "some iv",
salt: key.salt!,
length: data.length
size: data.length
};
}

Expand Down
15 changes: 14 additions & 1 deletion packages/core/src/api/sync/collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Collector {
for (const itemType of SYNC_ITEM_TYPES) {
const collectionKey = SYNC_COLLECTIONS_MAP[itemType];
const collection = this.db[collectionKey].collection;
let pushTimestamp = Date.now();
for await (const chunk of collection.unsynced(chunkSize, isForceSync)) {
const items = await this.prepareChunk(chunk, key);
if (!items) continue;
Expand All @@ -54,8 +55,20 @@ class Collector {
await collection.update(
chunk.map((i) => i.id),
{ synced: true },
{ sendEvent: false }
{
sendEvent: false,
// EDGE CASE:
// Sometimes an item can get updated while it's being pushed.
// The result is that its `synced` property becomes true even
// though it's modification wasn't yet synced.
// In order to prevent that, we only set the `synced` property
// to true for items that haven't been modified since we last ran
// the push. Everything else will be collected again in the next
// push.
condition: (eb) => eb("dateModified", "<=", pushTimestamp)
}
);
pushTimestamp = Date.now();
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion packages/core/src/database/sql-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,14 @@ export class SQLCollection<
async update(
ids: string[],
partial: Partial<SQLiteItem<T>>,
options: { sendEvent: boolean } = { sendEvent: true }
options: {
sendEvent: boolean;
condition?: ExpressionOrFactory<
DatabaseSchema,
keyof DatabaseSchema,
SqlBool
>;
} = { sendEvent: true }
) {
if (!this.sanitizer.sanitize(this.type, partial)) return;

Expand All @@ -237,6 +244,7 @@ export class SQLCollection<
await tx
.updateTable<keyof DatabaseSchema>(this.type)
.where("id", "in", chunk)
.$if(!!options.condition, (eb) => eb.where(options.condition!))
.set({
...partial,
dateModified: Date.now(),
Expand Down

0 comments on commit ac96354

Please sign in to comment.