Skip to content

Commit

Permalink
Improve Reconnection
Browse files Browse the repository at this point in the history
* Part of #336

* Improve the disconnection message.
  It is possible to understand if it is a normal disconnection or an unexpected.
* Remove the Active Items from the connection pool and use the
  Publishers and Consumers client list directly to check the pool size
* Refactor the Factory Classes.
  Remove code duplication in case of metadata update and connection closed. See ReliableBase.OnEntityClosed
* Handle streamNotAvailable error.
  In this case the client will try to reconnect the entity
* Fix the events attach to the RawConsumer and RawProducer.
  The events are attached only if the ResponseCode is OK
* Detach the events once the entity receives the disconnection or metadata update.
  In that case, the entity is closed
* Introduce ReliableEntityStatus like a state machine to understand the status of Producer and Consumer classes
* Add ResourceAvailableReconnectStrategy interface to Handle the retry in case testing in a stream exists.
  See ReliableBase CheckIfStreamIsAvailable
* Change the  MetadataHandler to Func<MetaDataUpdate, Task> to be like the other methods
* Producer and Consumer classes fail fast during the first initialization.
  The user is aware of what is happening. The reconnect part occurs only after the first boot.

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed Jan 9, 2024
1 parent a7c9fd6 commit e1eaac3
Show file tree
Hide file tree
Showing 29 changed files with 901 additions and 709 deletions.
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace RabbitMQ.Stream.Client
public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
public Func<MetaDataUpdate, Task> MetadataHandler { get; set; }
}

internal enum EntityStatus
Expand Down Expand Up @@ -49,10 +50,11 @@ protected void ThrowIfClosed()
// here the _cancelTokenSource is disposed and the token is cancelled
// in producer is used to cancel the send task
// in consumer is used to cancel the receive task
private void MaybeCancelToken()
protected void UpdateStatusToClosed()
{
if (!_cancelTokenSource.IsCancellationRequested)
_cancelTokenSource.Cancel();
_status = EntityStatus.Closed;
}

public abstract Task<ResponseCode> Close();
Expand Down Expand Up @@ -82,9 +84,7 @@ protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool igno
return ResponseCode.Ok;
}

MaybeCancelToken();

_status = EntityStatus.Closed;
UpdateStatusToClosed();
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);

if (_client is { IsClosed: true })
Expand Down
107 changes: 59 additions & 48 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ public record ClientParameters
{"product", "RabbitMQ Stream"},
{"version", Version.VersionString},
{"platform", ".NET"},
{"copyright", "Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."},
{
"copyright",
"Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."
},
{
"information",
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
Expand All @@ -53,7 +56,7 @@ public record ClientParameters
public string VirtualHost { get; set; } = "/";
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);

public delegate void MetadataUpdateHandler(MetaDataUpdate update);
public delegate Task MetadataUpdateHandler(MetaDataUpdate update);

public event MetadataUpdateHandler OnMetadataUpdate;
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
Expand Down Expand Up @@ -121,12 +124,13 @@ public class Client : IClient
private readonly TaskCompletionSource<TuneResponse> tuneReceived =
new TaskCompletionSource<TuneResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

internal readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
internal readonly IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))>
publishers =
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
new ConcurrentDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)
)>();

internal readonly IDictionary<byte, ConsumerEvents> consumers =
new ConcurrentDictionary<byte, ConsumerEvents>();
internal readonly IDictionary<byte, (string, ConsumerEvents)> consumers =
new ConcurrentDictionary<byte, (string, ConsumerEvents)>();

private int publishCommandsSent;

Expand Down Expand Up @@ -201,6 +205,26 @@ private async Task OnConnectionClosed(string reason)
}
}

private readonly SemaphoreSlim _attachSemaphore = new(1, 1);

public void AttachEventsToTheClient(ConnectionCloseHandler connectionCloseHandler,
ClientParameters.MetadataUpdateHandler metadataUpdateHandler)
{
_attachSemaphore.Wait();
ConnectionClosed += connectionCloseHandler;
Parameters.OnMetadataUpdate += metadataUpdateHandler;
_attachSemaphore.Release();
}

public void DetachEventsFromTheClient(ConnectionCloseHandler connectionCloseHandler,
ClientParameters.MetadataUpdateHandler metadataUpdateHandler)
{
_attachSemaphore.Wait();
ConnectionClosed -= connectionCloseHandler;
Parameters.OnMetadataUpdate -= metadataUpdateHandler;
_attachSemaphore.Release();
}

public static async Task<Client> Create(ClientParameters parameters, ILogger logger = null)
{
var client = new Client(parameters, logger);
Expand Down Expand Up @@ -312,7 +336,8 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand

try
{
publishers.Add(publisherId, (confirmCallback, errorCallback));
publishers.Add(publisherId, (stream,
(confirmCallback, errorCallback)));
response = await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false);
}
Expand All @@ -324,10 +349,9 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
if (response.ResponseCode == ResponseCode.Ok)
return (publisherId, response);

// if the response code is not ok we need to remove the subscription
// and close the connection if necessary.
publishers.Remove(publisherId);
await MaybeClose("Create Publisher Exception", stream, pool).ConfigureAwait(false);
pool?.MaybeClose(ClientId, "Publisher creation failed");
return (publisherId, response);
}

Expand Down Expand Up @@ -396,9 +420,10 @@ private byte IncrementEntityId()
try
{
consumers.Add(subscriptionId,
new ConsumerEvents(
deliverHandler,
consumerUpdateHandler));
(config.Stream,
new ConsumerEvents(
deliverHandler,
consumerUpdateHandler)));

response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
Expand All @@ -412,10 +437,8 @@ private byte IncrementEntityId()
if (response.ResponseCode == ResponseCode.Ok)
return (subscriptionId, response);

// if the response code is not ok we need to remove the subscription
// and close the connection if necessary.
consumers.Remove(subscriptionId);
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
config.Pool.MaybeClose(ClientId, "Subscription failed");
return (subscriptionId, response);
}

Expand Down Expand Up @@ -518,7 +541,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
confirmFrames += 1;
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
{
var (confirmCallback, _) = publisherConf;
var (_, (confirmCallback, _)) = (publisherConf);

confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
{
Expand All @@ -542,7 +566,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
Deliver.Read(frame, out var deliver);
if (consumers.TryGetValue(deliver.SubscriptionId, out var consumerEvent))
{
await consumerEvent.DeliverHandler(deliver).ConfigureAwait(false);
var (_, deliverHandler) = consumerEvent;
await deliverHandler.DeliverHandler(deliver).ConfigureAwait(false);
}
else
{
Expand All @@ -561,7 +586,7 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
PublishError.Read(frame, out var error);
if (publishers.TryGetValue(error.PublisherId, out var publisher))
{
var (_, errorCallback) = publisher;
var (_, (_, errorCallback)) = publisher;
errorCallback(error.PublishingErrors);
}
else
Expand All @@ -588,7 +613,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
ConsumerUpdateQueryResponse.Read(frame, out var consumerUpdateQueryResponse);
HandleCorrelatedResponse(consumerUpdateQueryResponse);
var consumerEventsUpd = consumers[consumerUpdateQueryResponse.SubscriptionId];
var off = await consumerEventsUpd.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive)
var consumer = consumerEventsUpd.Item2;
var off = await consumer.ConsumerUpdateHandler(consumerUpdateQueryResponse.IsActive)
.ConfigureAwait(false);
if (off == null)
{
Expand Down Expand Up @@ -736,14 +762,6 @@ private void InternalClose()
IsClosed = true;
}

private bool HasEntities()
{
lock (Obj)
{
return publishers.Count > 0 || consumers.Count > 0;
}
}

private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
{
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
Expand All @@ -759,6 +777,7 @@ public async Task<CloseResponse> Close(string reason)
InternalClose();
try
{
connection.UpdateCloseStatus(ConnectionClosedReason.Normal);
var result =
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(10)).ConfigureAwait(false);
Expand Down Expand Up @@ -799,27 +818,9 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
if (!HasEntities())
{
if (!string.IsNullOrEmpty(ClientId))
{
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
pool.Remove(ClientId);
await Close(reason).ConfigureAwait(false);
}
}
else
{
// we remove an id reference from the client
// in case there are still active ids from the client and the stream
if (!string.IsNullOrEmpty(ClientId))
{
pool.Release(ClientId, stream);
}
}

// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
pool.MaybeClose(ClientId, reason);
var result = new CloseResponse(0, ResponseCode.Ok);
return result;
}
Expand All @@ -831,6 +832,16 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn

public string ClientId { get; init; }

public IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))> Publishers
{
get => publishers;
}

public IDictionary<byte, (string, ConsumerEvents)> Consumers
{
get => consumers;
}

public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
{
return await Request<QueryPublisherRequest, QueryPublisherResponse>(corr =>
Expand Down
13 changes: 10 additions & 3 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace RabbitMQ.Stream.Client
{
internal static class ClientExceptions
{

// <summary>
/// IsAKnownException returns true if the exception is a known exception
/// We need it to reconnect when the producer/consumer.
Expand All @@ -32,11 +31,19 @@ internal static bool IsAKnownException(Exception exception)
{
var x = aggregateException.InnerExceptions.Select(x =>
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
x.GetType() == typeof(LeaderNotFoundException));
x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException));
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException);
return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) ||
IsStreamNotAvailable(exception);
}

internal static bool IsStreamNotAvailable(Exception exception)
{
// StreamNotAvailable is a temporary exception it can happen when the stream is just created and
// it is not ready yet to all the nodes. In this case we can try to reconnect.
return exception is CreateException { ResponseCode: ResponseCode.StreamNotAvailable };
}

public static void MaybeThrowException(ResponseCode responseCode, string message)
Expand Down
22 changes: 16 additions & 6 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

namespace RabbitMQ.Stream.Client
{
internal static class ConnectionClosedReason
{
public const string Normal = "TCP connection closed normal";
public const string Unexpected = "TCP connection closed unexpected";
}

public class Connection : IDisposable
{
private readonly Socket socket;
Expand All @@ -25,6 +31,7 @@ public class Connection : IDisposable
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
private int numFrames;
private bool isClosed = false;
private string _closedReason = ConnectionClosedReason.Unexpected;
private bool _disposedValue;
private readonly ILogger _logger;

Expand All @@ -35,6 +42,10 @@ public class Connection : IDisposable
internal int NumFrames => numFrames;
internal string ClientId { get; set; }
public bool IsClosed => isClosed;
public void UpdateCloseStatus(string reason)
{
_closedReason = reason;
}

private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
{
Expand Down Expand Up @@ -191,14 +202,12 @@ private async Task ProcessIncomingFrames()
finally
{
isClosed = true;
_logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
ClientId, Token.IsCancellationRequested);
_logger?.LogDebug(
"TCP Connection Closed ClientId: {ClientId}, Reason {Reason}. IsCancellationRequested {Token} ",
ClientId, _closedReason, Token.IsCancellationRequested);
// Mark the PipeReader as complete
await reader.CompleteAsync(caught).ConfigureAwait(false);
var t = closedCallback?.Invoke("TCP Connection Closed")!;
if (t != null)
await t.ConfigureAwait(false);
_logger?.LogDebug("TCP Connection Closed");
closedCallback?.Invoke(_closedReason)!.ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -231,6 +240,7 @@ public void Dispose()
{
try
{
UpdateCloseStatus(ConnectionClosedReason.Normal);
if (!_cancelTokenSource.IsCancellationRequested)
{
_cancelTokenSource.Cancel();
Expand Down
Loading

0 comments on commit e1eaac3

Please sign in to comment.