Skip to content

Commit

Permalink
CSHARP-3757: Redirect read/write retries to other mongos if possible (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
adelinowona committed Jun 12, 2024
1 parent 61d2c77 commit 33e14d0
Show file tree
Hide file tree
Showing 19 changed files with 844 additions and 31 deletions.
17 changes: 14 additions & 3 deletions src/MongoDB.Driver.Core/Core/Bindings/ChannelReadBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Connections;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Servers;

Expand Down Expand Up @@ -51,7 +50,7 @@ public ChannelReadBinding(IServer server, IChannelHandle channel, ReadPreference
_session = Ensure.IsNotNull(session, nameof(session));
}

// properties
// properties
/// <inheritdoc/>
public ReadPreference ReadPreference
{
Expand Down Expand Up @@ -90,6 +89,18 @@ public Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken ca
return Task.FromResult<IChannelSourceHandle>(GetReadChannelSourceHelper());
}

/// <inheritdoc />
public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetReadChannelSource(cancellationToken);
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetReadChannelSourceAsync(cancellationToken);
}

private IChannelSourceHandle GetReadChannelSourceHelper()
{
return new ChannelSourceHandle(new ChannelChannelSource(_server, _channel.Fork(), _session.Fork()));
Expand Down
37 changes: 37 additions & 0 deletions src/MongoDB.Driver.Core/Core/Bindings/ChannelReadWriteBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Misc;
Expand Down Expand Up @@ -85,32 +86,68 @@ public Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken ca
return Task.FromResult(GetChannelSourceHelper());
}

/// <inheritdoc />
public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetReadChannelSource(cancellationToken);
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetReadChannelSourceAsync(cancellationToken);
}

/// <inheritdoc/>
public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return GetChannelSourceHelper();
}

/// <inheritdoc />
public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetWriteChannelSource(cancellationToken);
}

/// <inheritdoc/>
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary
}

/// <inheritdoc />
public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSource(mayUseSecondary, cancellationToken);
}

/// <inheritdoc/>
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return Task.FromResult(GetChannelSourceHelper());
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetWriteChannelSourceAsync(cancellationToken);
}

/// <inheritdoc/>
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken);
}

// private methods
private IChannelSourceHandle GetChannelSourceHelper()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Servers;

namespace MongoDB.Driver.Core.Bindings
{
Expand Down Expand Up @@ -73,32 +75,68 @@ public Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken ca
return Task.FromResult(GetChannelSourceHelper());
}

/// <inheritdoc />
public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetReadChannelSource(cancellationToken);
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetReadChannelSourceAsync(cancellationToken);
}

/// <inheritdoc/>
public IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return GetChannelSourceHelper();
}

/// <inheritdoc />
public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetWriteChannelSource(cancellationToken);
}

/// <inheritdoc/>
public IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSource(cancellationToken); // ignore mayUseSecondary
}

/// <inheritdoc />
public IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSource(mayUseSecondary, cancellationToken);
}

/// <inheritdoc/>
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
return Task.FromResult(GetChannelSourceHelper());
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
return GetWriteChannelSourceAsync(cancellationToken);
}

/// <inheritdoc/>
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSourceAsync(cancellationToken); // ignore mayUseSecondary
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken)
{
return GetWriteChannelSourceAsync(mayUseSecondary, cancellationToken);
}

/// <inheritdoc/>
public void Dispose()
{
Expand Down
51 changes: 51 additions & 0 deletions src/MongoDB.Driver.Core/Core/Bindings/IBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Servers;
Expand Down Expand Up @@ -61,6 +62,22 @@ public interface IReadBinding : IBinding
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for read operations while deprioritizing servers in the provided collection.
/// </summary>
/// <param name="deprioritizedServers">The deprioritized servers.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for read operations while deprioritizing servers in the provided collection.
/// </summary>
/// <param name="deprioritizedServers">The deprioritized servers.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
Task<IChannelSourceHandle> GetReadChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken);
}

/// <summary>
Expand All @@ -75,6 +92,14 @@ public interface IWriteBinding : IBinding
/// <returns>A channel source.</returns>
IChannelSourceHandle GetWriteChannelSource(CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations while deprioritizing servers in the provided collection.
/// </summary>
/// <param name="deprioritizedServers">The deprioritized servers.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations that may use a secondary.
/// </summary>
Expand All @@ -83,20 +108,46 @@ public interface IWriteBinding : IBinding
/// <returns>A channel source.</returns>
IChannelSourceHandle GetWriteChannelSource(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations that may use a secondary and deprioritizes servers in the provided collection.
/// </summary>
/// <param name="deprioritizedServers">The deprioritized servers.</param>
/// <param name="mayUseSecondary">The may use secondary criteria.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
IChannelSourceHandle GetWriteChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
Task<IChannelSourceHandle> GetWriteChannelSourceAsync(CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations while deprioritizing servers in the provided collection.
/// </summary>
/// <param name="deprioritizedServers">The deprioritized servers.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations that may use a secondary.
/// </summary>
/// <param name="mayUseSecondary">The may use secondary criteria.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken);

/// <summary>
/// Gets a channel source for write operations that may use a secondary and deprioritizes servers in the provided collection.
/// </summary>
/// <param name="deprioritizedServers">The deprioritized servers.</param>
/// <param name="mayUseSecondary">The may use secondary criteria.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A channel source.</returns>
Task<IChannelSourceHandle> GetWriteChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, IMayUseSecondaryCriteria mayUseSecondary, CancellationToken cancellationToken);
}

/// <summary>
Expand Down
18 changes: 15 additions & 3 deletions src/MongoDB.Driver.Core/Core/Bindings/ReadBindingHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Misc;
using MongoDB.Driver.Core.Servers;

namespace MongoDB.Driver.Core.Bindings
{
Expand Down Expand Up @@ -76,6 +74,20 @@ public Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken ca
return _reference.Instance.GetReadChannelSourceAsync(cancellationToken);
}

/// <inheritdoc />
public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return _reference.Instance.GetReadChannelSource(deprioritizedServers, cancellationToken);
}

/// <inheritdoc />
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
ThrowIfDisposed();
return _reference.Instance.GetReadChannelSourceAsync(deprioritizedServers, cancellationToken);
}

/// <inheritdoc/>
public void Dispose()
{
Expand Down
21 changes: 17 additions & 4 deletions src/MongoDB.Driver.Core/Core/Bindings/ReadPreferenceBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver.Core.Clusters;
Expand Down Expand Up @@ -66,17 +67,29 @@ public ICoreSessionHandle Session
// methods
/// <inheritdoc/>
public IChannelSourceHandle GetReadChannelSource(CancellationToken cancellationToken)
{
return GetReadChannelSource(null, cancellationToken);
}

/// <inheritdoc/>
public Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken cancellationToken)
{
return GetReadChannelSourceAsync(null, cancellationToken);
}

/// <inheritdoc />
public IChannelSourceHandle GetReadChannelSource(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var server = _cluster.SelectServerAndPinIfNeeded(_session, _serverSelector, cancellationToken);
var server = _cluster.SelectServerAndPinIfNeeded(_session, _serverSelector, deprioritizedServers, cancellationToken);
return GetChannelSourceHelper(server);
}

/// <inheritdoc/>
public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(CancellationToken cancellationToken)
/// <inheritdoc />
public async Task<IChannelSourceHandle> GetReadChannelSourceAsync(IReadOnlyCollection<ServerDescription> deprioritizedServers, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, _serverSelector, cancellationToken).ConfigureAwait(false);
var server = await _cluster.SelectServerAndPinIfNeededAsync(_session, _serverSelector, deprioritizedServers, cancellationToken).ConfigureAwait(false);
return GetChannelSourceHelper(server);
}

Expand Down
Loading

0 comments on commit 33e14d0

Please sign in to comment.