Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement multiplexing #4839

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 19 additions & 29 deletions src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,20 @@ internal bool PostgresCancellationPerformed

#region Constructors

internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection conn)
internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection? conn)
: this(dataSource)
{
if (conn.ProvideClientCertificatesCallback is not null)
ClientCertificatesCallback = certs => conn.ProvideClientCertificatesCallback(certs);
if (conn.UserCertificateValidationCallback is not null)
UserCertificateValidationCallback = conn.UserCertificateValidationCallback;
if (conn is not null)
{
if (conn.ProvideClientCertificatesCallback is not null)
ClientCertificatesCallback = certs => conn.ProvideClientCertificatesCallback(certs);
if (conn.UserCertificateValidationCallback is not null)
UserCertificateValidationCallback = conn.UserCertificateValidationCallback;

#pragma warning disable CS0618 // Obsolete
ProvidePasswordCallback = conn.ProvidePasswordCallback;
ProvidePasswordCallback = conn.ProvidePasswordCallback;
#pragma warning restore CS0618
}
}

NpgsqlConnector(NpgsqlConnector connector)
Expand All @@ -355,7 +358,7 @@ internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection conn)
ProvidePasswordCallback = connector.ProvidePasswordCallback;
}

NpgsqlConnector(NpgsqlDataSource dataSource)
internal NpgsqlConnector(NpgsqlDataSource dataSource)
{
Debug.Assert(dataSource.OwnsConnectors);

Expand Down Expand Up @@ -396,7 +399,7 @@ internal NpgsqlConnector(NpgsqlDataSource dataSource, NpgsqlConnection conn)
// Note: the in-flight channel can probably be single-writer, but that doesn't actually do anything
// at this point. And we currently rely on being able to complete the channel at any point (from
// Break). We may want to revisit this if an optimized, SingleWriter implementation is introduced.
var commandsInFlightChannel = Channel.CreateUnbounded<NpgsqlCommand>(
var commandsInFlightChannel = Channel.CreateUnbounded<MultiplexingNpgsqlCommand>(
new UnboundedChannelOptions { SingleReader = true });
CommandsInFlightReader = commandsInFlightChannel.Reader;
CommandsInFlightWriter = commandsInFlightChannel.Writer;
Expand Down Expand Up @@ -1101,8 +1104,8 @@ void SetSocketOptions(Socket socket)

#region I/O

readonly ChannelReader<NpgsqlCommand>? CommandsInFlightReader;
internal readonly ChannelWriter<NpgsqlCommand>? CommandsInFlightWriter;
readonly ChannelReader<MultiplexingNpgsqlCommand>? CommandsInFlightReader;
internal readonly ChannelWriter<MultiplexingNpgsqlCommand>? CommandsInFlightWriter;

internal volatile int CommandsInFlightCount;

Expand All @@ -1114,7 +1117,7 @@ async Task MultiplexingReadLoop()
Debug.Assert(Settings.Multiplexing);
Debug.Assert(CommandsInFlightReader != null);

NpgsqlCommand? command = null;
MultiplexingNpgsqlCommand? command = null;
var commandsRead = 0;

try
Expand Down Expand Up @@ -1151,11 +1154,13 @@ async Task MultiplexingReadLoop()
// returned to the pool, it is *never* written to unless properly dequeued from the Idle channel.
if (Interlocked.Add(ref CommandsInFlightCount, -commandsRead) == 0)
{
var sw = new SpinWait();
// There's a race condition where the continuation of an asynchronous multiplexing write may not
// have executed yet, and the flush may still be in progress. We know all I/O has already
// been sent - because the reader has already consumed the entire resultset. So we wait until
// the connector's write lock has been released (long waiting will never occur here).
SpinWait.SpinUntil(() => MultiplexAsyncWritingLock == 0 || IsBroken);
while (MultiplexAsyncWritingLock != 0 && !IsBroken)
sw.SpinOnce();

ResetReadBuffer();
DataSource.Return(this);
Expand Down Expand Up @@ -1689,7 +1694,7 @@ internal void ResetCancellation()
internal void PerformUserCancellation()
{
var connection = Connection;
if (connection is null || connection.ConnectorBindingScope == ConnectorBindingScope.Reader || UserCancellationRequested)
if (connection is null || UserCancellationRequested)
return;

// Take the lock first to make sure there is no concurrent Break.
Expand Down Expand Up @@ -2067,11 +2072,9 @@ internal Exception Break(Exception reason)
// On the other hand leaving the state Open could indicate to the user that the connection is functional.
// (see https://github.com/npgsql/npgsql/issues/3705#issuecomment-839908772)
Connection = null;
if (connection.ConnectorBindingScope != ConnectorBindingScope.None)
Return();
Return();
connection.EnlistedTransaction = null;
connection.Connector = null;
connection.ConnectorBindingScope = ConnectorBindingScope.None;
}

connection.FullState = ConnectionState.Broken;
Expand Down Expand Up @@ -2229,8 +2232,6 @@ void GenerateResetMessage()
/// </summary>
internal async Task Reset(bool async)
{
bool endBindingScope;

// We start user action in case a keeplive happens concurrently, or a concurrent user command (bug)
using (StartUserAction(attemptPgCancellation: false))
{
Expand All @@ -2247,21 +2248,17 @@ internal async Task Reset(bool async)
switch (TransactionStatus)
{
case TransactionStatus.Idle:
// There is an undisposed transaction on multiplexing connection
endBindingScope = Connection?.ConnectorBindingScope == ConnectorBindingScope.Transaction;
break;
case TransactionStatus.Pending:
// BeginTransaction() was called, but was left in the write buffer and not yet sent to server.
// Just clear the transaction state.
ProcessNewTransactionStatus(TransactionStatus.Idle);
ClearTransaction();
endBindingScope = true;
break;
case TransactionStatus.InTransactionBlock:
case TransactionStatus.InFailedTransactionBlock:
await Rollback(async).ConfigureAwait(false);
ClearTransaction();
endBindingScope = true;
break;
default:
ThrowHelper.ThrowInvalidOperationException($"Internal Npgsql bug: unexpected value {TransactionStatus} of enum {nameof(TransactionStatus)}. Please file a bug.");
Expand All @@ -2286,13 +2283,6 @@ internal async Task Reset(bool async)

DataReader.UnbindIfNecessary();
}

if (endBindingScope)
{
// Connection is null if a connection enlisted in a TransactionScope was closed before the
// TransactionScope completed - the connector is still enlisted, but has no connection.
Connection?.EndBindingScope(ConnectorBindingScope.Transaction);
}
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Npgsql/MultiHostDataSourceWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal override ValueTask<NpgsqlConnector> Get(NpgsqlConnection conn, NpgsqlTi
=> _wrappedSource.Get(conn, timeout, async, cancellationToken);
internal override bool TryGetIdleConnector([NotNullWhen(true)] out NpgsqlConnector? connector)
=> throw new NpgsqlException("Npgsql bug: trying to get an idle connector from " + nameof(MultiHostDataSourceWrapper));
internal override ValueTask<NpgsqlConnector?> OpenNewConnector(NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
internal override ValueTask<NpgsqlConnector?> OpenNewConnector(NpgsqlConnection? conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
=> throw new NpgsqlException("Npgsql bug: trying to open a new connector from " + nameof(MultiHostDataSourceWrapper));
internal override void Return(NpgsqlConnector connector)
=> _wrappedSource.Return(connector);
Expand Down
22 changes: 12 additions & 10 deletions src/Npgsql/MultiplexingDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ sealed class MultiplexingDataSource : PoolingDataSource

readonly bool _autoPrepare;

readonly ChannelReader<NpgsqlCommand> _multiplexCommandReader;
internal ChannelWriter<NpgsqlCommand> MultiplexCommandWriter { get; }
readonly ChannelReader<MultiplexingNpgsqlCommand> _multiplexCommandReader;
internal ChannelWriter<MultiplexingNpgsqlCommand> MultiplexCommandWriter { get; }

readonly Task _multiplexWriteLoop;

Expand All @@ -45,7 +45,7 @@ internal MultiplexingDataSource(

_writeCoalescingBufferThresholdBytes = Settings.WriteCoalescingBufferThresholdBytes;

var multiplexCommandChannel = Channel.CreateBounded<NpgsqlCommand>(
var multiplexCommandChannel = Channel.CreateBounded<MultiplexingNpgsqlCommand>(
new BoundedChannelOptions(MultiplexingCommandChannelBound)
{
FullMode = BoundedChannelFullMode.Wait,
Expand Down Expand Up @@ -81,7 +81,7 @@ async Task MultiplexingWriteLoop()
while (true)
{
NpgsqlConnector? connector;
NpgsqlCommand? command;
MultiplexingNpgsqlCommand? command;

try
{
Expand All @@ -108,19 +108,15 @@ async Task MultiplexingWriteLoop()
}

connector = await OpenNewConnector(
command.InternalConnection!,
conn: null,
new NpgsqlTimeout(TimeSpan.FromSeconds(Settings.Timeout)),
async: true,
CancellationToken.None).ConfigureAwait(false);

if (connector != null)
{
// Managed to created a new connector
connector.Connection = null;

// See increment under over-capacity mode below
Interlocked.Increment(ref connector.CommandsInFlightCount);

break;
}

Expand Down Expand Up @@ -215,7 +211,7 @@ async Task MultiplexingWriteLoop()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
bool WriteCommand(NpgsqlConnector connector, NpgsqlCommand command, ref MultiplexingStats stats)
bool WriteCommand(NpgsqlConnector connector, MultiplexingNpgsqlCommand command, ref MultiplexingStats stats)
{
// Note: this method *never* awaits on I/O - doing so would suspend all outgoing multiplexing commands
// for the entire pool. In the normal/fast case, writing the command is purely synchronous (serialize
Expand Down Expand Up @@ -365,6 +361,12 @@ static void CompleteWrite(NpgsqlConnector connector, ref MultiplexingStats stats
// ReSharper disable once FunctionNeverReturns
}

public override NpgsqlCommand CreateCommand(string? commandText = null)
=> new MultiplexingNpgsqlCommand(this) { CommandText = commandText };

public override NpgsqlBatch CreateBatch()
=> new NpgsqlDataSourceBatch(new MultiplexingNpgsqlCommand(this));

protected override void DisposeBase()
{
MultiplexCommandWriter.Complete(new ObjectDisposedException(nameof(MultiplexingDataSource)));
Expand Down
87 changes: 87 additions & 0 deletions src/Npgsql/MultiplexingNpgsqlCommand.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Npgsql.Internal;
using Npgsql.Util;

namespace Npgsql;

sealed class MultiplexingNpgsqlCommand : NpgsqlDataSourceCommand
{
readonly MultiplexingDataSource _dataSource;

// TODO: Maybe pool these?
internal ManualResetValueTaskSource<NpgsqlConnector> ExecutionCompletion { get; }
= new();

public MultiplexingNpgsqlCommand(MultiplexingDataSource dataSource) : base(null) => _dataSource = dataSource;

internal override async ValueTask<NpgsqlDataReader> ExecuteReader(bool async, CommandBehavior behavior, CancellationToken cancellationToken)
{
// The waiting on the ExecutionCompletion ManualResetValueTaskSource is necessarily
// asynchronous, so allowing sync would mean sync-over-async.
if (!async)
ThrowHelper.ThrowNotSupportedException("Synchronous command execution is not supported when multiplexing is on");

NpgsqlDataReader? reader = null;

try
{
if (IsWrappedByBatch)
{
foreach (var batchCommand in InternalBatchCommands)
{
batchCommand._parameters?.ProcessParameters(_dataSource.SerializerOptions, validateValues: true, batchCommand.CommandType);
ProcessRawQuery(null, standardConformingStrings: true, batchCommand);
}
}
else
{
_parameters?.ProcessParameters(_dataSource.SerializerOptions, validateValues: true, CommandType);
ProcessRawQuery(null, standardConformingStrings: true, batchCommand: null);
}

State = CommandState.InProgress;

TraceCommandStart(_dataSource.Settings);

// TODO: Experiment: do we want to wait on *writing* here, or on *reading*?
// Previous behavior was to wait on reading, which throw the exception from ExecuteReader (and not from
// the first read). But waiting on writing would allow us to do sync writing and async reading.
ExecutionCompletion.Reset();
try
{
await _dataSource.MultiplexCommandWriter.WriteAsync(this, cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException ex)
{
Debug.Assert(ex.InnerException is not null);
throw ex.InnerException;
}
var connector = await new ValueTask<NpgsqlConnector>(ExecutionCompletion, ExecutionCompletion.Version).ConfigureAwait(false);

reader = connector.DataReader;
reader.Init(this, behavior, InternalBatchCommands);
connector.CurrentReader = reader;
await reader.NextResultAsync(cancellationToken).ConfigureAwait(false);

return reader;
}
catch (Exception e)
{
if (reader is not null)
await reader.Cleanup(async: true, isDisposing: true).ConfigureAwait(false);

TraceSetException(e);

State = CommandState.Idle;

throw;
}
}

protected override void Dispose(bool disposing) => State = CommandState.Disposed;
}
1 change: 0 additions & 1 deletion src/Npgsql/NpgsqlBinaryExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,6 @@ void Cleanup()
if (!ReferenceEquals(connector, null))
{
connector.CurrentCopyOperation = null;
_connector.Connection?.EndBindingScope(ConnectorBindingScope.Copy);
_connector = null!;
}

Expand Down
1 change: 0 additions & 1 deletion src/Npgsql/NpgsqlBinaryImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ void Cleanup()
{
connector.EndUserAction();
connector.CurrentCopyOperation = null;
connector.Connection?.EndBindingScope(ConnectorBindingScope.Copy);
_connector = null;
}

Expand Down
Loading