Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/clever-keys-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Added auto incrementing operation_id column to Trigger based diff temporary tables and results. This allows for better operation ordering compared to using the previous timestamp column.
91 changes: 79 additions & 12 deletions packages/common/src/client/triggers/TriggerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ export enum DiffTriggerOperation {
* @experimental
* Diffs created by {@link TriggerManager#createDiffTrigger} are stored in a temporary table.
* This is the base record structure for all diff records.
*
* @template TOperationId - The type for `operation_id`. Defaults to `number` as returned by default SQLite database queries.
* Use `string` for full 64-bit precision when using `{ castOperationIdAsText: true }` option.
*/
export interface BaseTriggerDiffRecord {
export interface BaseTriggerDiffRecord<TOperationId extends string | number = number> {
/**
* The modified row's `id` column value.
*/
Expand All @@ -24,6 +27,13 @@ export interface BaseTriggerDiffRecord {
* The operation performed which created this record.
*/
operation: DiffTriggerOperation;
/**
* Auto-incrementing primary key for the operation.
* Defaults to number as returned by database queries (wa-sqlite returns lower 32 bits).
* Can be string for full 64-bit precision when using `{ castOperationIdAsText: true }` option.
*/
operation_id: TOperationId;

/**
* Time the change operation was recorded.
* This is in ISO 8601 format, e.g. `2023-10-01T12:00:00.000Z`.
Expand All @@ -37,7 +47,8 @@ export interface BaseTriggerDiffRecord {
* This record contains the new value and optionally the previous value.
* Values are stored as JSON strings.
*/
export interface TriggerDiffUpdateRecord extends BaseTriggerDiffRecord {
export interface TriggerDiffUpdateRecord<TOperationId extends string | number = number>
extends BaseTriggerDiffRecord<TOperationId> {
operation: DiffTriggerOperation.UPDATE;
/**
* The updated state of the row in JSON string format.
Expand All @@ -54,7 +65,8 @@ export interface TriggerDiffUpdateRecord extends BaseTriggerDiffRecord {
* Represents a diff record for a SQLite INSERT operation.
* This record contains the new value represented as a JSON string.
*/
export interface TriggerDiffInsertRecord extends BaseTriggerDiffRecord {
export interface TriggerDiffInsertRecord<TOperationId extends string | number = number>
extends BaseTriggerDiffRecord<TOperationId> {
operation: DiffTriggerOperation.INSERT;
/**
* The value of the row, at the time of INSERT, in JSON string format.
Expand All @@ -67,7 +79,8 @@ export interface TriggerDiffInsertRecord extends BaseTriggerDiffRecord {
* Represents a diff record for a SQLite DELETE operation.
* This record contains the new value represented as a JSON string.
*/
export interface TriggerDiffDeleteRecord extends BaseTriggerDiffRecord {
export interface TriggerDiffDeleteRecord<TOperationId extends string | number = number>
extends BaseTriggerDiffRecord<TOperationId> {
operation: DiffTriggerOperation.DELETE;
/**
* The value of the row, before the DELETE operation, in JSON string format.
Expand All @@ -82,27 +95,53 @@ export interface TriggerDiffDeleteRecord extends BaseTriggerDiffRecord {
*
* Querying the DIFF table directly with {@link TriggerDiffHandlerContext#withDiff} will return records
* with the structure of this type.
*
* @template TOperationId - The type for `operation_id`. Defaults to `number` as returned by database queries.
* Use `string` for full 64-bit precision when using `{ castOperationIdAsText: true }` option.
*
* @example
* ```typescript
* // Default: operation_id is number
* const diffs = await context.withDiff<TriggerDiffRecord>('SELECT * FROM DIFF');
* diff.forEach(diff => console.log(diff.operation, diff.timestamp, JSON.parse(diff.value)))
*
* // With string operation_id for full precision
* const diffsWithString = await context.withDiff<TriggerDiffRecord<string>>(
* 'SELECT * FROM DIFF',
* undefined,
* { castOperationIdAsText: true }
* );
* ```
*/
export type TriggerDiffRecord = TriggerDiffUpdateRecord | TriggerDiffInsertRecord | TriggerDiffDeleteRecord;
export type TriggerDiffRecord<TOperationId extends string | number = number> =
| TriggerDiffUpdateRecord<TOperationId>
| TriggerDiffInsertRecord<TOperationId>
| TriggerDiffDeleteRecord<TOperationId>;

/**
* @experimental
* Querying the DIFF table directly with {@link TriggerDiffHandlerContext#withExtractedDiff} will return records
* with the tracked columns extracted from the JSON value.
* This type represents the structure of such records.
*
* @template T - The type for the extracted columns from the tracked JSON value.
* @template TOperationId - The type for `operation_id`. Defaults to `number` as returned by database queries.
* Use `string` for full 64-bit precision when using `{ castOperationIdAsText: true }` option.
*
* @example
* ```typescript
* // Default: operation_id is number
* const diffs = await context.withExtractedDiff<ExtractedTriggerDiffRecord<{id: string, name: string}>>('SELECT * FROM DIFF');
* diff.forEach(diff => console.log(diff.__operation, diff.__timestamp, diff.columnName))
*
* // With string operation_id for full precision
* const diffsWithString = await context.withExtractedDiff<ExtractedTriggerDiffRecord<{id: string, name: string}, string>>(
* 'SELECT * FROM DIFF',
* undefined,
* { castOperationIdAsText: true }
* );
* ```
*/
export type ExtractedTriggerDiffRecord<T> = T & {
[K in keyof Omit<BaseTriggerDiffRecord, 'id'> as `__${string & K}`]: TriggerDiffRecord[K];
export type ExtractedTriggerDiffRecord<T, TOperationId extends string | number = number> = T & {
[K in keyof Omit<BaseTriggerDiffRecord<TOperationId>, 'id'> as `__${string & K}`]: TriggerDiffRecord<TOperationId>[K];
} & {
__previous_value?: string;
};
Expand Down Expand Up @@ -183,6 +222,21 @@ export interface CreateDiffTriggerOptions extends BaseCreateDiffTriggerOptions {
*/
export type TriggerRemoveCallback = () => Promise<void>;

/**
* @experimental
* Options for {@link TriggerDiffHandlerContext#withDiff}.
*/
export interface WithDiffOptions {
/**
* If true, casts `operation_id` as TEXT in the internal CTE to preserve full 64-bit precision.
* Use this when you need to ensure `operation_id` is treated as a string to avoid precision loss
* for values exceeding JavaScript's Number.MAX_SAFE_INTEGER.
*
* When enabled, use {@link TriggerDiffRecord}<string> to type the result correctly.
*/
castOperationIdAsText?: boolean;
}

/**
* @experimental
* Context for the `onChange` handler provided to {@link TriggerManager#trackTableDiff}.
Expand All @@ -200,9 +254,10 @@ export interface TriggerDiffHandlerContext extends LockContext {
* The `DIFF` table is of the form described in {@link TriggerManager#createDiffTrigger}
* ```sql
* CREATE TEMP DIFF (
* operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
* id TEXT,
* operation TEXT,
* timestamp TEXT
* timestamp TEXT,
* value TEXT,
* previous_value TEXT
* );
Expand All @@ -222,8 +277,19 @@ export interface TriggerDiffHandlerContext extends LockContext {
* JOIN todos ON DIFF.id = todos.id
* WHERE json_extract(DIFF.value, '$.status') = 'active'
* ```
*
* @example
* ```typescript
* // With operation_id cast as TEXT for full precision
* const diffs = await context.withDiff<TriggerDiffRecord<string>>(
* 'SELECT * FROM DIFF',
* undefined,
* { castOperationIdAsText: true }
* );
* // diffs[0].operation_id is now typed as string
* ```
*/
withDiff: <T = any>(query: string, params?: ReadonlyArray<Readonly<any>>) => Promise<T[]>;
withDiff: <T = any>(query: string, params?: ReadonlyArray<Readonly<any>>, options?: WithDiffOptions) => Promise<T[]>;

/**
* Allows querying the database with access to the table containing diff records.
Expand Down Expand Up @@ -292,9 +358,10 @@ export interface TriggerManager {
*
* ```sql
* CREATE TEMP TABLE ${destination} (
* operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
* id TEXT,
* operation TEXT,
* timestamp TEXT
* timestamp TEXT,
* value TEXT,
* previous_value TEXT
* );
Expand Down
18 changes: 12 additions & 6 deletions packages/common/src/client/triggers/TriggerManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
DiffTriggerOperation,
TrackDiffOptions,
TriggerManager,
TriggerRemoveCallback
TriggerRemoveCallback,
WithDiffOptions
} from './TriggerManager.js';

export type TriggerManagerImplOptions = {
Expand Down Expand Up @@ -117,6 +118,7 @@ export class TriggerManagerImpl implements TriggerManager {
await hooks?.beforeCreate?.(tx);
await tx.execute(/* sql */ `
CREATE TEMP TABLE ${destination} (
operation_id INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT,
operation TEXT,
timestamp TEXT,
Expand Down Expand Up @@ -243,17 +245,20 @@ export class TriggerManagerImpl implements TriggerManager {
const callbackResult = await options.onChange({
...tx,
destinationTable: destination,
withDiff: async <T>(query, params) => {
withDiff: async <T>(query, params, options?: WithDiffOptions) => {
// Wrap the query to expose the destination table
const operationIdSelect = options?.castOperationIdAsText
? 'id, operation, CAST(operation_id AS TEXT) as operation_id, timestamp, value, previous_value'
: '*';
const wrappedQuery = /* sql */ `
WITH
DIFF AS (
SELECT
*
${operationIdSelect}
FROM
${destination}
ORDER BY
timestamp ASC
operation_id ASC
) ${query}
`;
return tx.getAll<T>(wrappedQuery, params);
Expand All @@ -267,13 +272,14 @@ export class TriggerManagerImpl implements TriggerManager {
id,
${contextColumns.length > 0
? `${contextColumns.map((col) => `json_extract(value, '$.${col}') as ${col}`).join(', ')},`
: ''} operation as __operation,
: ''} operation_id as __operation_id,
operation as __operation,
timestamp as __timestamp,
previous_value as __previous_value
FROM
${destination}
ORDER BY
__timestamp ASC
__operation_id ASC
) ${query}
`;
return tx.getAll<T>(wrappedQuery, params);
Expand Down
99 changes: 99 additions & 0 deletions packages/node/tests/trigger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ describe('Triggers', () => {
() => {
expect(results.length).toEqual(3);

// Check that operation_id values exist and are numbers
expect(results[0].operation_id).toBeDefined();
expect(typeof results[0].operation_id).toBe('number');
expect(results[0].operation_id).eq(1);
expect(results[1].operation_id).toBeDefined();
expect(results[1].operation_id).eq(2);

expect(results[0].operation).toEqual(DiffTriggerOperation.INSERT);
const parsedInsert = JSON.parse(results[0].value);
// only the filtered columns should be tracked
Expand Down Expand Up @@ -605,4 +612,96 @@ describe('Triggers', () => {
expect(changes[4].columnB).toBeUndefined();
expect(changes[4].__previous_value).toBeNull();
});

databaseTest('Should cast operation_id as string with withDiff option', async ({ database }) => {
const results: TriggerDiffRecord<string>[] = [];

await database.triggers.trackTableDiff({
source: 'todos',
columns: ['content'],
when: {
[DiffTriggerOperation.INSERT]: 'TRUE'
},
onChange: async (context) => {
const diffs = await context.withDiff<TriggerDiffRecord<string>>('SELECT * FROM DIFF', undefined, {
castOperationIdAsText: true
});
results.push(...diffs);
}
});

await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?);', ['test 1']);
await database.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?);', ['test 2']);

await vi.waitFor(
() => {
expect(results.length).toEqual(2);
// Check that operation_id is a string when castOperationIdAsText is enabled
expect(typeof results[0].operation_id).toBe('string');
expect(typeof results[1].operation_id).toBe('string');
// Should be incrementing
expect(Number.parseInt(results[0].operation_id)).toBeLessThan(Number.parseInt(results[1].operation_id));
},
{ timeout: 1000 }
);
});

databaseTest('Should report changes in transaction order using operation_id', async ({ database }) => {
const results: TriggerDiffRecord[] = [];

await database.triggers.trackTableDiff({
source: 'todos',
columns: ['content'],
when: {
[DiffTriggerOperation.INSERT]: 'TRUE',
[DiffTriggerOperation.UPDATE]: 'TRUE',
[DiffTriggerOperation.DELETE]: 'TRUE'
},
onChange: async (context) => {
const diffs = await context.withDiff<TriggerDiffRecord>('SELECT * FROM DIFF');
results.push(...diffs);
}
});

// Perform multiple operations in a single transaction
const contents = ['first', 'second', 'third', 'fourth'];
await database.writeLock(async (tx) => {
// Insert first todo
await tx.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?);', [contents[0]]);
// Insert second todo
await tx.execute('INSERT INTO todos (id, content) VALUES (uuid(), ?);', [contents[1]]);
// Update first todo
await tx.execute('UPDATE todos SET content = ? WHERE content = ?;', [contents[2], contents[0]]);
// Delete second todo
await tx.execute('DELETE FROM todos WHERE content = ?;', [contents[1]]);
});

await vi.waitFor(
() => {
expect(results.length).toEqual(4);

// Verify operation_ids are incrementing (ensuring order)
expect(results[0].operation_id).toBeLessThan(results[1].operation_id);
expect(results[1].operation_id).toBeLessThan(results[2].operation_id);
expect(results[2].operation_id).toBeLessThan(results[3].operation_id);

// Verify operations are in the correct order
expect(results[0].operation).toBe(DiffTriggerOperation.INSERT);
expect(JSON.parse(results[0].value).content).toBe(contents[0]);

expect(results[1].operation).toBe(DiffTriggerOperation.INSERT);
expect(JSON.parse(results[1].value).content).toBe(contents[1]);

expect(results[2].operation).toBe(DiffTriggerOperation.UPDATE);
if (results[2].operation === DiffTriggerOperation.UPDATE) {
expect(JSON.parse(results[2].value).content).toBe(contents[2]);
expect(JSON.parse(results[2].previous_value).content).toBe(contents[0]);
}

expect(results[3].operation).toBe(DiffTriggerOperation.DELETE);
expect(JSON.parse(results[3].value).content).toBe(contents[1]);
},
{ timeout: 1000 }
);
});
});