Skip to content

Commit

Permalink
Fix connection pool causing unhandled exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Feb 27, 2023
1 parent 46390a2 commit b137b40
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 27 deletions.
44 changes: 17 additions & 27 deletions drift/lib/src/runtime/executor/connection_pool.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,13 @@ abstract class MultiExecutor extends QueryExecutor {
MultiExecutor._();
}

class _ExecutorCompleter {
_ExecutorCompleter(this.statement, this.args)
: _completer = Completer<List<Map<String, Object?>>>();
class _PendingSelect {
_PendingSelect(this.statement, this.args)
: completer = Completer<List<Map<String, Object?>>>();

final String statement;
final List<Object?> args;
final Completer<List<Map<String, Object?>>> _completer;

Future<List<Map<String, Object?>>> get future => _completer.future;

void complete([FutureOr<List<Map<String, Object?>>>? value]) {
_completer.complete(value);
}

void completeError(Object error, [StackTrace? stackTrace]) {
_completer.completeError(error, stackTrace);
}
final Completer<List<Map<String, Object?>>> completer;
}

class _QueryExecutorPool {
Expand All @@ -54,8 +44,8 @@ class _QueryExecutorPool {
final List<QueryExecutor> _executors;
final List<QueryExecutor> _idleExecutors;

final List<_ExecutorCompleter> _queue = [];
final List<_ExecutorCompleter> _running = [];
final List<_PendingSelect> _queue = [];
final List<_PendingSelect> _running = [];

Future<bool> ensureOpen(QueryExecutorUser user) async {
final result = await Future.wait(
Expand All @@ -72,10 +62,10 @@ class _QueryExecutorPool {
return _executors.single.runSelect(statement, args);
}

final executorCompleter = _ExecutorCompleter(statement, args);
final executorCompleter = _PendingSelect(statement, args);
_queue.add(executorCompleter);
_run();
return executorCompleter.future;
return executorCompleter.completer.future;
}

void _run() {
Expand All @@ -87,15 +77,15 @@ class _QueryExecutorPool {

_running.add(completer);

completer.future.whenComplete(() {
_running.remove(completer);
_idleExecutors.add(executor);
_run();
});

executor
.runSelect(completer.statement, completer.args)
.then(completer.complete, onError: completer.completeError);
completer.completer.complete(Future.sync(() async {
try {
return await executor.runSelect(completer.statement, completer.args);
} finally {
_running.remove(completer);
_idleExecutors.add(executor);
_run();
}
}));
}
}

Expand Down
13 changes: 13 additions & 0 deletions drift/test/engines/connection_pool_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,17 @@ void main() {
verify(write.transactions.ensureOpen(any));
verify(write.transactions.runSelect('select', []));
});

test('select failure does not cause an unhandled exception', () async {
// https://github.com/simolus3/drift/issues/2323
final read2 = MockExecutor();
final multi =
MultiExecutor.withReadPool(reads: [read2, read], write: write);

when(read2.runSelect(any, any)).thenThrow('bang!');

await multi.ensureOpen(db);

expect(multi.runSelect('select 1', []), throwsA(isA<String>()));
});
}

0 comments on commit b137b40

Please sign in to comment.