Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement transactions in v3 API #98

Merged
merged 3 commits into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 13 additions & 1 deletion lib/postgres_v3_experimental.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,20 @@ abstract class PgSession {
}

abstract class PgSessionExecutor {
// TODO: also add retry options similarly to postgres_pool
/// Obtains a [PgSession] capable of running statements and calls [fn] with
/// it.
///
/// Returns the result (either the value or an error) of invoking [fn]. No
/// updates will be reverted in the event of an error.
Future<R> run<R>(Future<R> Function(PgSession session) fn);

/// Obtains a [PgSession] running in a transaction and calls [fn] with it.
///
/// Returns the result of invoking [fn] (either the value or an error). In
/// case of [fn] throwing, the transaction will be reverted.
///
/// Note that other invocations on a [PgConnection] are blocked while a
/// transaction is active.
Future<R> runTx<R>(Future<R> Function(PgSession session) fn);
}

Expand Down
250 changes: 153 additions & 97 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,108 @@ class _ResolvedSettings {
}
}

class PgConnectionImplementation implements PgConnection {
abstract class _PgSessionBase implements PgSession {
/// The lock to guard operations that must run sequentially, like sending
/// RPC messages to the postgres server and waiting for them to complete.
///
/// Each session base has its own operation lock, but child sessions hold the
/// parent lock while they are active. For instance, when starting a
/// transaction,the [_operationLock] of the connection is held until the
/// transaction completes. This ensures that no other statement can use the
/// connection in the meantime.
final Pool _operationLock = Pool(1);

PgConnectionImplementation get _connection;

/// Sends a message to the server and waits for a response [T], gracefully
/// handling error messages that might come in instead.
Future<T> _sendAndWaitForQuery<T extends ServerMessage>(ClientMessage send) {
final trace = StackTrace.current;

return _operationLock.withResource(() {
_connection._channel.sink
.add(AggregatedClientMessage([send, const SyncMessage()]));

final completer = Completer<T>();
final syncComplete = Completer<void>.sync();

_connection._pending = _CallbackOperation(_connection, (message) async {
if (message is T) {
completer.complete(message);
} else if (message is ErrorResponseMessage) {
completer.completeError(
PostgreSQLException.fromFields(message.fields), trace);
} else if (message is ReadyForQueryMessage) {
if (!completer.isCompleted) {
completer.completeError(
StateError('Operation did not complete'), trace);
}

syncComplete.complete();
} else {
completer.completeError(
StateError('Unexpected message $message'), trace);
}
});

return syncComplete.future
.whenComplete(() => _connection._pending = null)
.then((value) => completer.future);
});
}

@override
Future<PgResult> execute(Object query,
{Object? parameters, Duration? timeout}) async {
final description = InternalQueryDescription.wrap(query);
final variables = description.bindParameters(parameters);

if (variables.isNotEmpty) {
// The simple query protocol does not support variables, so we have to
// prepare a statement explicitly.
final prepared = await prepare(description, timeout: timeout);
try {
return await prepared.run(variables, timeout: timeout);
} finally {
await prepared.dispose();
}
} else {
// Great, we can just run a simple query.
final controller = StreamController<PgResultRow>();
final items = <PgResultRow>[];

final querySubscription = _PgResultStreamSubscription.simpleQuery(
description.transformedSql,
this,
controller,
controller.stream.listen(items.add),
);
await querySubscription.asFuture();
await querySubscription.cancel();

return PgResult(items, await querySubscription.affectedRows,
await querySubscription.schema);
}
}

@override
Future<PgStatement> prepare(Object query, {Duration? timeout}) async {
final conn = _connection;
final name = 's/${conn._statementCounter++}';
final description = InternalQueryDescription.wrap(query);

await _sendAndWaitForQuery<ParseCompleteMessage>(ParseMessage(
description.transformedSql,
statementName: name,
types: description.parameterTypes,
));

return _PreparedStatement(description, name, this);
}
}

class PgConnectionImplementation extends _PgSessionBase
implements PgConnection {
static Future<PgConnectionImplementation> connect(
PgEndpoint endpoint, {
PgSessionSettings? sessionSettings,
Expand Down Expand Up @@ -173,7 +274,6 @@ class PgConnectionImplementation implements PgConnection {

final _ResolvedSettings _settings;

final Pool _operationLock = Pool(1);
_PendingOperation? _pending;

final Map<String, String> _parameters = {};
Expand All @@ -186,6 +286,9 @@ class PgConnectionImplementation implements PgConnection {
@override
PgChannels get channels => _channels;

@override
PgConnectionImplementation get _connection => this;

PgConnectionImplementation._(this._channel, this._settings) {
_serverMessages = _channel.stream.listen(_handleMessage);
}
Expand Down Expand Up @@ -224,98 +327,30 @@ class PgConnectionImplementation implements PgConnection {
}
}

Future<T> _sendAndWaitForQuery<T extends ServerMessage>(ClientMessage send) {
final trace = StackTrace.current;

return _operationLock.withResource(() {
_channel.sink.add(AggregatedClientMessage([send, const SyncMessage()]));

final completer = Completer<T>();
final syncComplete = Completer<void>.sync();

_pending = _CallbackOperation(this, (message) async {
if (message is T) {
completer.complete(message);
} else if (message is ErrorResponseMessage) {
completer.completeError(
PostgreSQLException.fromFields(message.fields), trace);
} else if (message is ReadyForQueryMessage) {
if (!completer.isCompleted) {
completer.completeError(
StateError('Operation did not complete'), trace);
}

syncComplete.complete();
} else {
completer.completeError(
StateError('Unexpected message $message'), trace);
}
});

return syncComplete.future
.whenComplete(() => _pending = null)
.then((value) => completer.future);
});
}

@override
Future<PgStatement> prepare(Object query, {Duration? timeout}) async {
final name = 's/${_statementCounter++}';
final description = InternalQueryDescription.wrap(query);

await _sendAndWaitForQuery<ParseCompleteMessage>(ParseMessage(
description.transformedSql,
statementName: name,
types: description.parameterTypes,
));

return _PreparedStatement(description, name, this);
Future<R> run<R>(Future<R> Function(PgSession session) fn) {
return Future.sync(() => fn(this));
simolus3 marked this conversation as resolved.
Show resolved Hide resolved
}

@override
Future<PgResult> execute(Object query,
{Object? parameters, Duration? timeout}) async {
final description = InternalQueryDescription.wrap(query);
final variables = description.bindParameters(parameters);
Future<R> runTx<R>(Future<R> Function(PgSession session) fn) {
// Keep this database is locked while the transaction is active.
return _operationLock.withResource(() async {
// The transaction has its own _operationLock, which means that it (and
// only it) can be used to run statements while it's active.
final transaction = _TransactionSession(this);
await transaction.execute('BEGIN;');

if (variables.isNotEmpty) {
// The simple query protocol does not support variables, so we have to
// prepare a statement explicitly.
final prepared = await prepare(description, timeout: timeout);
try {
return await prepared.run(variables, timeout: timeout);
} finally {
await prepared.dispose();
}
} else {
// Great, we can just run a simple query.
final controller = StreamController<PgResultRow>();
final items = <PgResultRow>[];

final querySubscription = _PgResultStreamSubscription.simpleQuery(
description.transformedSql,
this,
controller,
controller.stream.listen(items.add),
);
await querySubscription.asFuture();
await querySubscription.cancel();

return PgResult(items, await querySubscription.affectedRows,
await querySubscription.schema);
}
}
final result = await fn(transaction);
await transaction.execute('COMMIT;');

@override
Future<R> run<R>(Future<R> Function(PgSession session) fn) {
// TODO: implement run
throw UnimplementedError();
}

@override
Future<R> runTx<R>(Future<R> Function(PgSession session) fn) {
// TODO: implement runTx
throw UnimplementedError();
return result;
} catch (e) {
await transaction.execute('ROLLBACK;');
rethrow;
}
});
}

@override
Expand All @@ -332,9 +367,9 @@ class PgConnectionImplementation implements PgConnection {
class _PreparedStatement extends PgStatement {
final InternalQueryDescription _description;
final String _name;
final PgConnectionImplementation _connection;
final _PgSessionBase _session;

_PreparedStatement(this._description, this._name, this._connection);
_PreparedStatement(this._description, this._name, this._session);

@override
PgResultStream bind(Object? parameters) {
Expand All @@ -343,7 +378,7 @@ class _PreparedStatement extends PgStatement {

@override
Future<void> dispose() async {
await _connection._sendAndWaitForQuery<CloseCompleteMessage>(
await _session._sendAndWaitForQuery<CloseCompleteMessage>(
CloseMessage.statement(_name));
}
}
Expand All @@ -369,7 +404,7 @@ class _BoundStatement extends Stream<PgResultRow> implements PgResultStream {
class _PgResultStreamSubscription
implements PgResultStreamSubscription, _PendingOperation {
@override
final PgConnectionImplementation connection;
final _PgSessionBase session;
final StreamController<PgResultRow> _controller;
final StreamSubscription<PgResultRow> _source;

Expand All @@ -378,12 +413,15 @@ class _PgResultStreamSubscription
final Completer<void> _done = Completer();
PgResultSchema? _resultSchema;

@override
PgConnectionImplementation get connection => session._connection;

late final _portalName = 'p/${connection._portalCounter++}';

_PgResultStreamSubscription(
_BoundStatement statement, this._controller, this._source)
: connection = statement.statement._connection {
connection._operationLock.withResource(() async {
: session = statement.statement._session {
session._operationLock.withResource(() async {
connection._pending = this;

connection._channel.sink.add(AggregatedClientMessage([
Expand All @@ -405,8 +443,8 @@ class _PgResultStreamSubscription
}

_PgResultStreamSubscription.simpleQuery(
String sql, this.connection, this._controller, this._source) {
connection._operationLock.withResource(() async {
String sql, this.session, this._controller, this._source) {
session._operationLock.withResource(() async {
connection._pending = this;

connection._channel.sink.add(QueryMessage(sql));
Expand Down Expand Up @@ -597,10 +635,28 @@ class _Channels implements PgChannels {
}
}

class _TransactionSession extends _PgSessionBase {
@override
final PgConnectionImplementation _connection;

_TransactionSession(this._connection);

@override
Future<void> close() async {
throw UnsupportedError(
'Transactions cannot be closed explicitly. Instead, return from the '
'`runTx` callback with a value to complete it or throw an exception to '
'revert the transaction.',
);
}
}

abstract class _PendingOperation {
final PgConnectionImplementation connection;
final _PgSessionBase session;

PgConnectionImplementation get connection => session._connection;

_PendingOperation(this.connection);
_PendingOperation(this.session);

Future<void> handleMessage(ServerMessage message);
}
Expand Down