Skip to content

Commit

Permalink
Real cancellation and timeout for ConnectAsync (#3167)
Browse files Browse the repository at this point in the history
Also enables the code analysis rule for flowing cancellation tokens
(part of #3162).

Fixes #2860
  • Loading branch information
roji committed Sep 20, 2020
2 parents f0433cd + 0cefc98 commit 83e8d90
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 19 deletions.
7 changes: 7 additions & 0 deletions .editorconfig
Expand Up @@ -45,3 +45,10 @@ csharp_style_throw_expression = true:suggestion
csharp_style_conditional_delegate_call = true:suggestion
csharp_indent_case_contents_when_block = false
csharp_indent_switch_labels = false

# Code analysis rules

# Reliability Rules

# CA2016: Forward the 'CancellationToken' parameter to methods that take one
dotnet_diagnostic.CA2016.severity = error
16 changes: 8 additions & 8 deletions src/Npgsql/NpgsqlConnection.cs
Expand Up @@ -226,15 +226,15 @@ Task Open(bool async, CancellationToken cancellationToken)
// If we've never connected with this connection string, open a physical connector in order to generate
// any exception (bad user/password, IP address...). This reproduces the standard error behavior.
if (!_pool.IsBootstrapped)
return BootstrapMultiplexing();
return BootstrapMultiplexing(cancellationToken);

CompleteOpen();
return Task.CompletedTask;
}

return OpenAsync();
return OpenAsync(cancellationToken);

async Task OpenAsync()
async Task OpenAsync(CancellationToken cancellationToken2)
{
NpgsqlConnector? connector = null;
try
Expand All @@ -250,7 +250,7 @@ async Task OpenAsync()
_userFacingConnectionString = Settings.ToStringWithoutPassword();

connector = new NpgsqlConnector(this);
await connector.Open(timeout, async, cancellationToken);
await connector.Open(timeout, async, cancellationToken2);
}
else
{
Expand All @@ -269,14 +269,14 @@ async Task OpenAsync()
}
else
{
connector = await _pool.Rent(this, timeout, async, cancellationToken);
connector = await _pool.Rent(this, timeout, async, cancellationToken2);
ConnectorBindingScope = ConnectorBindingScope.Connection;
Connector = connector;
EnlistTransaction(Transaction.Current);
}
}
else
connector = await _pool.Rent(this, timeout, async, cancellationToken);
connector = await _pool.Rent(this, timeout, async, cancellationToken2);
}

ConnectorBindingScope = ConnectorBindingScope.Connection;
Expand Down Expand Up @@ -310,12 +310,12 @@ async Task OpenAsync()
}
}

async Task BootstrapMultiplexing()
async Task BootstrapMultiplexing(CancellationToken cancellationToken2)
{
try
{
var timeout = new NpgsqlTimeout(TimeSpan.FromSeconds(ConnectionTimeout));
await _pool!.BootstrapMultiplexing(this, timeout, async, cancellationToken);
await _pool!.BootstrapMultiplexing(this, timeout, async, cancellationToken2);
CompleteOpen();
}
catch
Expand Down
46 changes: 37 additions & 9 deletions src/Npgsql/NpgsqlConnector.cs
Expand Up @@ -456,7 +456,7 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
// It is intentionally not awaited and will run as long as the connector is alive.
// The CommandsInFlightWriter channel is completed in Cleanup, which should cause this task
// to complete.
_ = Task.Run(MultiplexingReadLoop)
_ = Task.Run(MultiplexingReadLoop, CancellationToken.None)
.ContinueWith(t =>
{
// Note that we *must* observe the exception if the task is faulted.
Expand Down Expand Up @@ -755,7 +755,7 @@ async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationTok
}
else
{
// Note that there aren't any timeoutable or cancellable DNS methods
// Note that there aren't any timeout-able or cancellable DNS methods
endpoints = (await Dns.GetHostAddressesAsync(Host).WithCancellation(cancellationToken))
.Select(a => new IPEndPoint(a, Port)).ToArray();
}
Expand All @@ -778,18 +778,41 @@ async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationTok
Log.Trace($"Attempting to connect to {endpoint}");
var protocolType = endpoint.AddressFamily == AddressFamily.InterNetwork ? ProtocolType.Tcp : ProtocolType.IP;
var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType);
CancellationTokenSource? combinedCts = null;
CancellationTokenSource? timeoutCts = null;
try
{
// .NET 5.0 added cancellation support to ConnectAsync, which allows us to implement real
// cancellation and timeout. On older TFMs, we fake-cancel the operation, i.e. stop waiting
// and raise the exception, but the actual connection task is left running.

// TODO: NET5_0 is here because of https://github.com/dotnet/sdk/issues/13377, remove for 5.0.0-rc2
#if (NET461 || NETSTANDARD2_0 || NETSTANDARD2_1 || NETCOREAPP3_1) && !NET5_0
await socket.ConnectAsync(endpoint)
.WithCancellationAndTimeout(perIpTimeout, cancellationToken);
#else
var finalCt = cancellationToken;

if (perIpTimeout.IsSet)
{
timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(perIpTimeout.TimeLeft.Milliseconds);
finalCt = timeoutCts.Token;
}

await socket.ConnectAsync(endpoint, finalCt);
#endif

SetSocketOptions(socket);
_socket = socket;
return;
}
catch (Exception e)
{
try { socket.Dispose(); }
try
{
socket.Dispose();
}
catch
{
// ignored
Expand All @@ -807,6 +830,11 @@ await socket.ConnectAsync(endpoint)
throw new NpgsqlException("Exception while connecting", e);
}
}
finally
{
timeoutCts?.Dispose();
combinedCts?.Dispose();
}
}
}

Expand Down Expand Up @@ -2028,8 +2056,8 @@ async Task DoWaitAsync(CancellationToken cancellationToken = default)
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
// Intentional, as this method would be rewritten
var msg = await ReadMessageWithNotifications(true);
// TODO: Fully implement cancellation
var msg = await ReadMessageWithNotifications(true, CancellationToken.None);
if (!keepaliveSent)
{
if (msg != null)
Expand All @@ -2042,7 +2070,7 @@ async Task DoWaitAsync(CancellationToken cancellationToken = default)

// A keepalive was sent. Consume the response (RowDescription, CommandComplete,
// ReadyForQuery) while also keeping track if an async message was received in between.
keepaliveLock.Wait();
await keepaliveLock.WaitAsync(CancellationToken.None);
try
{
var receivedNotification = false;
Expand All @@ -2053,7 +2081,7 @@ async Task DoWaitAsync(CancellationToken cancellationToken = default)
while (msg == null)
{
receivedNotification = true;
// Intentional, as this method would be rewritten
// TODO: Fully implement cancellation
msg = await ReadMessage(true, CancellationToken.None);
}

Expand All @@ -2067,9 +2095,9 @@ async Task DoWaitAsync(CancellationToken cancellationToken = default)
expectedMessageCode = BackendMessageCode.DataRow;
break;
case BackendMessageCode.DataRow:
// Intentional, as this method would be rewritten
// TODO: Fully implement cancellation
// DataRow is usually consumed by a reader, here we have to skip it manually.
await ReadBuffer.Skip(((DataRowMessage)msg).Length, true);
await ReadBuffer.Skip(((DataRowMessage)msg).Length, true, CancellationToken.None);
expectedMessageCode = BackendMessageCode.CompletedResponse;
break;
case BackendMessageCode.CompletedResponse:
Expand Down
2 changes: 1 addition & 1 deletion src/Npgsql/NpgsqlDataReader.cs
Expand Up @@ -1346,7 +1346,7 @@ async ValueTask<TextReader> GetTextReader(int ordinal, bool async, CancellationT
if (field.Handler is ITextReaderHandler handler)
return handler.GetTextReader(async
? await GetStreamInternal(field, ordinal, true, cancellationToken)
: GetStreamInternal(field, ordinal, false).Result);
: GetStreamInternal(field, ordinal, false, CancellationToken.None).Result);

throw new InvalidCastException($"The GetTextReader method is not supported for type {field.Handler.PgDisplayName}");
}
Expand Down
3 changes: 3 additions & 0 deletions src/Npgsql/TaskExtensions.cs
Expand Up @@ -64,6 +64,8 @@ internal static async Task<T> WithCancellation<T>(this Task<T> task, Cancellatio
return await task;
}

// TODO: NET5_0 is here because of https://github.com/dotnet/sdk/issues/13377, remove for 5.0.0-rc2
#if (NET461 || NETSTANDARD2_0 || NETSTANDARD2_1 || NETCOREAPP3_1) && !NET5_0
/// <summary>
/// Allows you to cancel awaiting for a non-cancellable task.
/// </summary>
Expand Down Expand Up @@ -93,5 +95,6 @@ internal static Task WithCancellationAndTimeout(this Task task, NpgsqlTimeout ti
.WithCancellation(cancellationToken)
.WithTimeout(timeout);
}
#endif
}
}
4 changes: 3 additions & 1 deletion test/Npgsql.Tests/ConnectionTests.cs
Expand Up @@ -373,7 +373,9 @@ public void ConnectTimeoutAsync()
}.ToString();
using (var conn = new NpgsqlConnection(connString))
{
Assert.That(async () => await conn.OpenAsync(), Throws.Exception.TypeOf<TimeoutException>());
Assert.That(async () => await conn.OpenAsync(), Throws.Exception
.TypeOf<NpgsqlException>()
.With.InnerException.TypeOf<TimeoutException>());
Assert.That(conn.State, Is.EqualTo(ConnectionState.Closed));
}
}
Expand Down

0 comments on commit 83e8d90

Please sign in to comment.