Skip to content

CSHARP-3671: Better wait queue timeout errors for load balanced clusters. #579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 17 additions & 29 deletions src/MongoDB.Driver.Core/Core/ChannelPinningHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System.Threading;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.ConnectionPools;
using MongoDB.Driver.Core.Connections;
using MongoDB.Driver.Core.Servers;

Expand Down Expand Up @@ -89,18 +90,20 @@ public static IReadWriteBindingHandle CreateReadWriteBinding(ICluster cluster, I
return new ReadWriteBindingHandle(readWriteBinding);
}

internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, long cursorId)
internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHandle channelSource, IChannelHandle channel, long cursorId)
{
IChannelSource effectiveChannelSource;
if (IsInLoadBalancedMode(channelSource.ServerDescription) && cursorId != 0)
{
var getMoreChannel = channelSource.GetChannel(CancellationToken.None); // no need for cancellation token since we already have channel in the source
var getMoreSession = channelSource.Session.Fork();
if (channel.Connection is ICheckOutReasonTracker checkOutReasonTracker)
{
checkOutReasonTracker.SetCheckOutReasonIfNotAlreadySet(CheckOutReason.Cursor);
}

effectiveChannelSource = new ChannelChannelSource(
channelSource.Server,
getMoreChannel,
getMoreSession);
channel.Fork(),
channelSource.Session.Fork());
}
else
{
Expand All @@ -110,37 +113,22 @@ internal static IChannelSourceHandle CreateGetMoreChannelSource(IChannelSourceHa
return new ChannelSourceHandle(effectiveChannelSource);
}

internal static bool PinChannelSourceAndChannelIfRequired(
internal static void PinChannellIfRequired(
IChannelSourceHandle channelSource,
IChannelHandle channel,
ICoreSessionHandle session,
out IChannelSourceHandle pinnedChannelSource,
out IChannelHandle pinnedChannel)
ICoreSessionHandle session)
{
if (IsInLoadBalancedMode(channel.ConnectionDescription))
if (IsInLoadBalancedMode(channel.ConnectionDescription) &&
session.IsInTransaction &&
!IsChannelPinned(session.CurrentTransaction))
{
var server = channelSource.Server;

pinnedChannelSource = new ChannelSourceHandle(
new ChannelChannelSource(
server,
channel.Fork(),
session.Fork()));

if (session.IsInTransaction && !IsChannelPinned(session.CurrentTransaction))
if (channel.Connection is ICheckOutReasonTracker checkOutReasonTracker)
{
session.CurrentTransaction.PinChannel(channel.Fork());
session.CurrentTransaction.PinnedServer = server;
checkOutReasonTracker.SetCheckOutReasonIfNotAlreadySet(CheckOutReason.Transaction);
}

pinnedChannel = channel.Fork();

return true;
session.CurrentTransaction.PinChannel(channel.Fork());
session.CurrentTransaction.PinnedServer = channelSource.Server;
}

pinnedChannelSource = null;
pinnedChannel = null;
return false;
}

// private methods
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/* Copyright 2021-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;

namespace MongoDB.Driver.Core.ConnectionPools
{
internal enum CheckOutReason
{
Cursor,
Transaction
}

internal interface ICheckOutReasonTracker
{
CheckOutReason? CheckOutReason { get; }
void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason);
}

internal sealed class CheckOutReasonCounter
{
public int _cursorCheckOutsCount = 0;
public int _transactionCheckOutsCount = 0;

public int GetCheckOutsCount(CheckOutReason reason) =>
reason switch
{
CheckOutReason.Cursor => _cursorCheckOutsCount,
CheckOutReason.Transaction => _transactionCheckOutsCount,
_ => throw new InvalidOperationException($"Invalid checkout reason {reason}.")
};

public void Increment(CheckOutReason reason)
{
switch (reason)
{
case CheckOutReason.Cursor:
Interlocked.Increment(ref _cursorCheckOutsCount);
break;
case CheckOutReason.Transaction:
Interlocked.Increment(ref _transactionCheckOutsCount);
break;
default:
throw new InvalidOperationException($"Invalid checkout reason {reason}.");
}
}

public void Decrement(CheckOutReason? reason)
{
switch (reason)
{
case null:
break;
case CheckOutReason.Cursor:
Interlocked.Decrement(ref _cursorCheckOutsCount);
break;
case CheckOutReason.Transaction:
Interlocked.Decrement(ref _transactionCheckOutsCount);
break;
default:
throw new InvalidOperationException($"Invalid checkout reason {reason}.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,33 @@

namespace MongoDB.Driver.Core.ConnectionPools
{
internal sealed partial class ExclusiveConnectionPool : IConnectionPool
internal sealed partial class ExclusiveConnectionPool
{
// private methods
private Exception CreateTimeoutException(Stopwatch stopwatch, string message)
{
var checkOutsForCursorCount = _checkOutReasonCounter.GetCheckOutsCount(CheckOutReason.Cursor);
var checkOutsForTransactionCount = _checkOutReasonCounter.GetCheckOutsCount(CheckOutReason.Transaction);

// only use the expanded message format when connected to a load balancer
if (checkOutsForCursorCount != 0 || checkOutsForTransactionCount != 0)
{
var maxPoolSize = _settings.MaxConnections;
var availableConnectionsCount = AvailableCount;
var checkOutsCount = maxPoolSize - availableConnectionsCount;
var checkOutsForOtherCount = checkOutsCount - checkOutsForCursorCount - checkOutsForTransactionCount;

message =
$"Timed out after {stopwatch.ElapsedMilliseconds}ms waiting for a connection from the connection pool. " +
$"maxPoolSize: {maxPoolSize}, " +
$"connections in use by cursors: {checkOutsForCursorCount}, " +
$"connections in use by transactions: {checkOutsForTransactionCount}, " +
$"connections in use by other operations: {checkOutsForOtherCount}.";
}

return new TimeoutException(message);
}

// nested classes
private static class State
{
Expand Down Expand Up @@ -125,7 +150,7 @@ private AcquiredConnection FinalizePoolEnterance(PooledConnection pooledConnecti
_stopwatch.Stop();

var message = $"Timed out waiting for a connection after {_stopwatch.ElapsedMilliseconds}ms.";
throw new TimeoutException(message);
throw _pool.CreateTimeoutException(_stopwatch, message);
}
}

Expand Down Expand Up @@ -173,8 +198,9 @@ public void HandleException(Exception ex)
}
}

private sealed class PooledConnection : IConnection
private sealed class PooledConnection : IConnection, ICheckOutReasonTracker
{
private CheckOutReason? _checkOutReason;
private readonly IConnection _connection;
private readonly ExclusiveConnectionPool _connectionPool;
private int _generation;
Expand All @@ -187,6 +213,14 @@ public PooledConnection(ExclusiveConnectionPool connectionPool, IConnection conn
_generation = connectionPool._generation;
}

public CheckOutReason? CheckOutReason
{
get
{
return _checkOutReason;
}
}

public ConnectionId ConnectionId
{
get { return _connection.ConnectionId; }
Expand Down Expand Up @@ -313,6 +347,15 @@ public async Task SendMessagesAsync(IEnumerable<RequestMessage> messages, Messag
}
}

public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason)
{
if (_checkOutReason == null)
{
_checkOutReason = reason;
_connectionPool._checkOutReasonCounter.Increment(reason);
}
}

public void SetReadTimeout(TimeSpan timeout)
{
_connection.SetReadTimeout(timeout);
Expand All @@ -335,7 +378,7 @@ private void SetEffectiveGenerationIfRequired(ConnectionDescription description)
}
}

private sealed class AcquiredConnection : IConnectionHandle
private sealed class AcquiredConnection : IConnectionHandle, ICheckOutReasonTracker
{
private ExclusiveConnectionPool _connectionPool;
private bool _disposed;
Expand All @@ -347,6 +390,14 @@ public AcquiredConnection(ExclusiveConnectionPool connectionPool, ReferenceCount
_reference = reference;
}

public CheckOutReason? CheckOutReason
{
get
{
return _reference.Instance.CheckOutReason;
}
}

public ConnectionId ConnectionId
{
get { return _reference.Instance.ConnectionId; }
Expand Down Expand Up @@ -432,6 +483,12 @@ public Task SendMessagesAsync(IEnumerable<RequestMessage> messages, MessageEncod
return _reference.Instance.SendMessagesAsync(messages, messageEncoderSettings, cancellationToken);
}

public void SetCheckOutReasonIfNotAlreadySet(CheckOutReason reason)
{
ThrowIfDisposed();
_reference.Instance.SetCheckOutReasonIfNotAlreadySet(reason);
}

public void SetReadTimeout(TimeSpan timeout)
{
ThrowIfDisposed();
Expand Down Expand Up @@ -674,15 +731,15 @@ public PooledConnection CreateOpenedOrReuse(CancellationToken cancellationToken)
{
SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(),
SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => CreateOpenedInternal(cancellationToken),
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw new TimeoutException($"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms."),
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch),
_ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}")
};

waitTimeout = _connectingTimeout - stopwatch.Elapsed;

if (connection == null && waitTimeout <= TimeSpan.Zero)
{
throw TimoutException(stopwatch);
throw CreateTimeoutException(stopwatch);
}
}

Expand All @@ -708,15 +765,15 @@ public async Task<PooledConnection> CreateOpenedOrReuseAsync(CancellationToken c
{
SemaphoreSlimSignalable.SemaphoreWaitResult.Signaled => _pool._connectionHolder.Acquire(),
SemaphoreSlimSignalable.SemaphoreWaitResult.Entered => await CreateOpenedInternalAsync(cancellationToken).ConfigureAwait(false),
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw TimoutException(stopwatch),
SemaphoreSlimSignalable.SemaphoreWaitResult.TimedOut => throw CreateTimeoutException(stopwatch),
_ => throw new InvalidOperationException($"Invalid wait result {_connectingWaitStatus}")
};

waitTimeout = _connectingTimeout - stopwatch.Elapsed;

if (connection == null && waitTimeout <= TimeSpan.Zero)
{
throw TimoutException(stopwatch);
throw CreateTimeoutException(stopwatch);
}
}

Expand Down Expand Up @@ -783,8 +840,11 @@ private void FinishCreating(ConnectionDescription description)
_pool._serviceStates.IncrementConnectionCount(description?.ServiceId);
}

private Exception TimoutException(Stopwatch stopwatch) =>
new TimeoutException($"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms.");
private Exception CreateTimeoutException(Stopwatch stopwatch)
{
var message = $"Timed out waiting in connecting queue after {stopwatch.ElapsedMilliseconds}ms.";
return _pool.CreateTimeoutException(stopwatch, message);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace MongoDB.Driver.Core.ConnectionPools
internal sealed partial class ExclusiveConnectionPool : IConnectionPool
{
// fields
private readonly CheckOutReasonCounter _checkOutReasonCounter;
private readonly IConnectionFactory _connectionFactory;
private readonly ListConnectionHolder _connectionHolder;
private readonly EndPoint _endPoint;
Expand Down Expand Up @@ -71,6 +72,7 @@ public ExclusiveConnectionPool(
_connectionFactory = Ensure.IsNotNull(connectionFactory, nameof(connectionFactory));
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));

_checkOutReasonCounter = new CheckOutReasonCounter();
_connectingQueue = new SemaphoreSlimSignalable(MongoInternalDefaults.ConnectionPool.MaxConnecting);
_connectionHolder = new ListConnectionHolder(eventSubscriber, _connectingQueue);
_serviceStates = new ServiceStates();
Expand Down Expand Up @@ -385,6 +387,8 @@ private void ReleaseConnection(PooledConnection connection)
_checkedInConnectionEventHandler(new ConnectionPoolCheckedInConnectionEvent(connection.ConnectionId, TimeSpan.Zero, EventContext.OperationId));
}

_checkOutReasonCounter.Decrement(connection.CheckOutReason);

if (!connection.IsExpired && _state.Value != State.Disposed)
{
_connectionHolder.Return(connection);
Expand Down
Loading