diff --git a/.changeset/chilled-queens-explain.md b/.changeset/chilled-queens-explain.md new file mode 100644 index 0000000..bdb9a25 --- /dev/null +++ b/.changeset/chilled-queens-explain.md @@ -0,0 +1,5 @@ +--- +'@journeyapps/react-native-quick-sqlite': minor +--- + +Added `registerTablesChangedHook` to DB connections which reports batched table updates once `writeTransaction`s and `writeLock`s have been committed. Maintained API compatibility with `registerUpdateHook` which reports table change events as they occur. Added listeners for when write transactions have been committed or rolled back. diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 1843f9c..26717a7 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -8,7 +8,7 @@ jobs: name: Test runs-on: macos-latest env: - AVD_NAME: macOS-avd-x86_64-29 + AVD_NAME: macOS-avd-x86_64-31 steps: - uses: actions/checkout@v4 with: @@ -24,7 +24,7 @@ jobs: path: | ~/.android/avd/* ~/.android/adb* - key: avd-29 + key: avd-31 - name: Setup NodeJS uses: actions/setup-node@v2 @@ -54,9 +54,9 @@ jobs: - name: create AVD and generate snapshot for caching if: steps.avd-cache.outputs.cache-hit != 'true' - uses: reactivecircus/android-emulator-runner@v2 + uses: reactivecircus/android-emulator-runner@v2.28.0 with: - api-level: 29 + api-level: 31 force-avd-creation: false target: google_apis arch: x86_64 @@ -66,9 +66,9 @@ jobs: script: echo "Generated AVD snapshot for caching." - name: Run connected tests - uses: ReactiveCircus/android-emulator-runner@v2 + uses: ReactiveCircus/android-emulator-runner@v2.28.0 with: - api-level: 29 + api-level: 31 target: google_apis arch: x86_64 avd-name: $AVD_NAME diff --git a/cpp/ConnectionPool.cpp b/cpp/ConnectionPool.cpp index 8288e78..7ab5ccf 100644 --- a/cpp/ConnectionPool.cpp +++ b/cpp/ConnectionPool.cpp @@ -9,7 +9,13 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath, : dbName(dbName), maxReads(numReadConnections), writeConnection(dbName, docPath, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | - SQLITE_OPEN_FULLMUTEX) { + SQLITE_OPEN_FULLMUTEX), + commitPayload( + {.dbName = &this->dbName, .event = TransactionEvent::COMMIT}), + rollbackPayload({ + .dbName = &this->dbName, + .event = TransactionEvent::ROLLBACK, + }) { onContextCallback = nullptr; isConcurrencyEnabled = maxReads > 0; @@ -127,6 +133,26 @@ void ConnectionPool::setTableUpdateHandler( (void *)(dbName.c_str())); } +/** + * The SQLite callback needs to return `0` in order for the commit to + * proceed correctly + */ +int onCommitIntermediate(ConnectionPool *pool) { + if (pool->onCommitCallback != NULL) { + pool->onCommitCallback(&(pool->commitPayload)); + } + return 0; +} + +void ConnectionPool::setTransactionFinalizerHandler( + void (*callback)(const TransactionCallbackPayload *)) { + this->onCommitCallback = callback; + sqlite3_commit_hook(writeConnection.connection, + (int (*)(void *))onCommitIntermediate, (void *)this); + sqlite3_rollback_hook(writeConnection.connection, (void (*)(void *))callback, + (void *)&rollbackPayload); +} + void ConnectionPool::closeContext(ConnectionLockId contextId) { if (writeConnection.matchesLock(contextId)) { if (writeQueue.size() > 0) { diff --git a/cpp/ConnectionPool.h b/cpp/ConnectionPool.h index 2ebecae..28d9b1f 100644 --- a/cpp/ConnectionPool.h +++ b/cpp/ConnectionPool.h @@ -7,6 +7,13 @@ #ifndef ConnectionPool_h #define ConnectionPool_h +enum TransactionEvent { COMMIT, ROLLBACK }; + +struct TransactionCallbackPayload { + std::string *dbName; + TransactionEvent event; +}; + // The number of concurrent read connections to the database. /** * Concurrent connection pool class. @@ -57,7 +64,12 @@ class ConnectionPool { std::vector readQueue; std::vector writeQueue; + // Cached constant payloads for c style commit/rollback callbacks + const TransactionCallbackPayload commitPayload; + const TransactionCallbackPayload rollbackPayload; + void (*onContextCallback)(std::string, ConnectionLockId); + void (*onCommitCallback)(const TransactionCallbackPayload *); bool isConcurrencyEnabled; @@ -66,6 +78,8 @@ class ConnectionPool { unsigned int numReadConnections); ~ConnectionPool(); + friend int onCommitIntermediate(ConnectionPool *pool); + /** * Add a task to the read queue. If there are no available connections, * the task will be queued. @@ -94,6 +108,12 @@ class ConnectionPool { void setTableUpdateHandler(void (*callback)(void *, int, const char *, const char *, sqlite3_int64)); + /** + * Set a callback function for transaction commits/rollbacks + */ + void setTransactionFinalizerHandler( + void (*callback)(const TransactionCallbackPayload *)); + /** * Close a context in order to progress queue */ @@ -124,4 +144,6 @@ class ConnectionPool { sqlite3 **db, int sqlOpenFlags); }; +int onCommitIntermediate(ConnectionPool *pool); + #endif \ No newline at end of file diff --git a/cpp/bindings.cpp b/cpp/bindings.cpp index a7fe689..320d39c 100644 --- a/cpp/bindings.cpp +++ b/cpp/bindings.cpp @@ -67,6 +67,32 @@ void updateTableHandler(void *voidDBName, int opType, char const *dbName, }); } +/** + * Callback handler for SQLite transaction updates + */ +void transactionFinalizerHandler(const TransactionCallbackPayload *payload) { + /** + * No DB operations should occur when this callback is fired from SQLite. + * This function triggers an async invocation to call watch callbacks, + * avoiding holding SQLite up. + */ + invoker->invokeAsync([payload] { + try { + auto global = runtime->global(); + jsi::Function handlerFunction = global.getPropertyAsFunction( + *runtime, "triggerTransactionFinalizerHook"); + + auto jsiDbName = jsi::String::createFromAscii(*runtime, *payload->dbName); + auto jsiEventType = jsi::Value((int)payload->event); + handlerFunction.call(*runtime, move(jsiDbName), move(jsiEventType)); + } catch (jsi::JSINativeException e) { + std::cout << e.what() << std::endl; + } catch (...) { + std::cout << "Unknown error" << std::endl; + } + }); +} + /** * Callback handler for Concurrent context is available */ @@ -137,9 +163,9 @@ void osp::install(jsi::Runtime &rt, } } - auto result = - sqliteOpenDb(dbName, tempDocPath, &contextLockAvailableHandler, - &updateTableHandler, numReadConnections); + auto result = sqliteOpenDb( + dbName, tempDocPath, &contextLockAvailableHandler, &updateTableHandler, + &transactionFinalizerHandler, numReadConnections); if (result.type == SQLiteError) { throw jsi::JSError(rt, result.errorMessage.c_str()); } diff --git a/cpp/sqliteBridge.cpp b/cpp/sqliteBridge.cpp index 2450077..c43cf11 100644 --- a/cpp/sqliteBridge.cpp +++ b/cpp/sqliteBridge.cpp @@ -40,6 +40,8 @@ sqliteOpenDb(string const dbName, string const docPath, void (*contextAvailableCallback)(std::string, ConnectionLockId), void (*updateTableCallback)(void *, int, const char *, const char *, sqlite3_int64), + void (*onTransactionFinalizedCallback)( + const TransactionCallbackPayload *event), uint32_t numReadConnections) { if (dbMap.count(dbName) == 1) { return SQLiteOPResult{ @@ -51,6 +53,7 @@ sqliteOpenDb(string const dbName, string const docPath, dbMap[dbName] = new ConnectionPool(dbName, docPath, numReadConnections); dbMap[dbName]->setOnContextAvailable(contextAvailableCallback); dbMap[dbName]->setTableUpdateHandler(updateTableCallback); + dbMap[dbName]->setTransactionFinalizerHandler(onTransactionFinalizedCallback); return SQLiteOPResult{ .type = SQLiteOk, diff --git a/cpp/sqliteBridge.h b/cpp/sqliteBridge.h index a24222d..d099d5c 100644 --- a/cpp/sqliteBridge.h +++ b/cpp/sqliteBridge.h @@ -29,6 +29,8 @@ sqliteOpenDb(std::string const dbName, std::string const docPath, void (*contextAvailableCallback)(std::string, ConnectionLockId), void (*updateTableCallback)(void *, int, const char *, const char *, sqlite3_int64), + void (*onTransactionFinalizedCallback)( + const TransactionCallbackPayload *event), uint32_t numReadConnections); SQLiteOPResult sqliteCloseDb(string const dbName); diff --git a/src/DBListenerManager.ts b/src/DBListenerManager.ts new file mode 100644 index 0000000..4cf6e7c --- /dev/null +++ b/src/DBListenerManager.ts @@ -0,0 +1,99 @@ +import _ from 'lodash'; +import { registerTransactionHook, registerUpdateHook } from './table-updates'; +import { + BatchedUpdateCallback, + BatchedUpdateNotification, + TransactionEvent, + UpdateCallback, + UpdateNotification +} from './types'; +import { BaseListener, BaseObserver } from './utils/BaseObserver'; + +export interface DBListenerManagerOptions { + dbName: string; +} + +export interface WriteTransactionEvent { + type: TransactionEvent; +} + +export interface DBListener extends BaseListener { + /** + * Register a listener to be fired for any table change. + * Changes inside write locks and transactions are reported immediately. + */ + rawTableChange: UpdateCallback; + + /** + * Register a listener for when table changes are persisted + * into the DB. Changes during write transactions which are + * rolled back are not reported. + * Any changes during write locks are buffered and reported + * after transaction commit and lock release. + * Table changes are reported individually for now in order to maintain + * API compatibility. These can be batched in future. + */ + tablesUpdated: BatchedUpdateCallback; + + /** + * Listener event triggered whenever a write transaction + * is started, committed or rolled back. + */ + writeTransaction: (event: WriteTransactionEvent) => void; +} + +export class DBListenerManager extends BaseObserver {} + +export class DBListenerManagerInternal extends DBListenerManager { + private updateBuffer: UpdateNotification[]; + + constructor(protected options: DBListenerManagerOptions) { + super(); + this.updateBuffer = []; + registerUpdateHook(this.options.dbName, (update) => this.handleTableUpdates(update)); + registerTransactionHook(this.options.dbName, (eventType) => { + switch (eventType) { + case TransactionEvent.COMMIT: + this.flushUpdates(); + break; + case TransactionEvent.ROLLBACK: + this.transactionReverted(); + break; + } + + this.iterateListeners((l) => + l.writeTransaction?.({ + type: eventType + }) + ); + }); + } + + flushUpdates() { + if (!this.updateBuffer.length) { + return; + } + + const groupedUpdates = _.groupBy(this.updateBuffer, (update) => update.table); + const batchedUpdate: BatchedUpdateNotification = { + groupedUpdates, + rawUpdates: this.updateBuffer, + tables: _.keys(groupedUpdates) + }; + this.updateBuffer = []; + this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate)); + } + + protected transactionReverted() { + // clear updates + this.updateBuffer = []; + } + + handleTableUpdates(notification: UpdateNotification) { + // Fire updates for any change + this.iterateListeners((l) => l.rawTableChange?.({ ...notification })); + + // Queue changes until they are flushed + this.updateBuffer.push(notification); + } +} diff --git a/src/lock-hooks.ts b/src/lock-hooks.ts new file mode 100644 index 0000000..c8fb89e --- /dev/null +++ b/src/lock-hooks.ts @@ -0,0 +1,7 @@ +/** + * Hooks which can be triggered during the execution of read/write locks + */ +export interface LockHooks { + lockAcquired?: () => Promise; + lockReleased?: () => Promise; +} diff --git a/src/setup-open.ts b/src/setup-open.ts index 5e776df..7d62cf5 100644 --- a/src/setup-open.ts +++ b/src/setup-open.ts @@ -8,19 +8,26 @@ import { TransactionContext, UpdateCallback, SQLBatchTuple, - OpenOptions + OpenOptions, + QueryResult } from './types'; import uuid from 'uuid'; import _ from 'lodash'; import { enhanceQueryResult } from './utils'; -import { registerUpdateHook } from './table-updates'; +import { DBListenerManagerInternal } from './DBListenerManager'; +import { LockHooks } from './lock-hooks'; type LockCallbackRecord = { callback: (context: LockContext) => Promise; timeout?: NodeJS.Timeout; }; +enum TransactionFinalizer { + COMMIT = 'commit', + ROLLBACK = 'rollback' +} + const DEFAULT_READ_CONNECTIONS = 4; const LockCallbacks: Record = {}; @@ -90,6 +97,8 @@ export function setupOpen(QuickSQLite: ISQLite) { numReadConnections: options?.numReadConnections ?? DEFAULT_READ_CONNECTIONS }); + const listenerManager = new DBListenerManagerInternal({ dbName }); + /** * Wraps lock requests and their callbacks in order to resolve the lock * request with the callback result once triggered from the connection pool. @@ -97,7 +106,8 @@ export function setupOpen(QuickSQLite: ISQLite) { const requestLock = ( type: ConcurrentLockType, callback: (context: LockContext) => Promise, - options?: LockOptions + options?: LockOptions, + hooks?: LockHooks ): Promise => { const id = uuid.v4(); // TODO maybe do this in C++ // Wrap the callback in a promise that will resolve to the callback result @@ -106,12 +116,15 @@ export function setupOpen(QuickSQLite: ISQLite) { const record = (LockCallbacks[id] = { callback: async (context: LockContext) => { try { + await hooks?.lockAcquired?.(); const res = await callback(context); // Ensure that we only resolve after locks are freed _.defer(() => resolve(res)); } catch (ex) { _.defer(() => reject(ex)); + } finally { + _.defer(() => hooks?.lockReleased?.()); } } } as LockCallbackRecord); @@ -138,12 +151,17 @@ export function setupOpen(QuickSQLite: ISQLite) { requestLock(ConcurrentLockType.READ, callback, options); const writeLock = (callback: (context: LockContext) => Promise, options?: LockOptions): Promise => - requestLock(ConcurrentLockType.WRITE, callback, options); + requestLock(ConcurrentLockType.WRITE, callback, options, { + lockReleased: async () => { + // flush updates once a write lock has been released + listenerManager.flushUpdates(); + } + }); const wrapTransaction = async ( context: LockContext, callback: (context: TransactionContext) => Promise, - defaultFinally: 'commit' | 'rollback' = 'commit' + defaultFinalizer: TransactionFinalizer = TransactionFinalizer.COMMIT ) => { await context.execute('BEGIN TRANSACTION'); let finalized = false; @@ -158,15 +176,15 @@ export function setupOpen(QuickSQLite: ISQLite) { return action(); }; - const commit = finalizedStatement(() => context.execute('COMMIT')); - const commitAsync = finalizedStatement(() => context.execute('COMMIT')); + const commit = finalizedStatement(async () => context.execute('COMMIT')); - const rollback = finalizedStatement(() => context.execute('ROLLBACK')); - const rollbackAsync = finalizedStatement(() => context.execute('ROLLBACK')); + const rollback = finalizedStatement(async () => context.execute('ROLLBACK')); const wrapExecute = - (method: (sql: string, params?: any[]) => T): ((sql: string, params?: any[]) => T) => - (sql: string, params?: any[]) => { + ( + method: (sql: string, params?: any[]) => Promise + ): ((sql: string, params?: any[]) => Promise) => + async (sql: string, params?: any[]) => { if (finalized) { throw new Error(`Cannot execute in transaction after it has been finalized with commit/rollback.`); } @@ -180,17 +198,17 @@ export function setupOpen(QuickSQLite: ISQLite) { rollback, execute: wrapExecute(context.execute) }); - switch (defaultFinally) { - case 'commit': - await commitAsync(); + switch (defaultFinalizer) { + case TransactionFinalizer.COMMIT: + await commit(); break; - case 'rollback': - await rollbackAsync(); + case TransactionFinalizer.ROLLBACK: + await rollback(); break; } return res; } catch (ex) { - await rollbackAsync(); + await rollback(); throw ex; } }; @@ -204,10 +222,7 @@ export function setupOpen(QuickSQLite: ISQLite) { readLock((context) => wrapTransaction(context, callback)), writeLock, writeTransaction: async (callback: (context: TransactionContext) => Promise, options?: LockOptions) => - writeLock((context) => wrapTransaction(context, callback), options), - registerUpdateHook: (callback: UpdateCallback) => { - registerUpdateHook(dbName, callback); - }, + writeLock((context) => wrapTransaction(context, callback, TransactionFinalizer.COMMIT), options), delete: () => QuickSQLite.delete(dbName, options?.location), executeBatch: (commands: SQLBatchTuple[]) => writeLock((context) => QuickSQLite.executeBatch(dbName, commands, (context as any)._contextId)), @@ -215,7 +230,11 @@ export function setupOpen(QuickSQLite: ISQLite) { QuickSQLite.attach(dbName, dbNameToAttach, alias, location), detach: (alias: string) => QuickSQLite.detach(dbName, alias), loadFile: (location: string) => - writeLock((context) => QuickSQLite.loadFile(dbName, location, (context as any)._contextId)) + writeLock((context) => QuickSQLite.loadFile(dbName, location, (context as any)._contextId)), + listenerManager, + registerUpdateHook: (callback: UpdateCallback) => + listenerManager.registerListener({ rawTableChange: callback }), + registerTablesChangedHook: (callback) => listenerManager.registerListener({ tablesUpdated: callback }) }; } }; diff --git a/src/table-updates.ts b/src/table-updates.ts index 52d3e9d..9af3fcc 100644 --- a/src/table-updates.ts +++ b/src/table-updates.ts @@ -1,6 +1,7 @@ -import { RowUpdateType, UpdateCallback } from './types'; +import { RowUpdateType, TransactionCallback, TransactionEvent, UpdateCallback } from './types'; const updateCallbacks: Record = {}; +const transactionCallbacks: Record = {}; /** * Entry point for update callbacks. This is triggered from C++ with params. @@ -22,3 +23,20 @@ global.triggerUpdateHook = function (dbName: string, table: string, opType: RowU export const registerUpdateHook = (dbName: string, callback: UpdateCallback) => { updateCallbacks[dbName] = callback; }; + +/** + * Entry point for transaction callbacks. This is triggered from C++ with params. + */ +global.triggerTransactionFinalizerHook = function (dbName: string, eventType: TransactionEvent) { + const callback = transactionCallbacks[dbName]; + if (!callback) { + return; + } + + callback(eventType); + return null; +}; + +export const registerTransactionHook = (dbName: string, callback: TransactionCallback) => { + transactionCallbacks[dbName] = callback; +}; diff --git a/src/types.ts b/src/types.ts index 8191295..c4c29ac 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,3 +1,5 @@ +import { DBListenerManager } from './DBListenerManager'; + /** * Object returned by SQL Query executions { * insertId: Represent the auto-generated row id if applicable @@ -72,13 +74,30 @@ export enum RowUpdateType { SQLITE_DELETE = 9, SQLITE_UPDATE = 23 } -export interface UpdateNotification { + +export interface TableUpdateOperation { opType: RowUpdateType; - table: string; rowId: number; } +export interface UpdateNotification extends TableUpdateOperation { + table: string; +} + +export interface BatchedUpdateNotification { + rawUpdates: UpdateNotification[]; + tables: string[]; + groupedUpdates: Record; +} export type UpdateCallback = (update: UpdateNotification) => void; +export type BatchedUpdateCallback = (update: BatchedUpdateNotification) => void; + +export enum TransactionEvent { + COMMIT, + ROLLBACK +} + +export type TransactionCallback = (eventType: TransactionEvent) => void; export type ContextLockID = string; @@ -149,8 +168,22 @@ export type QuickSQLiteConnection = { executeBatch: (commands: SQLBatchTuple[]) => Promise; loadFile: (location: string) => Promise; /** - * Note that only one listener can be registered per database connection. - * Any new hook registration will override the previous one. + * Register a callback which will be fired for each ROWID table change event. + * Table changes are reported immediately. + * Changes might not yet be committed if using a transaction. + * - Listen to transaction events in listenerManager if extra logic is required + * For most use cases use `registerTablesChangedHook` instead. + * @returns a function which will deregister the callback + */ + registerUpdateHook(callback: UpdateCallback): () => void; + /** + * Register a callback which will be fired whenever a update to a ROWID table + * has been committed. + * Changes inside write locks will be buffered until the lock is released or + * if a transaction inside the lock has been committed. + * Reverting a transaction inside a write lock will not fire table updates. + * @returns a function which will deregister the callback */ - registerUpdateHook(callback: UpdateCallback): void; + registerTablesChangedHook(callback: BatchedUpdateCallback): () => void; + listenerManager: DBListenerManager; }; diff --git a/src/utils/BaseObserver.ts b/src/utils/BaseObserver.ts new file mode 100644 index 0000000..90195bf --- /dev/null +++ b/src/utils/BaseObserver.ts @@ -0,0 +1,31 @@ +import { v4 } from 'uuid'; + +export interface BaseObserverInterface { + registerListener(listener: Partial): () => void; +} + +export type BaseListener = { + [key: string]: (...event: any) => any; +}; + +export class BaseObserver implements BaseObserverInterface { + protected listeners: { [id: string]: Partial }; + + constructor() { + this.listeners = {}; + } + + registerListener(listener: Partial): () => void { + const id = v4(); + this.listeners[id] = listener; + return () => { + delete this.listeners[id]; + }; + } + + iterateListeners(cb: (listener: Partial) => any) { + for (let i in this.listeners) { + cb(this.listeners[i]); + } + } +} diff --git a/tests/ios/Podfile.lock b/tests/ios/Podfile.lock index 0a5a6af..1b69350 100644 --- a/tests/ios/Podfile.lock +++ b/tests/ios/Podfile.lock @@ -340,7 +340,7 @@ PODS: - glog - react-native-get-random-values (1.9.0): - React-Core - - react-native-quick-sqlite (0.0.1): + - react-native-quick-sqlite (1.0.0): - powersync-sqlite-core - React - React-callinvoker @@ -658,7 +658,7 @@ SPEC CHECKSUMS: React-jsinspector: 194e32c6aab382d88713ad3dd0025c5f5c4ee072 React-logger: cebf22b6cf43434e471dc561e5911b40ac01d289 react-native-get-random-values: dee677497c6a740b71e5612e8dbd83e7539ed5bb - react-native-quick-sqlite: bb695834cb1f4b88c3c2ae413ecf6522e9d221c3 + react-native-quick-sqlite: d55edadf2d63f9ca9e3bc4bb138bf61739294413 react-native-safe-area-context: 238cd8b619e05cb904ccad97ef42e84d1b5ae6ec React-NativeModulesApple: 02e35e9a51e10c6422f04f5e4076a7c02243fff2 React-perflogger: e3596db7e753f51766bceadc061936ef1472edc3 diff --git a/tests/tests/sqlite/rawQueries.spec.ts b/tests/tests/sqlite/rawQueries.spec.ts index 19890eb..46ea651 100644 --- a/tests/tests/sqlite/rawQueries.spec.ts +++ b/tests/tests/sqlite/rawQueries.spec.ts @@ -1,5 +1,14 @@ import Chance from 'chance'; -import { open, QuickSQLite, QuickSQLiteConnection, SQLBatchTuple, UpdateNotification } from 'react-native-quick-sqlite'; +import { + BatchedUpdateNotification, + open, + QueryResult, + QuickSQLite, + QuickSQLiteConnection, + SQLBatchTuple, + TransactionEvent, + UpdateNotification +} from 'react-native-quick-sqlite'; import { beforeEach, describe, it } from '../mocha/MochaRNAdapter'; import chai from 'chai'; @@ -10,10 +19,37 @@ const chance = new Chance(); // Attempting to open an already open DB results in an error. let db: QuickSQLiteConnection = global.db; +const NUM_READ_CONNECTIONS = 3; + function generateUserInfo() { return { id: chance.integer(), name: chance.name(), age: chance.integer(), networth: chance.floating() }; } +function createTestUser(context: { execute: (sql: string, args?: any[]) => Promise } = db) { + const { id, name, age, networth } = generateUserInfo(); + return context.execute('INSERT INTO User (id, name, age, networth) VALUES(?, ?, ?, ?)', [id, name, age, networth]); +} + +/** + * Creates read locks then queries the User table. + * Returns an array of promises which resolve once each + * read connection contains rows. + */ +const createReaders = (callbacks: Array<() => void>) => + new Array(NUM_READ_CONNECTIONS).fill(null).map(() => { + return db.readLock(async (tx) => { + return new Promise((resolve) => { + // start a read lock for each connection + callbacks.push(async () => { + const result = await tx.execute('SELECT * from User'); + const length = result.rows?.length; + console.log(`Reading Users returned ${length} rows`); + resolve(result.rows?.length); + }); + }); + }); + }); + export function registerBaseTests() { beforeEach(async () => { try { @@ -22,7 +58,7 @@ export function registerBaseTests() { db.delete(); } - global.db = db = open('test'); + global.db = db = open('test', { numReadConnections: NUM_READ_CONNECTIONS }); await db.execute('DROP TABLE IF EXISTS User; '); await db.execute('CREATE TABLE User ( id INT PRIMARY KEY, name TEXT NOT NULL, age INT, networth REAL) STRICT;'); @@ -33,13 +69,7 @@ export function registerBaseTests() { describe('Raw queries', () => { it('Insert', async () => { - const { id, name, age, networth } = generateUserInfo(); - const res = await db.execute('INSERT INTO "User" (id, name, age, networth) VALUES(?, ?, ?, ?)', [ - id, - name, - age, - networth - ]); + const res = await createTestUser(); expect(res.rowsAffected).to.equal(1); expect(res.insertId).to.equal(1); expect(res.metadata).to.eql([]); @@ -464,6 +494,86 @@ export function registerBaseTests() { singleConnection.close(); }); + it('should trigger write transaction commit hooks', async () => { + const commitPromise = new Promise((resolve) => + db.listenerManager.registerListener({ + writeTransaction: (event) => { + if (event.type == TransactionEvent.COMMIT) { + resolve(); + } + } + }) + ); + + const rollbackPromise = new Promise((resolve) => + db.listenerManager.registerListener({ + writeTransaction: (event) => { + if (event.type == TransactionEvent.ROLLBACK) { + resolve(); + } + } + }) + ); + + await db.writeTransaction(async (tx) => tx.rollback()); + await rollbackPromise; + + // Need to actually do something for the commit hook to fire + await db.writeTransaction(async (tx) => { + await createTestUser(tx); + }); + await commitPromise; + }); + + it('should batch table update changes', async () => { + const updatePromise = new Promise((resolve) => db.registerTablesChangedHook(resolve)); + + await db.writeTransaction(async (tx) => { + await createTestUser(tx); + await createTestUser(tx); + }); + + const update = await updatePromise; + + expect(update.rawUpdates.length).to.equal(2); + }); + + it('Should reflect writeTransaction updates on read connections', async () => { + const readTriggerCallbacks = []; + + // Execute the read test whenever a table change ocurred + db.registerTablesChangedHook((update) => readTriggerCallbacks.forEach((cb) => cb())); + + // Test writeTransaction + const readerPromises = createReaders(readTriggerCallbacks); + + await db.writeTransaction(async (tx) => { + return createTestUser(tx); + }); + + let resolved = await Promise.all(readerPromises); + // The query result length for 1 item should be returned for all connections + expect(resolved).to.deep.equal(readerPromises.map(() => 1)); + }); + + it('Should reflect writeLock updates on read connections', async () => { + const readTriggerCallbacks = []; + // Test writeLock + const readerPromises = createReaders(readTriggerCallbacks); + // Execute the read test whenever a table change ocurred + db.registerTablesChangedHook((update) => readTriggerCallbacks.forEach((cb) => cb())); + + await db.writeLock(async (tx) => { + await tx.execute('BEGIN'); + await createTestUser(tx); + await tx.execute('COMMIT'); + }); + + const resolved = await Promise.all(readerPromises); + // The query result length for 1 item should be returned for all connections + expect(resolved).to.deep.equal(readerPromises.map(() => 1)); + }); + it('Should attach DBs', async () => { const singleConnection = open('single_connection', { numReadConnections: 0 }); await singleConnection.execute('DROP TABLE IF EXISTS Places; ');