diff --git a/.changeset/metal-fans-argue.md b/.changeset/metal-fans-argue.md new file mode 100644 index 000000000..e933212c1 --- /dev/null +++ b/.changeset/metal-fans-argue.md @@ -0,0 +1,6 @@ +--- +'@powersync/op-sqlite': minor +--- + +Fixed single write transaction operations in `ps_crud` not being processed. Batching update notifications per write lock. +This will also fix downstream features such as watched queries and reactive query hooks in cases where the query is fired before the data was committed, and batching will improve performance specifically in cases where a lot of data changes occur. diff --git a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts index 52f73531b..bb95dd16e 100644 --- a/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts +++ b/packages/powersync-op-sqlite/src/db/OPSQLiteConnection.ts @@ -1,40 +1,85 @@ -import { DB, SQLBatchTuple } from '@op-engineering/op-sqlite'; -import { BaseObserver, DBAdapterListener, QueryResult, RowUpdateType } from '@powersync/common'; +import { DB, SQLBatchTuple, UpdateHookOperation } from '@op-engineering/op-sqlite'; +import { + BaseObserver, + BatchedUpdateNotification, + DBAdapterListener, + QueryResult, + RowUpdateType, + UpdateNotification +} from '@powersync/common'; export type OPSQLiteConnectionOptions = { baseDB: DB; }; +export type OPSQLiteUpdateNotification = { + table: string; + operation: UpdateHookOperation; + row?: any; + rowId: number; +}; + export class OPSQLiteConnection extends BaseObserver { protected DB: DB; + private updateBuffer: UpdateNotification[]; + constructor(protected options: OPSQLiteConnectionOptions) { super(); this.DB = options.baseDB; + this.updateBuffer = []; + + this.DB.rollbackHook(() => { + this.updateBuffer = []; + }); - // link table update commands this.DB.updateHook((update) => { - this.iterateListeners((cb) => { - let opType: RowUpdateType; - switch (update.operation) { - case 'INSERT': - opType = RowUpdateType.SQLITE_INSERT; - break; - case 'DELETE': - opType = RowUpdateType.SQLITE_DELETE; - break; - case 'UPDATE': - opType = RowUpdateType.SQLITE_UPDATE; - break; - } - cb.tablesUpdated?.({ - table: update.table, - opType, - rowId: update.rowId - }); - }); + this.addTableUpdate(update); }); } + addTableUpdate(update: OPSQLiteUpdateNotification) { + let opType: RowUpdateType; + switch (update.operation) { + case 'INSERT': + opType = RowUpdateType.SQLITE_INSERT; + break; + case 'DELETE': + opType = RowUpdateType.SQLITE_DELETE; + break; + case 'UPDATE': + opType = RowUpdateType.SQLITE_UPDATE; + break; + } + + this.updateBuffer.push({ + table: update.table, + opType, + rowId: update.rowId + }); + } + + flushUpdates() { + if (!this.updateBuffer.length) { + return; + } + + const groupedUpdates = this.updateBuffer.reduce((grouping: Record, update) => { + const { table } = update; + const updateGroup = grouping[table] || (grouping[table] = []); + updateGroup.push(update); + return grouping; + }, {}); + + const batchedUpdate: BatchedUpdateNotification = { + groupedUpdates, + rawUpdates: this.updateBuffer, + tables: Object.keys(groupedUpdates) + }; + + this.updateBuffer = []; + this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate)); + } + close() { return this.DB.close(); } diff --git a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts index 37a962ef1..4b5be5ca9 100644 --- a/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts +++ b/packages/powersync-op-sqlite/src/db/OPSqliteAdapter.ts @@ -1,12 +1,4 @@ -import { - BaseObserver, - DBAdapter, - DBAdapterListener, - DBLockOptions, - QueryResult, - SQLOpenOptions, - Transaction -} from '@powersync/common'; +import { BaseObserver, DBAdapter, DBAdapterListener, DBLockOptions, QueryResult, Transaction } from '@powersync/common'; import { ANDROID_DATABASE_PATH, IOS_LIBRARY_PATH, open, type DB } from '@op-engineering/op-sqlite'; import Lock from 'async-lock'; import { OPSQLiteConnection } from './OPSQLiteConnection'; @@ -194,13 +186,18 @@ export class OPSQLiteDBAdapter extends BaseObserver implement return new Promise(async (resolve, reject) => { try { - await this.locks.acquire( - LockType.WRITE, - async () => { - resolve(await fn(this.writeConnection!)); - }, - { timeout: options?.timeoutMs } - ); + await this.locks + .acquire( + LockType.WRITE, + async () => { + resolve(await fn(this.writeConnection!)); + }, + { timeout: options?.timeoutMs } + ) + .then(() => { + // flush updates once a write lock has been released + this.writeConnection!.flushUpdates(); + }); } catch (ex) { reject(ex); }