From 14a85aa38c39529acb9fad40eb577adf5325fd99 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 09:05:03 +0200 Subject: [PATCH 1/8] Start looking at OPFS tab close issue --- .../db/adapters/LockedAsyncDatabaseAdapter.ts | 20 +++++--- .../WorkerWrappedAsyncDatabaseConnection.ts | 50 ++++++++++++++++++- .../worker/sync/SharedSyncImplementation.ts | 20 ++++++-- packages/web/src/worker/sync/WorkerClient.ts | 4 +- 4 files changed, 79 insertions(+), 15 deletions(-) diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index 25e0afa56..65552b0f1 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -203,6 +203,8 @@ export class LockedAsyncDatabaseAdapter ); } + static dbLockRequestId = 0; + protected async acquireLock(callback: () => Promise, options?: { timeoutMs?: number }): Promise { await this.waitForInitialized(); @@ -221,13 +223,17 @@ export class LockedAsyncDatabaseAdapter }, timeoutMs) : null; - return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => { - this.pendingAbortControllers.delete(abortController); - if (timoutId) { - clearTimeout(timoutId); - } - return callback(); - }); + const id = LockedAsyncDatabaseAdapter.dbLockRequestId++; + console.trace('Requesting database lock', this._dbIdentifier, id); + return getNavigatorLocks() + .request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => { + this.pendingAbortControllers.delete(abortController); + if (timoutId) { + clearTimeout(timoutId); + } + return callback(); + }) + .finally(() => console.log('returning database lock', id)); } async readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions | undefined): Promise { diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 57808e3ce..2eba09188 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -15,6 +15,7 @@ export type SharedConnectionWorker = { export type WrappedWorkerConnectionOptions = { baseConnection: AsyncDatabaseConnection; identifier: string; + remoteCanCloseUnexpectedly: boolean; /** * Need a remote in order to keep a reference to the Proxied worker */ @@ -29,10 +30,13 @@ export type WrappedWorkerConnectionOptions implements AsyncDatabaseConnection { - protected lockAbortController: AbortController; + protected lockAbortController = new AbortController(); + protected notifyRemoteClosed: AbortController | undefined; constructor(protected options: WrappedWorkerConnectionOptions) { - this.lockAbortController = new AbortController(); + if (options.remoteCanCloseUnexpectedly) { + this.notifyRemoteClosed = new AbortController(); + } } protected get baseConnection() { @@ -43,6 +47,48 @@ export class WorkerWrappedAsyncDatabaseConnection(inner: () => Promise): Promise { + const controller = this.notifyRemoteClosed; + if (controller) { + return new Promise((resolve, reject) => { + if (controller.signal.aborted) { + reject(new Error('Called operation on closed remote')); + } + + function handleAbort() { + reject(new Error('Remote peer closed with request in flight')); + } + + function markResolved(inner: () => void) { + controller!.signal.removeEventListener('abort', handleAbort); + inner(); + } + + controller.signal.addEventListener('abort', handleAbort); + + inner() + .then((data) => markResolved(() => resolve(data))) + .catch((e) => markResolved(() => reject(e))); + }); + } else { + // Can't close, so just return the inner promise unguarded. + return inner(); + } + } + /** * Get a MessagePort which can be used to share the internals of this connection. */ diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 7db98c3d0..6df05d512 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -75,6 +75,7 @@ export type WrappedSyncPort = { clientProvider: Comlink.Remote; db?: DBAdapter; currentSubscriptions: SubscribedStream[]; + closeListeners: (() => void)[]; }; /** @@ -274,7 +275,8 @@ export class SharedSyncImplementation extends BaseObserver(port), - currentSubscriptions: [] + currentSubscriptions: [], + closeListeners: [] } satisfies WrappedSyncPort; this.ports.push(portProvider); @@ -331,10 +333,13 @@ export class SharedSyncImplementation extends BaseObserver {}; } + for (const closeListener of trackedPort.closeListeners) { + closeListener(); + } + if (this.dbAdapter && this.dbAdapter == trackedPort.db) { - if (shouldReconnect) { - await this.connectionManager.disconnect(); - } + // Unconditionally close the connection because the database it's writing to has just been closed. + await this.connectionManager.disconnect(); // Clearing the adapter will result in a new one being opened in connect this.dbAdapter = null; @@ -473,11 +478,16 @@ export class SharedSyncImplementation extends BaseObserver { - return new WorkerWrappedAsyncDatabaseConnection({ + const wrapped = new WorkerWrappedAsyncDatabaseConnection({ remote, baseConnection: db, identifier }); + lastClient.closeListeners.push(() => { + wrapped.markRemoteClosed(); + }); + + return wrapped; }, logger: this.logger }); diff --git a/packages/web/src/worker/sync/WorkerClient.ts b/packages/web/src/worker/sync/WorkerClient.ts index 22c44f4ba..1dfffe9b2 100644 --- a/packages/web/src/worker/sync/WorkerClient.ts +++ b/packages/web/src/worker/sync/WorkerClient.ts @@ -41,7 +41,9 @@ export class WorkerClient { private async removePort() { if (this.resolvedPort) { - const release = await this.sync.removePort(this.resolvedPort); + const resolved = this.resolvedPort; + this.resolvedPort = null; + const release = await this.sync.removePort(resolved); this.resolvedPort = null; this.port.postMessage({ event: SharedSyncClientEvent.CLOSE_ACK, From 39b5de260ae2b9c60682b2b97bef758ca81c632e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 17:23:43 +0200 Subject: [PATCH 2/8] Fix compilation warnings --- packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts | 1 + .../web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts | 2 ++ packages/web/src/worker/sync/SharedSyncImplementation.ts | 6 +++++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts index 01ffef3b0..1367ed811 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts @@ -54,6 +54,7 @@ export class WASQLiteDBAdapter extends LockedAsyncDatabaseAdapter { const remote = Comlink.wrap(workerPort); return new WorkerWrappedAsyncDatabaseConnection({ remote, + remoteCanCloseUnexpectedly: false, identifier: options.dbFilename, baseConnection: await remote({ ...options, diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts index 129a3b130..487d121aa 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteOpenFactory.ts @@ -82,6 +82,8 @@ export class WASQLiteOpenFactory extends AbstractWebSQLOpenFactory { return new WorkerWrappedAsyncDatabaseConnection({ remote: workerDBOpener, + // This tab owns the worker, so we're guaranteed to outlive it. + remoteCanCloseUnexpectedly: false, baseConnection: await workerDBOpener({ dbFilename: this.options.dbFilename, vfs, diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 6df05d512..81a50e8aa 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -481,9 +481,13 @@ export class SharedSyncImplementation extends BaseObserver { + this.logger.info('Aborting open connection because associated tab closed.'); wrapped.markRemoteClosed(); }); From 112932686cfcc18cd71131e404b2542e3fc8e7e0 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 18:57:09 +0200 Subject: [PATCH 3/8] Use withRemote --- .../adapters/WorkerWrappedAsyncDatabaseConnection.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 2eba09188..4b72724a8 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -149,24 +149,24 @@ export class WorkerWrappedAsyncDatabaseConnection { // Abort any pending lock requests. this.lockAbortController.abort(); - await this.baseConnection.close(); + await this.withRemote(() => this.baseConnection.close()); this.options.remote[Comlink.releaseProxy](); this.options.onClose?.(); } execute(sql: string, params?: any[]): Promise { - return this.baseConnection.execute(sql, params); + return this.withRemote(() => this.baseConnection.execute(sql, params)); } executeRaw(sql: string, params?: any[]): Promise { - return this.baseConnection.executeRaw(sql, params); + return this.withRemote(() => this.baseConnection.executeRaw(sql, params)); } executeBatch(sql: string, params?: any[]): Promise { - return this.baseConnection.executeBatch(sql, params); + return this.withRemote(() => this.baseConnection.executeBatch(sql, params)); } getConfig(): Promise { - return this.baseConnection.getConfig(); + return this.withRemote(() => this.baseConnection.getConfig()); } } From 2e76dc537f403e66a061af894cdb10a8a6992407 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 19:12:27 +0200 Subject: [PATCH 4/8] Cleanup --- .../components/providers/SystemProvider.tsx | 22 ++++++++++++++----- .../db/adapters/LockedAsyncDatabaseAdapter.ts | 17 ++++++-------- .../WorkerWrappedAsyncDatabaseConnection.ts | 2 +- .../worker/sync/SharedSyncImplementation.ts | 5 +---- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index 8a3f3c209..b3521c007 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,7 +3,15 @@ import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/power import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web'; +import { + createBaseLogger, + DifferentialWatchedQuery, + LogLevel, + PowerSyncDatabase, + SyncClientImplementation, + WASQLiteOpenFactory, + WASQLiteVFS +} from '@powersync/web'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -12,9 +20,13 @@ export const useSupabase = () => React.useContext(SupabaseContext); export const db = new PowerSyncDatabase({ schema: AppSchema, - database: { - dbFilename: 'example.db' - } + database: new WASQLiteOpenFactory({ + dbFilename: 'example.db', + vfs: WASQLiteVFS.OPFSCoopSyncVFS, + flags: { + enableMultiTabs: typeof SharedWorker !== 'undefined' + } + }) }); export type EnhancedListRecord = ListRecord & { total_tasks: number; completed_tasks: number }; @@ -68,7 +80,7 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => { const l = connector.registerListener({ initialized: () => {}, sessionStarted: () => { - powerSync.connect(connector); + powerSync.connect(connector, { clientImplementation: SyncClientImplementation.RUST }); } }); diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index 65552b0f1..b7b65c9a8 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -224,16 +224,13 @@ export class LockedAsyncDatabaseAdapter : null; const id = LockedAsyncDatabaseAdapter.dbLockRequestId++; - console.trace('Requesting database lock', this._dbIdentifier, id); - return getNavigatorLocks() - .request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => { - this.pendingAbortControllers.delete(abortController); - if (timoutId) { - clearTimeout(timoutId); - } - return callback(); - }) - .finally(() => console.log('returning database lock', id)); + return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => { + this.pendingAbortControllers.delete(abortController); + if (timoutId) { + clearTimeout(timoutId); + } + return callback(); + }); } async readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions | undefined): Promise { diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 4b72724a8..fee41526f 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -149,9 +149,9 @@ export class WorkerWrappedAsyncDatabaseConnection { // Abort any pending lock requests. this.lockAbortController.abort(); - await this.withRemote(() => this.baseConnection.close()); this.options.remote[Comlink.releaseProxy](); this.options.onClose?.(); + await this.baseConnection.close(); } execute(sql: string, params?: any[]): Promise { diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index 81a50e8aa..f8923c69f 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -349,10 +349,6 @@ export class SharedSyncImplementation extends BaseObserver { this.logger.info('Aborting open connection because associated tab closed.'); + wrapped.close(); wrapped.markRemoteClosed(); }); From 2418c5c904225f4ff520fdc80d3ca9a8a40b7922 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 19:14:18 +0200 Subject: [PATCH 5/8] Undo demo changes --- .../components/providers/SystemProvider.tsx | 22 +++++-------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index b3521c007..8a3f3c209 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,15 +3,7 @@ import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/power import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { - createBaseLogger, - DifferentialWatchedQuery, - LogLevel, - PowerSyncDatabase, - SyncClientImplementation, - WASQLiteOpenFactory, - WASQLiteVFS -} from '@powersync/web'; +import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -20,13 +12,9 @@ export const useSupabase = () => React.useContext(SupabaseContext); export const db = new PowerSyncDatabase({ schema: AppSchema, - database: new WASQLiteOpenFactory({ - dbFilename: 'example.db', - vfs: WASQLiteVFS.OPFSCoopSyncVFS, - flags: { - enableMultiTabs: typeof SharedWorker !== 'undefined' - } - }) + database: { + dbFilename: 'example.db' + } }); export type EnhancedListRecord = ListRecord & { total_tasks: number; completed_tasks: number }; @@ -80,7 +68,7 @@ export const SystemProvider = ({ children }: { children: React.ReactNode }) => { const l = connector.registerListener({ initialized: () => {}, sessionStarted: () => { - powerSync.connect(connector, { clientImplementation: SyncClientImplementation.RUST }); + powerSync.connect(connector); } }); From a45f72aa185f708c2fb79c45725861317a5d2dfc Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 19:31:01 +0200 Subject: [PATCH 6/8] Fix tests --- .../web/src/db/adapters/LockedAsyncDatabaseAdapter.ts | 3 --- .../db/adapters/WorkerWrappedAsyncDatabaseConnection.ts | 9 ++++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index b7b65c9a8..25e0afa56 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -203,8 +203,6 @@ export class LockedAsyncDatabaseAdapter ); } - static dbLockRequestId = 0; - protected async acquireLock(callback: () => Promise, options?: { timeoutMs?: number }): Promise { await this.waitForInitialized(); @@ -223,7 +221,6 @@ export class LockedAsyncDatabaseAdapter }, timeoutMs) : null; - const id = LockedAsyncDatabaseAdapter.dbLockRequestId++; return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => { this.pendingAbortControllers.delete(abortController); if (timoutId) { diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index fee41526f..abdd8fd76 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -149,9 +149,12 @@ export class WorkerWrappedAsyncDatabaseConnection { // Abort any pending lock requests. this.lockAbortController.abort(); - this.options.remote[Comlink.releaseProxy](); - this.options.onClose?.(); - await this.baseConnection.close(); + try { + await this.withRemote(() => this.baseConnection.close()); + } finally { + this.options.remote[Comlink.releaseProxy](); + this.options.onClose?.(); + } } execute(sql: string, params?: any[]): Promise { From 58e9a0c1ff2f7875d94b3514789894fcd93a9558 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 22 Sep 2025 19:32:19 +0200 Subject: [PATCH 7/8] Add changeset --- .changeset/funny-baboons-wonder.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/funny-baboons-wonder.md diff --git a/.changeset/funny-baboons-wonder.md b/.changeset/funny-baboons-wonder.md new file mode 100644 index 000000000..91e7b9f89 --- /dev/null +++ b/.changeset/funny-baboons-wonder.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': patch +--- + +Fix issues with multiple tabs when the Rust client and OPFS is used. From 8efff94996818d1dad4df66b395ee11e27c8cc8f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 17:34:38 +0200 Subject: [PATCH 8/8] Better names --- .../WorkerWrappedAsyncDatabaseConnection.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index abdd8fd76..5c4c6902e 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -60,7 +60,7 @@ export class WorkerWrappedAsyncDatabaseConnection(inner: () => Promise): Promise { + private withRemote(workerPromise: () => Promise): Promise { const controller = this.notifyRemoteClosed; if (controller) { return new Promise((resolve, reject) => { @@ -72,20 +72,20 @@ export class WorkerWrappedAsyncDatabaseConnection void) { + function completePromise(action: () => void) { controller!.signal.removeEventListener('abort', handleAbort); - inner(); + action(); } controller.signal.addEventListener('abort', handleAbort); - inner() - .then((data) => markResolved(() => resolve(data))) - .catch((e) => markResolved(() => reject(e))); + workerPromise() + .then((data) => completePromise(() => resolve(data))) + .catch((e) => completePromise(() => reject(e))); }); } else { - // Can't close, so just return the inner promise unguarded. - return inner(); + // Can't close, so just return the inner worker promise unguarded. + return workerPromise(); } }