Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 30 additions & 31 deletions src/MongoDB.Driver/Core/Clusters/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ internal abstract class Cluster : IClusterInternal
private readonly TimeSpan _minHeartbeatInterval = __minHeartbeatIntervalDefault;
private readonly IClusterClock _clusterClock = new ClusterClock();
private readonly ClusterId _clusterId;
private ClusterDescription _description;
private TaskCompletionSource<bool> _descriptionChangedTaskCompletionSource;
private readonly object _descriptionLock = new object();
private ClusterDescriptionChangeSource _descriptionWithChangedTaskCompletionSource;
private readonly LatencyLimitingServerSelector _latencyLimitingServerSelector;
protected readonly EventLogger<LogCategories.SDAM> _clusterEventLogger;
protected readonly EventLogger<LogCategories.ServerSelection> _serverSelectionEventLogger;
Expand All @@ -70,10 +68,8 @@ protected Cluster(ClusterSettings settings, IClusterableServerFactory serverFact
Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));
_state = new InterlockedInt32(State.Initial);
_rapidHeartbeatTimerCallbackState = new InterlockedInt32(RapidHeartbeatTimerCallbackState.NotRunning);

_clusterId = new ClusterId();
_description = ClusterDescription.CreateInitial(_clusterId, _settings.DirectConnection);
_descriptionChangedTaskCompletionSource = new TaskCompletionSource<bool>();
_descriptionWithChangedTaskCompletionSource = new (ClusterDescription.CreateInitial(_clusterId, _settings.DirectConnection));
_latencyLimitingServerSelector = new LatencyLimitingServerSelector(settings.LocalThreshold);

_rapidHeartbeatTimer = new Timer(RapidHeartbeatTimerCallback, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
Expand All @@ -97,10 +93,7 @@ public ClusterDescription Description
{
get
{
lock (_descriptionLock)
{
return _description;
}
return _descriptionWithChangedTaskCompletionSource.ClusterDescription;
}
}

Expand Down Expand Up @@ -134,7 +127,7 @@ protected virtual void Dispose(bool disposing)

var newClusterDescription = new ClusterDescription(
_clusterId,
_description.DirectConnection,
_descriptionWithChangedTaskCompletionSource.ClusterDescription.DirectConnection,
dnsMonitorException: null,
ClusterType.Unknown,
Enumerable.Empty<ServerDescription>());
Expand Down Expand Up @@ -293,22 +286,11 @@ public ICoreSessionHandle StartSession(CoreSessionOptions options)

protected void UpdateClusterDescription(ClusterDescription newClusterDescription, bool shouldClusterDescriptionChangedEventBePublished = true)
{
ClusterDescription oldClusterDescription = null;
TaskCompletionSource<bool> oldDescriptionChangedTaskCompletionSource = null;
var oldClusterDescription = Interlocked.Exchange(ref _descriptionWithChangedTaskCompletionSource, new(newClusterDescription));

lock (_descriptionLock)
{
oldClusterDescription = _description;
_description = newClusterDescription;
OnDescriptionChanged(oldClusterDescription.ClusterDescription, newClusterDescription, shouldClusterDescriptionChangedEventBePublished);

oldDescriptionChangedTaskCompletionSource = _descriptionChangedTaskCompletionSource;
_descriptionChangedTaskCompletionSource = new TaskCompletionSource<bool>();
}

OnDescriptionChanged(oldClusterDescription, newClusterDescription, shouldClusterDescriptionChangedEventBePublished);

// TODO: use RunContinuationsAsynchronously instead once we require a new enough .NET Framework
Task.Run(() => oldDescriptionChangedTaskCompletionSource.TrySetResult(true));
oldClusterDescription.TrySetChanged();
}

private string BuildTimeoutExceptionMessage(TimeSpan timeout, IServerSelector selector, ClusterDescription clusterDescription)
Expand Down Expand Up @@ -363,6 +345,25 @@ private void ThrowTimeoutException(IServerSelector selector, ClusterDescription
}

// nested classes
internal sealed class ClusterDescriptionChangeSource
{
private readonly TaskCompletionSource<bool> _changedTaskCompletionSource;
private readonly ClusterDescription _clusterDescription;

public ClusterDescriptionChangeSource(ClusterDescription clusterDescription)
{
_changedTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to hide this ceremony of source creation.

_clusterDescription = clusterDescription;
}

public ClusterDescription ClusterDescription => _clusterDescription;

public Task Changed => _changedTaskCompletionSource.Task;

public bool TrySetChanged()
=> _changedTaskCompletionSource.TrySetResult(true);
}

private class SelectServerHelper : IDisposable
{
private readonly Cluster _cluster;
Expand All @@ -380,7 +381,7 @@ public SelectServerHelper(Cluster cluster, IServerSelector selector)
{
_cluster = cluster;

_connectedServers = new List<IClusterableServer>(_cluster._description?.Servers?.Count ?? 1);
_connectedServers = new List<IClusterableServer>(_cluster._descriptionWithChangedTaskCompletionSource.ClusterDescription?.Servers?.Count ?? 1);
_connectedServerDescriptions = new List<ServerDescription>(_connectedServers.Count);
_operationCountServerSelector = new OperationsCountServerSelector(_connectedServers);

Expand Down Expand Up @@ -429,11 +430,9 @@ public void HandleException(Exception exception)

public IServer SelectServer()
{
lock (_cluster._descriptionLock)
{
_descriptionChangedTask = _cluster._descriptionChangedTaskCompletionSource.Task;
_description = _cluster._description;
}
var clusterDescription = _cluster._descriptionWithChangedTaskCompletionSource;
_descriptionChangedTask = clusterDescription.Changed;
_description = clusterDescription.ClusterDescription;

if (!_serverSelectionWaitQueueEntered)
{
Expand Down
2 changes: 1 addition & 1 deletion tests/MongoDB.Driver.Tests/Core/Jira/CSharp3173Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private IServerSelector CreateWritableServerAndEndPointSelector(EndPoint endPoin
private void ForceClusterId(MultiServerCluster cluster, ClusterId clusterId)
{
Reflector.SetFieldValue(cluster, "_clusterId", clusterId);
Reflector.SetFieldValue(cluster, "_description", ClusterDescription.CreateInitial(clusterId, __directConnection));
Reflector.SetFieldValue(cluster, "_descriptionWithChangedTaskCompletionSource", new Cluster.ClusterDescriptionChangeSource(ClusterDescription.CreateInitial(clusterId, __directConnection)));
}

private void SetupServerMonitorConnection(
Expand Down
2 changes: 1 addition & 1 deletion tests/MongoDB.Driver.Tests/Core/Jira/CSharp3302Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ private IServerSelector CreateWritableServerAndEndPointSelector(EndPoint endPoin
private void ForceClusterId(MultiServerCluster cluster, ClusterId clusterId)
{
Reflector.SetFieldValue(cluster, "_clusterId", clusterId);
Reflector.SetFieldValue(cluster, "_description", ClusterDescription.CreateInitial(clusterId, __directConnection));
Reflector.SetFieldValue(cluster, "_descriptionWithChangedTaskCompletionSource", new Cluster.ClusterDescriptionChangeSource(ClusterDescription.CreateInitial(clusterId, __directConnection)));
}

private void SetupServerMonitorConnection(
Expand Down