diff --git a/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadBinding.cs index 11b3c29c3fb..c91671dfa75 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadBinding.cs @@ -14,10 +14,9 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; -using MongoDB.Driver.Core.Clusters; -using MongoDB.Driver.Core.Connections; using MongoDB.Driver.Core.Misc; using MongoDB.Driver.Core.Servers; @@ -51,7 +50,7 @@ public ChannelReadBinding(IServer server, IChannelHandle channel, ReadPreference _session = Ensure.IsNotNull(session, nameof(session)); } - // properties + // properties /// public ReadPreference ReadPreference { @@ -90,6 +89,18 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return Task.FromResult(GetReadChannelSourceHelper()); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSource(cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(cancellationToken); + } + private IChannelSourceHandle GetReadChannelSourceHelper() { return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork(), _session.Fork())); diff --git a/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadWriteBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadWriteBinding.cs index 2375dd0560d..2e8bf6695f3 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadWriteBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/ChannelReadWriteBinding.cs @@ -14,6 +14,7 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Misc; @@ -85,6 +86,18 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return Task.FromResult(GetChannelSourceHelper()); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSource(cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken) { @@ -92,12 +105,24 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation return GetChannelSourceHelper(); } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetWriteChannelSource(cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSource(mayUseSecondary, cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(CancellationToken cancellationToken) { @@ -105,12 +130,24 @@ public Task GetWriteChannelSourceAsync(CancellationToken c return Task.FromResult(GetChannelSourceHelper()); } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken); + } + // private methods private IChannelSourceHandle GetChannelSourceHelper() { diff --git a/src/MongoDB.Driver.Core/Core/Bindings/ChannelSourceReadWriteBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/ChannelSourceReadWriteBinding.cs index 75b8928e297..e434a36a57b 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/ChannelSourceReadWriteBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/ChannelSourceReadWriteBinding.cs @@ -14,9 +14,11 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Misc; +using MongoDB.Driver.Core.Servers; namespace MongoDB.Driver.Core.Bindings { @@ -73,6 +75,18 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return Task.FromResult(GetChannelSourceHelper()); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSource(cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken) { @@ -80,12 +94,24 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation return GetChannelSourceHelper(); } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetWriteChannelSource(cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSource(mayUseSecondary, cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(CancellationToken cancellationToken) { @@ -93,12 +119,24 @@ public Task GetWriteChannelSourceAsync(CancellationToken c return Task.FromResult(GetChannelSourceHelper()); } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken); + } + /// public void Dispose() { diff --git a/src/MongoDB.Driver.Core/Core/Bindings/IBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/IBinding.cs index 1e625a04e3c..40c33ee441c 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/IBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/IBinding.cs @@ -14,6 +14,7 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Servers; @@ -61,6 +62,22 @@ public interface IReadBinding : IBinding /// The cancellation token. /// A channel source. Task GetReadChannelSourceAsync(CancellationToken cancellationToken); + + /// + /// Gets a channel source for read operations while deprioritizing servers in the provided collection. + /// + /// The deprioritized servers. + /// The cancellation token. + /// A channel source. + IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken); + + /// + /// Gets a channel source for read operations while deprioritizing servers in the provided collection. + /// + /// The deprioritized servers. + /// The cancellation token. + /// A channel source. + Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken); } /// @@ -75,6 +92,14 @@ public interface IWriteBinding : IBinding /// A channel source. IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken); + /// + /// Gets a channel source for write operations while deprioritizing servers in the provided collection. + /// + /// The deprioritized servers. + /// The cancellation token. + /// A channel source. + IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken); + /// /// Gets a channel source for write operations that may use a secondary. /// @@ -83,6 +108,15 @@ public interface IWriteBinding : IBinding /// A channel source. IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken); + /// + /// Gets a channel source for write operations that may use a secondary and deprioritizes servers in the provided collection. + /// + /// The deprioritized servers. + /// The may use secondary criteria. + /// The cancellation token. + /// A channel source. + IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken); + /// /// Gets a channel source for write operations. /// @@ -90,6 +124,14 @@ public interface IWriteBinding : IBinding /// A channel source. Task GetWriteChannelSourceAsync(CancellationToken cancellationToken); + /// + /// Gets a channel source for write operations while deprioritizing servers in the provided collection. + /// + /// The deprioritized servers. + /// The cancellation token. + /// A channel source. + Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken); + /// /// Gets a channel source for write operations that may use a secondary. /// @@ -97,6 +139,15 @@ public interface IWriteBinding : IBinding /// The cancellation token. /// A channel source. Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken); + + /// + /// Gets a channel source for write operations that may use a secondary and deprioritizes servers in the provided collection. + /// + /// The deprioritized servers. + /// The may use secondary criteria. + /// The cancellation token. + /// A channel source. + Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken); } /// diff --git a/src/MongoDB.Driver.Core/Core/Bindings/ReadBindingHandle.cs b/src/MongoDB.Driver.Core/Core/Bindings/ReadBindingHandle.cs index f190db358d0..6c8283d2bfd 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/ReadBindingHandle.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/ReadBindingHandle.cs @@ -15,12 +15,10 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; -using MongoDB.Driver.Core.Clusters; using MongoDB.Driver.Core.Misc; +using MongoDB.Driver.Core.Servers; namespace MongoDB.Driver.Core.Bindings { @@ -76,6 +74,20 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return _reference.Instance.GetReadChannelSourceAsync(cancellationToken); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetReadChannelSource(deprioritizedServers, cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetReadChannelSourceAsync(deprioritizedServers, cancellationToken); + } + /// public void Dispose() { diff --git a/src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs index 5790a7e5be9..a84fa4fe932 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs @@ -14,6 +14,7 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Clusters; @@ -66,17 +67,29 @@ public ICoreSessionHandle Session // methods /// public IChannelSourceHandle GetReadChannelSource(CancellationToken cancellationToken) + { + return GetReadChannelSource(null, cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(null, cancellationToken); + } + + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { ThrowIfDisposed(); - var server = _cluster.SelectServerAndPinIfNeeded(_session, _serverSelector, cancellationToken); + var server = _cluster.SelectServerAndPinIfNeeded(_session, _serverSelector, deprioritizedServers, cancellationToken); return GetChannelSourceHelper(server); } - /// - public async Task GetReadChannelSourceAsync(CancellationToken cancellationToken) + /// + public async Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { ThrowIfDisposed(); - var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, _serverSelector, cancellationToken).ConfigureAwait(false); + var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, _serverSelector, deprioritizedServers, cancellationToken).ConfigureAwait(false); return GetChannelSourceHelper(server); } diff --git a/src/MongoDB.Driver.Core/Core/Bindings/ReadWriteBindingHandle.cs b/src/MongoDB.Driver.Core/Core/Bindings/ReadWriteBindingHandle.cs index c205eb33ade..205e47dfdf4 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/ReadWriteBindingHandle.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/ReadWriteBindingHandle.cs @@ -15,13 +15,10 @@ using System; using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Threading.Tasks; -using MongoDB.Driver.Core.Clusters; using MongoDB.Driver.Core.Misc; -using MongoDB.Driver.Core.Operations; +using MongoDB.Driver.Core.Servers; namespace MongoDB.Driver.Core.Bindings { @@ -77,6 +74,20 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return _reference.Instance.GetReadChannelSourceAsync(cancellationToken); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetReadChannelSource(deprioritizedServers, cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetReadChannelSourceAsync(deprioritizedServers, cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken) { @@ -84,6 +95,13 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation return _reference.Instance.GetWriteChannelSource(cancellationToken); } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetWriteChannelSource(deprioritizedServers, cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { @@ -91,6 +109,13 @@ public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUs return _reference.Instance.GetWriteChannelSource(mayUseSecondary, cancellationToken); } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetWriteChannelSource(deprioritizedServers, mayUseSecondary, cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(CancellationToken cancellationToken) { @@ -98,6 +123,13 @@ public Task GetWriteChannelSourceAsync(CancellationToken c return _reference.Instance.GetWriteChannelSourceAsync(cancellationToken); } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetWriteChannelSourceAsync(deprioritizedServers, cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { @@ -105,6 +137,13 @@ public Task GetWriteChannelSourceAsync(IMayUseSecondaryCri return _reference.Instance.GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken); } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + return _reference.Instance.GetWriteChannelSourceAsync(deprioritizedServers, mayUseSecondary, cancellationToken); + } + /// public void Dispose() { diff --git a/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadBinding.cs index 4415c65d325..7666ef3e719 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadBinding.cs @@ -14,6 +14,7 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Misc; @@ -77,6 +78,18 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return Task.FromResult(GetChannelSourceHelper()); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSource(cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(cancellationToken); + } + /// public void Dispose() { diff --git a/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadWriteBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadWriteBinding.cs index 09acb647105..8fec85d3249 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadWriteBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/SingleServerReadWriteBinding.cs @@ -14,6 +14,7 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Misc; @@ -81,6 +82,18 @@ public Task GetReadChannelSourceAsync(CancellationToken ca return Task.FromResult(GetChannelSourceHelper()); } + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSource(cancellationToken); + } + + /// + public Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken) { @@ -88,12 +101,24 @@ public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellation return GetChannelSourceHelper(); } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetWriteChannelSource(cancellationToken); + } + /// public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary } + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSource(mayUseSecondary, cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(CancellationToken cancellationToken) { @@ -101,12 +126,24 @@ public Task GetWriteChannelSourceAsync(CancellationToken c return Task.FromResult(GetChannelSourceHelper()); } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(cancellationToken); + } + /// public Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary } + /// + public Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken); + } + private IChannelSourceHandle GetChannelSourceHelper() { return new ChannelSourceHandle(new ServerChannelSource(_server, _session.Fork())); diff --git a/src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs b/src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs index 8bb61bb5563..44f323d1360 100644 --- a/src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs +++ b/src/MongoDB.Driver.Core/Core/Bindings/WritableServerBinding.cs @@ -14,6 +14,7 @@ */ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Clusters; @@ -62,58 +63,103 @@ public ICoreSessionHandle Session /// public IChannelSourceHandle GetReadChannelSource(CancellationToken cancellationToken) { - ThrowIfDisposed(); - var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, cancellationToken); + return GetReadChannelSource(null, cancellationToken); + } + /// + public Task GetReadChannelSourceAsync(CancellationToken cancellationToken) + { + return GetReadChannelSourceAsync(null, cancellationToken); + } + + /// + public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) + { + ThrowIfDisposed(); + var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, deprioritizedServers, cancellationToken); return CreateServerChannelSource(server); } - /// - public async Task GetReadChannelSourceAsync(CancellationToken cancellationToken) + /// + public async Task GetReadChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { ThrowIfDisposed(); - var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false); + var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, deprioritizedServers, cancellationToken).ConfigureAwait(false); return CreateServerChannelSource(server); } /// public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken) + { + return GetWriteChannelSource(deprioritizedServers: null, cancellationToken); + } + + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { ThrowIfDisposed(); - var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, cancellationToken); + var server = _cluster.SelectServerAndPinIfNeeded(_session, WritableServerSelector.Instance, deprioritizedServers, cancellationToken); return CreateServerChannelSource(server); } /// public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSource(null, mayUseSecondary, cancellationToken); + } + + /// + public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { if (IsSessionPinnedToServer()) { throw new InvalidOperationException($"This overload of {nameof(GetWriteChannelSource)} cannot be called when pinned to a server."); } - var selector = new WritableServerSelector(mayUseSecondary); + var writableServerSelector = new WritableServerSelector(mayUseSecondary); + + var selector = deprioritizedServers != null + ? (IServerSelector)new CompositeServerSelector(new IServerSelector[] { new PriorityServerSelector(deprioritizedServers), writableServerSelector }) + : writableServerSelector; + var server = _cluster.SelectServer(selector, cancellationToken); return CreateServerChannelSource(server); } /// - public async Task GetWriteChannelSourceAsync(CancellationToken cancellationToken) + public Task GetWriteChannelSourceAsync(CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(deprioritizedServers: null, cancellationToken); + } + + /// + public async Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { ThrowIfDisposed(); - var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, cancellationToken).ConfigureAwait(false); + var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, WritableServerSelector.Instance, deprioritizedServers, cancellationToken).ConfigureAwait(false); return CreateServerChannelSource(server); } /// - public async Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + public Task GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) + { + return GetWriteChannelSourceAsync(null, mayUseSecondary, cancellationToken); + } + + /// + public async Task GetWriteChannelSourceAsync(IReadOnlyCollection deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken) { if (IsSessionPinnedToServer()) { throw new InvalidOperationException($"This overload of {nameof(GetWriteChannelSource)} cannot be called when pinned to a server."); } - var selector = new WritableServerSelector(mayUseSecondary); + var writableServerSelector = new WritableServerSelector(mayUseSecondary); + + IServerSelector selector = deprioritizedServers != null + ? new CompositeServerSelector(new IServerSelector[] { new PriorityServerSelector(deprioritizedServers), writableServerSelector }) + : writableServerSelector; + var server = await _cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false); return CreateServerChannelSource(server); } diff --git a/src/MongoDB.Driver.Core/Core/Clusters/IClusterExtensions.cs b/src/MongoDB.Driver.Core/Core/Clusters/IClusterExtensions.cs index e5cabd39243..6b83a44715d 100644 --- a/src/MongoDB.Driver.Core/Core/Clusters/IClusterExtensions.cs +++ b/src/MongoDB.Driver.Core/Core/Clusters/IClusterExtensions.cs @@ -13,6 +13,7 @@ * limitations under the License. */ +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver.Core.Bindings; @@ -30,6 +31,7 @@ internal static class IClusterExtensions this ICluster cluster, ICoreSessionHandle session, IServerSelector selector, + IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { var pinnedServer = GetPinnedServerIfValid(cluster, session); @@ -38,6 +40,10 @@ internal static class IClusterExtensions return pinnedServer; } + selector = deprioritizedServers != null + ? new CompositeServerSelector(new[] { new PriorityServerSelector(deprioritizedServers), selector }) + : selector; + // Server selection also updates the cluster type, allowing us to to determine if the server // should be pinned. var server = cluster.SelectServer(selector, cancellationToken); @@ -49,6 +55,7 @@ internal static class IClusterExtensions this ICluster cluster, ICoreSessionHandle session, IServerSelector selector, + IReadOnlyCollection deprioritizedServers, CancellationToken cancellationToken) { var pinnedServer = GetPinnedServerIfValid(cluster, session); @@ -57,6 +64,10 @@ internal static class IClusterExtensions return pinnedServer; } + selector = deprioritizedServers != null + ? new CompositeServerSelector(new[] { new PriorityServerSelector(deprioritizedServers), selector }) + : selector; + // Server selection also updates the cluster type, allowing us to to determine if the server // should be pinned. var server = await cluster.SelectServerAsync(selector, cancellationToken).ConfigureAwait(false); diff --git a/src/MongoDB.Driver.Core/Core/Clusters/ServerSelectors/PriorityServerSelector.cs b/src/MongoDB.Driver.Core/Core/Clusters/ServerSelectors/PriorityServerSelector.cs new file mode 100644 index 00000000000..6d125a879a7 --- /dev/null +++ b/src/MongoDB.Driver.Core/Core/Clusters/ServerSelectors/PriorityServerSelector.cs @@ -0,0 +1,56 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections.Generic; +using System.Linq; +using MongoDB.Driver.Core.Misc; +using MongoDB.Driver.Core.Servers; + +namespace MongoDB.Driver.Core.Clusters.ServerSelectors +{ + /// + /// Represents a server selector that selects servers based on a collection of servers to deprioritize. + /// + public sealed class PriorityServerSelector : IServerSelector + { + private readonly IReadOnlyCollection _deprioritizedServers; + + /// + /// Initializes a new instance of the class. + /// + /// The collection of servers to deprioritize. + public PriorityServerSelector(IReadOnlyCollection deprioritizedServers) + { + _deprioritizedServers = Ensure.IsNotNullOrEmpty(deprioritizedServers, nameof(deprioritizedServers)) as IReadOnlyCollection; + } + + /// + public IEnumerable SelectServers(ClusterDescription cluster, IEnumerable servers) + { + // according to spec, we only do deprioritization in a sharded cluster. + if (cluster.Type != ClusterType.Sharded) + { + return servers; + } + + var filteredServers = servers.Where(description => _deprioritizedServers.All(d => d.EndPoint != description.EndPoint)).ToList(); + + return filteredServers.Any() ? filteredServers : servers; + } + + /// + public override string ToString() => $"PriorityServerSelector{{{{ Deprioritized servers: {string.Join(", ", _deprioritizedServers.Select(s => s.EndPoint))} }}}}"; + } +} diff --git a/src/MongoDB.Driver.Core/Core/Operations/RetryableReadOperationExecutor.cs b/src/MongoDB.Driver.Core/Core/Operations/RetryableReadOperationExecutor.cs index d17cc437ab3..cdb74827dc0 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/RetryableReadOperationExecutor.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/RetryableReadOperationExecutor.cs @@ -51,7 +51,7 @@ public static TResult Execute(IRetryableReadOperation operatio try { - context.ReplaceChannelSource(context.Binding.GetReadChannelSource(cancellationToken)); + context.ReplaceChannelSource(context.Binding.GetReadChannelSource(new[] { context.ChannelSource.ServerDescription }, cancellationToken)); context.ReplaceChannel(context.ChannelSource.GetChannel(cancellationToken)); } catch @@ -96,7 +96,7 @@ public static async Task ExecuteAsync(IRetryableReadOperation< try { - context.ReplaceChannelSource(context.Binding.GetReadChannelSource(cancellationToken)); + context.ReplaceChannelSource(context.Binding.GetReadChannelSource(new[] { context.ChannelSource.ServerDescription }, cancellationToken)); context.ReplaceChannel(context.ChannelSource.GetChannel(cancellationToken)); } catch diff --git a/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs index 86087929ae7..6e73af1e758 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs @@ -53,7 +53,7 @@ public static TResult Execute(IRetryableWriteOperation operati try { - context.ReplaceChannelSource(context.Binding.GetWriteChannelSource(cancellationToken)); + context.ReplaceChannelSource(context.Binding.GetWriteChannelSource(new[] { context.ChannelSource.ServerDescription }, cancellationToken)); context.ReplaceChannel(context.ChannelSource.GetChannel(cancellationToken)); } catch @@ -104,7 +104,7 @@ public static async Task ExecuteAsync(IRetryableWriteOperation try { - context.ReplaceChannelSource(await context.Binding.GetWriteChannelSourceAsync(cancellationToken).ConfigureAwait(false)); + context.ReplaceChannelSource(await context.Binding.GetWriteChannelSourceAsync(new[] { context.ChannelSource.ServerDescription }, cancellationToken).ConfigureAwait(false)); context.ReplaceChannel(await context.ChannelSource.GetChannelAsync(cancellationToken).ConfigureAwait(false)); } catch diff --git a/tests/MongoDB.Driver.Core.Tests/Core/Bindings/WritableServerBindingTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/Bindings/WritableServerBindingTests.cs index 9ee482c2f6a..55a9d3eca62 100644 --- a/tests/MongoDB.Driver.Core.Tests/Core/Bindings/WritableServerBindingTests.cs +++ b/tests/MongoDB.Driver.Core.Tests/Core/Bindings/WritableServerBindingTests.cs @@ -200,6 +200,48 @@ public void Session_should_return_expected_result() } } + [Theory] + [ParameterAttributeData] + public async Task GetWriteChannelSource_should_use_a_composite_server_selector_to_select_the_server_from_the_cluster_when_deprioritized_servers_present( + [Values(false, true)] + bool async) + { + var subject = new WritableServerBinding(_mockCluster.Object, NoCoreSession.NewHandle()); + var selectedServer = new Mock().Object; + + var clusterId = new ClusterId(); + var endPoint = new DnsEndPoint("localhost", 27017); + var server = new ServerDescription(new ServerId(clusterId, endPoint), endPoint); +#pragma warning disable CS0618 // Type or member is obsolete + var initialClusterDescription = new ClusterDescription( + clusterId, + ClusterConnectionMode.Sharded, + ClusterType.Unknown, + new[] { server }); +#pragma warning restore CS0618 // Type or member is obsolete + var finalClusterDescription = initialClusterDescription.WithType(ClusterType.Sharded); + _mockCluster.SetupSequence(c => c.Description).Returns(initialClusterDescription).Returns(finalClusterDescription); + + var deprioritizedServers = new ServerDescription[] { server }; + + if (async) + { + _mockCluster.Setup(c => c.SelectServerAsync(It.Is(cp => cp.ToString().Contains("PriorityServerSelector")), CancellationToken.None)).Returns(Task.FromResult(selectedServer)); + + await subject.GetWriteChannelSourceAsync(deprioritizedServers, CancellationToken.None); + + _mockCluster.Verify(c => c.SelectServerAsync(It.Is(cp => cp.ToString().Contains("PriorityServerSelector")), CancellationToken.None), Times.Once); + } + else + { + _mockCluster.Setup(c => c.SelectServer(It.Is(cp => cp.ToString().Contains("PriorityServerSelector")), CancellationToken.None)).Returns(selectedServer); + + subject.GetWriteChannelSource(deprioritizedServers, CancellationToken.None); + + _mockCluster.Verify(c => c.SelectServer(It.Is(c => c.ToString().Contains("PriorityServerSelector")), CancellationToken.None), Times.Once); + } + } + [Theory] [ParameterAttributeData] public void GetWriteChannelSource_with_mayUseSecondary_should_pass_mayUseSecondary_to_server_selector( diff --git a/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ClusterTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ClusterTests.cs index d84ab3f6465..d367ab5c0aa 100644 --- a/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ClusterTests.cs +++ b/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ClusterTests.cs @@ -413,6 +413,93 @@ public void SelectServer_should_throw_if_any_servers_are_incompatible(int min, i _capturedEvents.Any().Should().BeFalse(); } + [Theory] + [ParameterAttributeData] + public async Task SelectServer_should_ignore_deprioritized_servers_if_cluster_is_sharded( + [Values(false, true)] + bool async) + { +#pragma warning disable CS0618 // Type or member is obsolete + var subject = CreateSubject(ClusterConnectionMode.Sharded); +#pragma warning restore CS0618 // Type or member is obsolete + + subject.Initialize(); + + var connected1 = ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27017)); + var connected2 = ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27018)); + var connected3 = ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27019)); + + subject.SetServerDescriptions(connected1, connected2, connected3); + + var deprioritizedServers = new List { connected1 }; + + var selector = new PriorityServerSelector(deprioritizedServers); + + for (int i = 0; i < 15; i++) + { + _capturedEvents.Clear(); + + IServer result; + if (async) + { + result = await subject.SelectServerAsync(selector, CancellationToken.None); + } + else + { + result = subject.SelectServer(selector, CancellationToken.None); + } + + result.Should().NotBeNull(); + + deprioritizedServers.Should().NotContain(d => d.EndPoint == result.Description.EndPoint); + + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Any().Should().BeFalse(); + } + } + + [Theory] + [ParameterAttributeData] + public async Task SelectServer_should_return_deprioritized_servers_if_no_other_servers_exist_or_cluster_not_sharded( + [Values(false, true)] bool async, + [Values(false, true)] bool isSharded) + { +#pragma warning disable CS0618 // Type or member is obsolete + StubCluster subject = isSharded ? CreateSubject(ClusterConnectionMode.Sharded) : CreateSubject(); +#pragma warning restore CS0618 // Type or member is obsolete + + subject.Initialize(); + + var connected1 = ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27017)); + var connected2 = ServerDescriptionHelper.Connected(subject.Description.ClusterId, new DnsEndPoint("localhost", 27018)); + + subject.SetServerDescriptions(connected1, connected2); + + var deprioritizedServers = new List { connected1, connected2 }; + + var selector = new PriorityServerSelector(deprioritizedServers); + + _capturedEvents.Clear(); + IServer result; + if (async) + { + result = await subject.SelectServerAsync(selector, CancellationToken.None); + } + else + { + result = subject.SelectServer(selector, CancellationToken.None); + } + + result.Should().NotBeNull(); + + deprioritizedServers.Should().Contain(d => d.EndPoint == result.Description.EndPoint); + + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Next().Should().BeOfType(); + _capturedEvents.Any().Should().BeFalse(); + } + [Fact] public void StartSession_should_return_expected_result() { @@ -431,6 +518,10 @@ public void DescriptionChanged_should_be_raised_when_the_description_changes() int count = 0; var subject = CreateSubject(); subject.Initialize(); + + // clear the ClusterDescriptionChanged event from initializing the StubCluster + _capturedEvents.Clear(); + subject.DescriptionChanged += (o, e) => count++; subject.SetServerDescriptions(ServerDescriptionHelper.Connected(subject.Description.ClusterId)); @@ -591,6 +682,7 @@ private class StubCluster : Cluster public override void Initialize() { base.Initialize(); + UpdateClusterDescription(Description.WithType(Settings.GetInitialClusterType())); } public void RemoveServer(EndPoint endPoint) diff --git a/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ServerSelectors/PriorityServerSelectorTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ServerSelectors/PriorityServerSelectorTests.cs new file mode 100644 index 00000000000..16aa265ad29 --- /dev/null +++ b/tests/MongoDB.Driver.Core.Tests/Core/Clusters/ServerSelectors/PriorityServerSelectorTests.cs @@ -0,0 +1,83 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Linq; +using System.Net; +using FluentAssertions; +using MongoDB.Driver.Core.Helpers; +using MongoDB.Driver.Core.Servers; +using Xunit; + +namespace MongoDB.Driver.Core.Clusters.ServerSelectors; + +public class PriorityServerSelectorTests +{ + private readonly ClusterDescription _description; + private readonly ServerDescription _server1; + private readonly ServerDescription _server2; + private readonly ServerDescription _server3; + + public PriorityServerSelectorTests() + { + var clusterId = new ClusterId(); + + _server1 = ServerDescriptionHelper.Connected(clusterId, new DnsEndPoint("localhost", 27017)); + _server2 = ServerDescriptionHelper.Connected(clusterId, new DnsEndPoint("localhost", 27018)); + _server3 = ServerDescriptionHelper.Connected(clusterId, new DnsEndPoint("localhost", 27019)); + +#pragma warning disable CS0618 // Type or member is obsolete + _description = new ClusterDescription( + clusterId, + ClusterConnectionMode.Sharded, + ClusterType.Sharded, + new[] { _server1, _server2, _server3 }); +#pragma warning restore CS0618 // Type or member is obsolete + } + + [Fact] + public void Should_select_all_the_servers_not_deprioritized() + { + var subject = new PriorityServerSelector(new[] { _server1, _server2 }); + + var result = subject.SelectServers(_description, _description.Servers).ToList(); + + result.Count.Should().Be(1); + result.Should().BeEquivalentTo(_server3); + } + + [Fact] + public void Should_select_all_the_servers_if_all_servers_are_deprioritized() + { + var subject = new PriorityServerSelector(new[] { _server1, _server2, _server3}); + + var result = subject.SelectServers(_description, _description.Servers).ToList(); + + result.Count.Should().Be(3); + result.Should().BeEquivalentTo(_description.Servers); + } + + [Fact] + public void Should_ignore_deprioritized_servers_if_not_in_sharded_mode() + { + var changedDescription = _description.WithType(ClusterType.Unknown); + + var subject = new PriorityServerSelector(new[] { _server2, _server3 }); + + var result = subject.SelectServers(changedDescription, _description.Servers).ToList(); + + result.Count.Should().Be(3); + result.Should().BeEquivalentTo(_description.Servers); + } +} diff --git a/tests/MongoDB.Driver.Tests/Specifications/retryable-reads/RetryableReadsProseTests.cs b/tests/MongoDB.Driver.Tests/Specifications/retryable-reads/RetryableReadsProseTests.cs index 79f34dbafca..db8330df4ed 100644 --- a/tests/MongoDB.Driver.Tests/Specifications/retryable-reads/RetryableReadsProseTests.cs +++ b/tests/MongoDB.Driver.Tests/Specifications/retryable-reads/RetryableReadsProseTests.cs @@ -20,15 +20,16 @@ using FluentAssertions; using MongoDB.Bson; using MongoDB.Bson.TestHelpers; -using MongoDB.TestHelpers.XunitExtensions; using MongoDB.Driver.Core; using MongoDB.Driver.Core.Bindings; +using MongoDB.Driver.Core.Clusters; using MongoDB.Driver.Core.Clusters.ServerSelectors; using MongoDB.Driver.Core.Events; using MongoDB.Driver.Core.Misc; using MongoDB.Driver.Core.TestHelpers; using MongoDB.Driver.Core.TestHelpers.XunitExtensions; using MongoDB.Driver.TestHelpers; +using MongoDB.TestHelpers.XunitExtensions; using Xunit; namespace MongoDB.Driver.Tests.Specifications.retryable_reads @@ -60,7 +61,7 @@ public async Task PoolClearedError_read_retryablity_test([Values(true, false)] b IServerSelector failPointSelector = new ReadPreferenceServerSelector(ReadPreference.Primary); var settings = DriverTestConfiguration.GetClientSettings(); - if (CoreTestConfiguration.Cluster.Description.Type == Core.Clusters.ClusterType.Sharded) + if (CoreTestConfiguration.Cluster.Description.Type == ClusterType.Sharded) { var serverAddress = settings.Servers.First(); settings.Servers = new[] { serverAddress }; @@ -120,6 +121,104 @@ public async Task PoolClearedError_read_retryablity_test([Values(true, false)] b eventCapturer.Events.OfType().Count().Should().Be(1); } + [Fact] + public void Sharded_cluster_retryable_reads_are_retried_on_different_mongos_if_available() + { + RequireServer.Check() + .Supports(Feature.FailPointsFailCommandForSharded) + .ClusterTypes(ClusterType.Sharded) + .MultipleMongoses(true); + + var failPointCommand = BsonDocument.Parse( + @"{ + configureFailPoint: ""failCommand"", + mode: { times: 1 }, + data: + { + failCommands: [""find""], + errorCode: 6 + } + }"); + + var eventCapturer = new EventCapturer().CaptureCommandEvents("find"); + + using var client = DriverTestConfiguration.CreateDisposableClient( + s => + { + s.RetryReads = true; + s.ClusterConfigurator = b => b.Subscribe(eventCapturer); + } + , null, useMultipleShardRouters: true); + + var failPointServer1 = client.Cluster.SelectServer(new EndPointServerSelector(client.Cluster.Description.Servers[0].EndPoint), default); + var failPointServer2 = client.Cluster.SelectServer(new EndPointServerSelector(client.Cluster.Description.Servers[1].EndPoint), default); + + using var failPoint1 = FailPoint.Configure(failPointServer1, NoCoreSession.NewHandle(), failPointCommand); + using var failPoint2 = FailPoint.Configure(failPointServer2, NoCoreSession.NewHandle(), failPointCommand); + + var database = client.GetDatabase(DriverTestConfiguration.DatabaseNamespace.DatabaseName); + var collection = database.GetCollection(DriverTestConfiguration.CollectionNamespace.CollectionName); + + Assert.Throws(() => + { + collection.Find(Builders.Filter.Empty).ToList(); + }); + + var failedEvents = eventCapturer.Events.OfType().ToArray(); + failedEvents.Length.Should().Be(2); + + failedEvents[0].CommandName.Should().Be(failedEvents[1].CommandName).And.Be("find"); + failedEvents[0].ConnectionId.ServerId.Should().NotBe(failedEvents[1].ConnectionId.ServerId); + } + + [Fact] + public void Sharded_cluster_retryable_reads_are_retried_on_same_mongos_if_no_other_is_available() + { + RequireServer.Check() + .Supports(Feature.FailPointsFailCommandForSharded) + .ClusterTypes(ClusterType.Sharded); + + var failPointCommand = BsonDocument.Parse( + @"{ + configureFailPoint: ""failCommand"", + mode: { times: 1 }, + data: + { + failCommands: [""find""], + errorCode: 6 + } + }"); + + var eventCapturer = new EventCapturer().CaptureCommandEvents("find"); + + using var client = DriverTestConfiguration.CreateDisposableClient( + s => + { + s.RetryReads = true; + s.DirectConnection = false; + s.ClusterConfigurator = b => b.Subscribe(eventCapturer); + } + , null, useMultipleShardRouters: false); + + var failPointServer = client.Cluster.SelectServer(new EndPointServerSelector(client.Cluster.Description.Servers[0].EndPoint), default); + + using var failPoint = FailPoint.Configure(failPointServer, NoCoreSession.NewHandle(), failPointCommand); + + var database = client.GetDatabase(DriverTestConfiguration.DatabaseNamespace.DatabaseName); + var collection = database.GetCollection(DriverTestConfiguration.CollectionNamespace.CollectionName); + + collection.Find(Builders.Filter.Empty).ToList(); + + var failedEvents = eventCapturer.Events.OfType().ToArray(); + var succeededEvents = eventCapturer.Events.OfType().ToArray(); + + failedEvents.Length.Should().Be(1); + succeededEvents.Length.Should().Be(1); + + failedEvents[0].CommandName.Should().Be(succeededEvents[0].CommandName).And.Be("find"); + failedEvents[0].ConnectionId.ServerId.Should().Be(succeededEvents[0].ConnectionId.ServerId); + } + // private methods private DisposableMongoClient CreateClient(MongoClientSettings mongoClientSettings, EventCapturer eventCapturer, TimeSpan heartbeatInterval, string applicationName = null) { diff --git a/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/RetryWriteOnOtherMongos.cs b/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/RetryWriteOnOtherMongos.cs new file mode 100644 index 00000000000..7240eb86c08 --- /dev/null +++ b/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/RetryWriteOnOtherMongos.cs @@ -0,0 +1,133 @@ +/* Copyright 2010-present MongoDB Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Linq; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Driver.Core; +using MongoDB.Driver.Core.Bindings; +using MongoDB.Driver.Core.Clusters; +using MongoDB.Driver.Core.Clusters.ServerSelectors; +using MongoDB.Driver.Core.Events; +using MongoDB.Driver.Core.Misc; +using MongoDB.Driver.Core.TestHelpers; +using MongoDB.Driver.Core.TestHelpers.XunitExtensions; +using Xunit; + +namespace MongoDB.Driver.Tests.Specifications.retryable_writes.prose_tests +{ + public class RetryWriteOnOtherMongos + { + [Fact] + public void Sharded_cluster_retryable_writes_are_retried_on_different_mongos_if_available() + { + RequireServer.Check() + .Supports(Feature.FailPointsFailCommandForSharded) + .ClusterTypes(ClusterType.Sharded) + .MultipleMongoses(true); + + var failPointCommand = BsonDocument.Parse( + @"{ + configureFailPoint: ""failCommand"", + mode: { times: 1 }, + data: + { + failCommands: [""insert""], + errorCode: 6, + errorLabels: [""RetryableWriteError""] + } + }"); + + var eventCapturer = new EventCapturer().CaptureCommandEvents("insert"); + + using var client = DriverTestConfiguration.CreateDisposableClient( + s => + { + s.RetryWrites = true; + s.ClusterConfigurator = b => b.Subscribe(eventCapturer); + } + , null, useMultipleShardRouters: true); + + var failPointServer1 = client.Cluster.SelectServer(new EndPointServerSelector(client.Cluster.Description.Servers[0].EndPoint), default); + var failPointServer2 = client.Cluster.SelectServer(new EndPointServerSelector(client.Cluster.Description.Servers[1].EndPoint), default); + + using var failPoint1 = FailPoint.Configure(failPointServer1, NoCoreSession.NewHandle(), failPointCommand); + using var failPoint2 = FailPoint.Configure(failPointServer2, NoCoreSession.NewHandle(), failPointCommand); + + var database = client.GetDatabase(DriverTestConfiguration.DatabaseNamespace.DatabaseName); + var collection = database.GetCollection(DriverTestConfiguration.CollectionNamespace.CollectionName); + + Assert.Throws(() => + { + collection.InsertOne(new BsonDocument("x", 1)); + }); + + var failedEvents = eventCapturer.Events.OfType().ToArray(); + failedEvents.Length.Should().Be(2); + + failedEvents[0].CommandName.Should().Be(failedEvents[1].CommandName).And.Be("insert"); + failedEvents[0].ConnectionId.ServerId.Should().NotBe(failedEvents[1].ConnectionId.ServerId); + } + + [Fact] + public void Sharded_cluster_retryable_writes_are_retried_on_same_mongo_if_no_other_is_available() + { + RequireServer.Check() + .Supports(Feature.FailPointsFailCommandForSharded) + .ClusterTypes(ClusterType.Sharded); + + var failPointCommand = BsonDocument.Parse( + @"{ + configureFailPoint: ""failCommand"", + mode: { times: 1 }, + data: + { + failCommands: [""insert""], + errorCode: 6, + errorLabels: [""RetryableWriteError""] + } + }"); + + var eventCapturer = new EventCapturer().CaptureCommandEvents("insert"); + + using var client = DriverTestConfiguration.CreateDisposableClient( + s => + { + s.RetryWrites = true; + s.DirectConnection = false; + s.ClusterConfigurator = b => b.Subscribe(eventCapturer); + } + , null, useMultipleShardRouters: false); + + var failPointServer = client.Cluster.SelectServer(new EndPointServerSelector(client.Cluster.Description.Servers[0].EndPoint), default); + + using var failPoint = FailPoint.Configure(failPointServer, NoCoreSession.NewHandle(), failPointCommand); + + var database = client.GetDatabase(DriverTestConfiguration.DatabaseNamespace.DatabaseName); + var collection = database.GetCollection(DriverTestConfiguration.CollectionNamespace.CollectionName); + + collection.InsertOne(new BsonDocument("x", 1)); + + var failedEvents = eventCapturer.Events.OfType().ToArray(); + var succeededEvents = eventCapturer.Events.OfType().ToArray(); + + failedEvents.Length.Should().Be(1); + succeededEvents.Length.Should().Be(1); + + failedEvents[0].CommandName.Should().Be(succeededEvents[0].CommandName).And.Be("insert"); + failedEvents[0].ConnectionId.ServerId.Should().Be(succeededEvents[0].ConnectionId.ServerId); + } + } +}