From 27509b5322b1b8f600aec0763eaa8c5126e47945 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Oct 2025 17:07:55 +0100 Subject: [PATCH 1/2] Update sqlite3_web, simplify --- .../sqlite_async/lib/src/web/database.dart | 220 ++++++++---------- .../sqlite_async/lib/src/web/protocol.dart | 6 +- .../lib/src/web/worker/worker_utils.dart | 44 +--- packages/sqlite_async/pubspec.yaml | 4 +- 4 files changed, 104 insertions(+), 170 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index f2dc998..23424fc 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -1,11 +1,9 @@ import 'dart:async'; import 'dart:developer'; import 'dart:js_interop'; -import 'dart:js_interop_unsafe'; import 'package:sqlite3/common.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; -import 'package:sqlite3_web/protocol_utils.dart' as proto; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; @@ -97,24 +95,17 @@ class WebDatabase @override Future readLock(Future Function(SqliteReadContext tx) callback, {Duration? lockTimeout, String? debugContext}) async { - if (_mutex case var mutex?) { - return await mutex.lock(timeout: lockTimeout, () { - return ScopedReadContext.assumeReadLock( - _UnscopedContext(this), callback); - }); - } else { - // No custom mutex, coordinate locks through shared worker. - await _database.customRequest( - CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock)); - - try { - return await ScopedReadContext.assumeReadLock( - _UnscopedContext(this), callback); - } finally { - await _database.customRequest( - CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); - } - } + // Since there is only a single physical connection per database on the web, + // we can't enable concurrent readers to a writer. Even supporting multiple + // readers alone isn't safe, since these readers could start read + // transactions where we need to block other tabs from sending `BEGIN` and + // `COMMIT` statements if they were to start their own transactions. + return _lockInternal( + (unscoped) => ScopedReadContext.assumeReadLock(unscoped, callback), + lockTimeout: lockTimeout, + debugContext: debugContext, + flush: false, + ); } @override @@ -122,47 +113,67 @@ class WebDatabase Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, bool? flush}) { - return writeLock((writeContext) { - return ScopedWriteContext.assumeWriteLock( - _UnscopedContext(this), - (ctx) async { - return await ctx.writeTransaction(callback); - }, - ); - }, - debugContext: 'writeTransaction()', - lockTimeout: lockTimeout, - flush: flush); + return _lockInternal( + (context) { + return ScopedWriteContext.assumeWriteLock( + context, + (ctx) async { + return await ctx.writeTransaction(callback); + }, + ); + }, + flush: flush ?? true, + lockTimeout: lockTimeout, + ); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}) async { + return await _lockInternal( + (unscoped) { + return ScopedWriteContext.assumeWriteLock(unscoped, callback); + }, + flush: flush ?? true, + debugContext: debugContext, + lockTimeout: lockTimeout, + ); + } + + Future _lockInternal( + Future Function(_UnscopedContext) callback, { + required bool flush, + Duration? lockTimeout, + String? debugContext, + }) async { if (_mutex case var mutex?) { return await mutex.lock(timeout: lockTimeout, () async { - final context = _UnscopedContext(this); + final context = _UnscopedContext(this, null); try { - return await ScopedWriteContext.assumeWriteLock(context, callback); + return await callback(context); } finally { - if (flush != false) { + if (flush) { await this.flush(); } } }); } else { - // No custom mutex, coordinate locks through shared worker. - await _database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.requestExclusiveLock)); - final context = _UnscopedContext(this); - try { - return await ScopedWriteContext.assumeWriteLock(context, callback); - } finally { - if (flush != false) { - await this.flush(); + final abortTrigger = switch (lockTimeout) { + null => null, + final duration => Future.delayed(duration), + }; + + return await _database.requestLock(abortTrigger: abortTrigger, + (token) async { + final context = _UnscopedContext(this, token); + try { + return await callback(context); + } finally { + if (flush) { + await this.flush(); + } } - await _database.customRequest( - CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock)); - } + }); } } @@ -184,9 +195,20 @@ class WebDatabase final class _UnscopedContext extends UnscopedContext { final WebDatabase _database; + /// If this context is scoped to a lock on the database, the [LockToken] from + /// `package:sqlite3_web`. + /// + /// This token needs to be passed to queries to run them. While a lock token + /// exists on the database, all queries not passing that token are blocked. + final LockToken? _lock; + final TimelineTask? _task; - _UnscopedContext(this._database) + /// Whether statements should be rejected if the database is not in an + /// autocommit state. + bool _checkInTransaction = false; + + _UnscopedContext(this._database, this._lock) : _task = _database.profileQueries ? TimelineTask() : null; @override @@ -213,8 +235,15 @@ final class _UnscopedContext extends UnscopedContext { sql: sql, parameters: parameters, () async { - return await wrapSqliteException( - () => _database._database.select(sql, parameters)); + return await wrapSqliteException(() async { + final result = await _database._database.select( + sql, + parameters: parameters, + token: _lock, + checkInTransaction: _checkInTransaction, + ); + return result.result; + }); }, ); } @@ -234,8 +263,15 @@ final class _UnscopedContext extends UnscopedContext { @override Future execute(String sql, [List parameters = const []]) { return _task.timeAsync('execute', sql: sql, parameters: parameters, () { - return wrapSqliteException( - () => _database._database.select(sql, parameters)); + return wrapSqliteException(() async { + final result = await _database._database.select( + sql, + parameters: parameters, + token: _lock, + checkInTransaction: _checkInTransaction, + ); + return result.result; + }); }); } @@ -246,7 +282,12 @@ final class _UnscopedContext extends UnscopedContext { for (final set in parameterSets) { // use execute instead of select to avoid transferring rows from the // worker to this context. - await _database._database.execute(sql, set); + await _database._database.execute( + sql, + parameters: set, + token: _lock, + checkInTransaction: _checkInTransaction, + ); } }); }); @@ -256,75 +297,7 @@ final class _UnscopedContext extends UnscopedContext { UnscopedContext interceptOutermostTransaction() { // All execute calls done in the callback will be checked for the // autocommit state - return _ExclusiveTransactionContext(_database); - } -} - -final class _ExclusiveTransactionContext extends _UnscopedContext { - _ExclusiveTransactionContext(super._database); - - Future _executeInternal( - String sql, List parameters) async { - // Operations inside transactions are executed with custom requests - // in order to verify that the connection does not have autocommit enabled. - // The worker will check if autocommit = true before executing the SQL. - // An exception will be thrown if autocommit is enabled. - // The custom request which does the above will return the ResultSet as a formatted - // JavaScript object. This is the converted into a Dart ResultSet. - return await wrapSqliteException(() async { - var res = await _database._database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.executeInTransaction, sql, parameters)) - as JSObject; - - if (res.has('format') && (res['format'] as JSNumber).toDartInt == 2) { - // Newer workers use a serialization format more efficient than dartify(). - return proto.deserializeResultSet(res['r'] as JSObject); - } - - var result = Map.from(res.dartify() as Map); - final columnNames = [ - for (final entry in result['columnNames']) entry as String - ]; - final rawTableNames = result['tableNames']; - final tableNames = rawTableNames != null - ? [ - for (final entry in (rawTableNames as List)) - entry as String - ] - : null; - - final rows = >[]; - for (final row in (result['rows'] as List)) { - final dartRow = []; - - for (final column in (row as List)) { - dartRow.add(column); - } - - rows.add(dartRow); - } - final resultSet = ResultSet(columnNames, tableNames, rows); - return resultSet; - }); - } - - @override - Future execute(String sql, - [List parameters = const []]) async { - return _task.timeAsync('execute', sql: sql, parameters: parameters, () { - return _executeInternal(sql, parameters); - }); - } - - @override - Future executeBatch( - String sql, List> parameterSets) async { - return _task.timeAsync('executeBatch', sql: sql, () async { - for (final set in parameterSets) { - await _database._database.customRequest(CustomDatabaseMessage( - CustomDatabaseMessageKind.executeBatchInTransaction, sql, set)); - } - }); + return _UnscopedContext(_database, _lock).._checkInTransaction = true; } } @@ -337,6 +310,11 @@ Future wrapSqliteException(Future Function() callback) async { throw serializedCause; } + if (ex.message.contains('Database is not in a transaction')) { + throw SqliteException( + 0, "Transaction rolled back by earlier statement. Cannot execute."); + } + // Older versions of package:sqlite_web reported SqliteExceptions as strings // only. if (ex.toString().contains('SqliteException')) { diff --git a/packages/sqlite_async/lib/src/web/protocol.dart b/packages/sqlite_async/lib/src/web/protocol.dart index d17c06b..9fcbb57 100644 --- a/packages/sqlite_async/lib/src/web/protocol.dart +++ b/packages/sqlite_async/lib/src/web/protocol.dart @@ -6,12 +6,8 @@ import 'dart:js_interop'; import 'package:sqlite3_web/protocol_utils.dart' as proto; enum CustomDatabaseMessageKind { - requestSharedLock, - requestExclusiveLock, - releaseLock, - lockObtained, + ok, getAutoCommit, - executeInTransaction, executeBatchInTransaction, updateSubscriptionManagement, notifyUpdates, diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index 059c281..164c8ff 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -1,6 +1,5 @@ import 'dart:async'; import 'dart:js_interop'; -import 'dart:js_interop_unsafe'; import 'package:meta/meta.dart'; import 'package:mutex/mutex.dart'; @@ -62,12 +61,6 @@ class AsyncSqliteDatabase extends WorkerDatabase { return _state.putIfAbsent(connection, _ConnectionState.new); } - void _markHoldsMutex(ClientConnection connection) { - final state = _findState(connection); - state.holdsMutex = true; - _registerCloseListener(state, connection); - } - void _registerCloseListener( _ConnectionState state, ClientConnection connection) { if (!state.hasOnCloseListener) { @@ -87,44 +80,11 @@ class AsyncSqliteDatabase extends WorkerDatabase { final message = request as CustomDatabaseMessage; switch (message.kind) { - case CustomDatabaseMessageKind.requestSharedLock: - await mutex.acquireRead(); - _markHoldsMutex(connection); - case CustomDatabaseMessageKind.requestExclusiveLock: - await mutex.acquireWrite(); - _markHoldsMutex(connection); - case CustomDatabaseMessageKind.releaseLock: - _findState(connection).holdsMutex = false; - mutex.release(); - case CustomDatabaseMessageKind.lockObtained: + case CustomDatabaseMessageKind.ok: case CustomDatabaseMessageKind.notifyUpdates: throw UnsupportedError('This is a response, not a request'); case CustomDatabaseMessageKind.getAutoCommit: return database.autocommit.toJS; - case CustomDatabaseMessageKind.executeInTransaction: - final sql = message.rawSql.toDart; - final hasTypeInfo = message.typeInfo.isDefinedAndNotNull; - final parameters = proto.deserializeParameters( - message.rawParameters, message.typeInfo); - if (database.autocommit) { - throw SqliteException(0, - "Transaction rolled back by earlier statement. Cannot execute: $sql"); - } - - var res = database.select(sql, parameters); - if (hasTypeInfo) { - // If the client is sending a request that has parameters with type - // information, it will also support a newer serialization format for - // result sets. - return JSObject() - ..['format'] = 2.toJS - ..['r'] = proto.serializeResultSet(res); - } else { - var dartMap = resultSetToMap(res); - var jsObject = dartMap.jsify(); - return jsObject; - } - case CustomDatabaseMessageKind.executeBatchInTransaction: final sql = message.rawSql.toDart; final parameters = proto.deserializeParameters( @@ -157,7 +117,7 @@ class AsyncSqliteDatabase extends WorkerDatabase { } } - return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained); + return CustomDatabaseMessage(CustomDatabaseMessageKind.ok); } Map resultSetToMap(ResultSet resultSet) { diff --git a/packages/sqlite_async/pubspec.yaml b/packages/sqlite_async/pubspec.yaml index a5589d8..3be8dd9 100644 --- a/packages/sqlite_async/pubspec.yaml +++ b/packages/sqlite_async/pubspec.yaml @@ -12,8 +12,8 @@ topics: - flutter dependencies: - sqlite3: ^2.9.0 - sqlite3_web: ^0.3.2 + sqlite3: ^2.9.4 + sqlite3_web: ^0.4.0 async: ^2.10.0 collection: ^1.17.0 mutex: ^3.1.0 From 72ccf55c3fa1965f61386d277f3c04c0b123e365 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 30 Oct 2025 17:23:12 +0100 Subject: [PATCH 2/2] Skip second mutex for OPFS implementations --- .../lib/src/web/web_sqlite_open_factory.dart | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart index 513b1f2..29248d5 100644 --- a/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart +++ b/packages/sqlite_async/lib/src/web/web_sqlite_open_factory.dart @@ -63,10 +63,19 @@ class DefaultSqliteOpenFactory final workers = await _initialized; final connection = await connectToWorker(workers, path); - // When the database is accessed through a shared worker, we implement - // mutexes over custom messages sent through the shared worker. In other - // cases, we need to implement a mutex locally. - final mutex = connection.access == AccessMode.throughSharedWorker + // When the database is hosted in a shared worker, we don't need a local + // mutex since that worker will hand out leases for us. + // Additionally, package:sqlite3_web uses navigator locks internally for + // OPFS databases. + // Technically, the only other implementation (IndexedDB in a local context + // or a dedicated worker) is inherently unsafe to use across tabs. But + // wrapping those in a mutex and flushing the file system helps a little bit + // (still something we're trying to avoid). + final hasSqliteWebMutex = + connection.access == AccessMode.throughSharedWorker || + connection.storage == StorageMode.opfs; + + final mutex = hasSqliteWebMutex ? null : MutexImpl(identifier: path); // Use the DB path as a mutex identifier