New event to handle Metadata update #332

Merged · 6 commits · Dec 18, 2023
Changes from all commits
2 changes: 2 additions & 0 deletions .github/workflows/build-test.yaml
@@ -67,6 +67,8 @@ jobs:
key: ${{ runner.os }}-v2-nuget-${{ hashFiles('**/*.csproj') }}
restore-keys: |
${{ runner.os }}-v2-nuget-
- name: Wait for RabbitMQ to be up
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmqctl wait --pid 1 --timeout 60
- name: Enable RabbitMQ Plugins
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
- name: Restore
71 changes: 63 additions & 8 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
@@ -10,31 +10,87 @@
namespace RabbitMQ.Stream.Client
{

public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
}

internal enum EntityStatus
{
Open,
Closed,
Disposed
}
public abstract class AbstractEntity

public interface IClosable
{
public Task<ResponseCode> Close();
}

public abstract class AbstractEntity : IClosable
{
private readonly CancellationTokenSource _cancelTokenSource = new();
protected CancellationToken Token => _cancelTokenSource.Token;

protected ILogger Logger { get; init; }
internal EntityStatus _status = EntityStatus.Closed;

protected byte EntityId { get; set; }
protected abstract string GetStream();
protected abstract string DumpEntityConfiguration();

// here the _cancelTokenSource is disposed and the token is cancelled
// in the producer it is used to cancel the send task
// in the consumer it is used to cancel the receive task
protected void MaybeCancelToken()
private void MaybeCancelToken()
{

if (!_cancelTokenSource.IsCancellationRequested)
_cancelTokenSource.Cancel();
}

public abstract Task<ResponseCode> Close();

protected void Dispose(bool disposing, string entityInfo, ILogger logger)
/// <summary>
/// Removes the producer or consumer from the server.
/// </summary>
/// <param name="ignoreIfAlreadyDeleted"> Skips the server call when the producer or consumer has already been removed from the server,
/// e.g. after a metadata update. </param>
/// <returns></returns>
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);

/// <summary>
/// Internal close method, called by the public Close method.
/// It sets the status to Closed, removes the producer or consumer from the server (if it has not already been removed)
/// and closes the TCP connection when it is no longer needed.
/// </summary>
/// <param name="config">The configuration carrying the connection pool instance</param>
/// <param name="ignoreIfAlreadyDeleted"></param>
/// <returns></returns>
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
{
MaybeCancelToken();

if (!IsOpen()) // the client is already closed
{
return ResponseCode.Ok;
}

_status = EntityStatus.Closed;
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);

if (_client is { IsClosed: true })
{
return result;
}

var closed = await _client.MaybeClose($"closing: {EntityId}",
GetStream(), config.Pool)
.ConfigureAwait(false);
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
return result;
}

protected void Dispose(bool disposing)
{
if (!disposing)
{
@@ -51,12 +107,12 @@ protected void Dispose(bool disposing, string entityInfo, ILogger logger)
var closeTask = Close();
if (!closeTask.Wait(Consts.MidWait))
{
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
Logger?.LogWarning("Failed to close {EntityInfo} in time", DumpEntityConfiguration());
}
}
catch (Exception e)
{
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
}
finally
{
@@ -70,6 +126,5 @@ public bool IsOpen()
}

internal Client _client;

}
}
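As an illustration of the new AbstractEntity contract above, the sketch below shows how a concrete entity could plug into it: it overrides DeleteEntityFromTheServer to issue the server-side delete and lets the public Close delegate to the shared Shutdown logic. This is a minimal sketch, not the repository's actual RawProducer; the MySketchProducer name, its GetStream/DumpEntityConfiguration bodies and the way the client reference is passed in are hypothetical.

using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace RabbitMQ.Stream.Client
{
    internal class MySketchProducer : AbstractEntity
    {
        private readonly IProducerConfig _config; // carries the ConnectionsPool via EntityCommonConfig

        internal MySketchProducer(IProducerConfig config, Client client, ILogger logger)
        {
            _config = config;
            _client = client;
            Logger = logger;
        }

        protected override string GetStream() => "my-stream"; // hypothetical fixed stream name
        protected override string DumpEntityConfiguration() => $"Producer id {EntityId} on stream {GetStream()}";

        // Server-side removal; when the entity has already disappeared (e.g. after a
        // metadata update) the request is skipped via ignoreIfAlreadyDeleted.
        protected override async Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false)
        {
            var response = await _client.DeletePublisher(EntityId, ignoreIfAlreadyDeleted).ConfigureAwait(false);
            return response.ResponseCode;
        }

        // The public Close simply delegates to the shared Shutdown logic, which cancels
        // the token, deletes the entity and closes the connection if no longer needed.
        public override Task<ResponseCode> Close() => Shutdown(_config);
    }
}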
100 changes: 74 additions & 26 deletions RabbitMQ.Stream.Client/Client.cs
@@ -53,7 +53,9 @@ public record ClientParameters
public string VirtualHost { get; set; } = "/";
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);

public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
public delegate void MetadataUpdateHandler(MetaDataUpdate update);

public event MetadataUpdateHandler OnMetadataUpdate;
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
public TimeSpan Heartbeat { get; set; } = TimeSpan.FromMinutes(1);

@@ -71,6 +73,11 @@ public string ClientProvidedName
public AddressResolver AddressResolver { get; set; } = null;

public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;

internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
{
OnMetadataUpdate?.Invoke(metaDataUpdate);
}
}

internal readonly struct OutgoingMsg : ICommand
@@ -213,7 +220,8 @@ await client
.ConfigureAwait(false);
logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms);

var isValid = saslHandshakeResponse.Mechanisms.Contains(parameters.AuthMechanism.ToString().ToUpperInvariant(),
var isValid = saslHandshakeResponse.Mechanisms.Contains(
parameters.AuthMechanism.ToString().ToUpperInvariant(),
StringComparer.OrdinalIgnoreCase);
if (!isValid)
{
@@ -225,7 +233,8 @@ await client
var authResponse =
await client
.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(), saslData))
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(),
saslData))
.ConfigureAwait(false);
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);

@@ -322,22 +331,28 @@ public async ValueTask<bool> Publish(Publish publishMsg)
return (publisherId, response);
}

public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId,
bool ignoreIfAlreadyRemoved = false)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);
if (!ignoreIfAlreadyRemoved)
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);

return result;
return result;
}
}
finally
{
publishers.Remove(publisherId);
_poolSemaphore.Release();
}

return new DeletePublisherResponse();
}

public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
@@ -386,20 +401,24 @@ public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
return (subscriptionId, response);
}

public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
// here we reduce the timeout a bit to avoid waiting too long
// if the client is busy with read operations it can take time to process the unsubscribe
// but the subscription is removed anyway.
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);

return result;
if (!ignoreIfAlreadyRemoved)
{
// here we reduce the timeout a bit to avoid waiting too long
// if the client is busy with read operations it can take time to process the unsubscribe
// but the subscription is removed anyway.
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId),
TimeSpan.FromSeconds(5)).ConfigureAwait(false);
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);

return result;
}
}
finally
{
@@ -408,6 +427,8 @@ public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
consumers.Remove(subscriptionId);
_poolSemaphore.Release();
}

return new UnsubscribeResponse();
}

public async Task<PartitionsQueryResponse> QueryPartition(string superStream)
@@ -477,12 +498,25 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
case PublishConfirm.Key:
PublishConfirm.Read(frame, out var confirm);
confirmFrames += 1;
var (confirmCallback, _) = publishers[confirm.PublisherId];
confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
{
if (confirmSegment.Array != null)
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
var (confirmCallback, _) = publisherConf;
confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
{
if (confirmSegment.Array != null)
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
}
}
else
{
// the producer was not found; this can happen when the producer is closing
// and there are still confirmations on the wire
// we can ignore them since the producer does not exist anymore
_logger?.LogDebug(
"Could not find stream producer {ID} or the producer is closing. " +
"A possible cause is that the producer was closed and there are still confirmations on the wire.",
confirm.PublisherId);
}

break;
@@ -507,12 +541,26 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
break;
case PublishError.Key:
PublishError.Read(frame, out var error);
var (_, errorCallback) = publishers[error.PublisherId];
errorCallback(error.PublishingErrors);
if (publishers.TryGetValue(error.PublisherId, out var publisher))
{
var (_, errorCallback) = publisher;
errorCallback(error.PublishingErrors);
}
else
{
// the producer was not found; this can happen when the producer is closing
// and there are still publish errors on the wire
// we can ignore the error since the producer does not exist anymore
_logger?.LogDebug(
"Could not find stream producer {ID} or the producer is closing. " +
"A possible cause is that the producer was closed and there are still publish errors on the wire.",
error.PublisherId);
}

break;
case MetaDataUpdate.Key:
MetaDataUpdate.Read(frame, out var metaDataUpdate);
Parameters.MetadataHandler(metaDataUpdate);
Parameters.FireMetadataUpdate(metaDataUpdate);
break;
case TuneResponse.Key:
TuneResponse.Read(frame, out var tuneResponse);
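The replacement of the MetadataHandler action with an OnMetadataUpdate event means callers can attach (and detach) several handlers. A minimal usage sketch, assuming MetaDataUpdate exposes the affected stream name and response code; the endpoint and the handler bodies are illustrative only:

using System;
using System.Net;
using RabbitMQ.Stream.Client;

var parameters = new ClientParameters
{
    Endpoint = new IPEndPoint(IPAddress.Loopback, 5552)
};

// Several independent handlers can now be registered; the previous
// Action<MetaDataUpdate> property allowed only a single assignment.
parameters.OnMetadataUpdate += update =>
    Console.WriteLine($"Metadata update for stream {update.Stream}: {update.Code}");

parameters.OnMetadataUpdate += update =>
{
    // e.g. trigger re-creation of the producer or consumer for that stream
};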
7 changes: 2 additions & 5 deletions RabbitMQ.Stream.Client/IConsumer.cs
@@ -7,21 +7,18 @@

namespace RabbitMQ.Stream.Client;

public interface IConsumer
public interface IConsumer : IClosable
{
public Task StoreOffset(ulong offset);
public Task<ResponseCode> Close();
public void Dispose();

public ConsumerInfo Info { get; }
}

public record IConsumerConfig : INamedEntity
public record IConsumerConfig : EntityCommonConfig, INamedEntity
{
private ushort _initialCredits = Consts.ConsumerInitialCredits;

internal ConnectionsPool Pool { get; set; }

// StoredOffsetSpec configuration it is needed to keep the offset spec.
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
// and from ConsumerConfig.ConsumerUpdateListener.
8 changes: 2 additions & 6 deletions RabbitMQ.Stream.Client/IProducer.cs
@@ -15,7 +15,7 @@ namespace RabbitMQ.Stream.Client;
// - Super-Stream producer
// </summary>

public interface IProducer
public interface IProducer : IClosable
{
/// <summary>
/// Send the message to the stream in asynchronous mode.
@@ -49,8 +49,6 @@ public interface IProducer
/// <returns></returns>
public ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType);

public Task<ResponseCode> Close();

/// <summary>
/// Return the last publishing id.
/// </summary>
@@ -83,11 +81,9 @@ public record ProducerFilter
public Func<Message, string> FilterValue { get; set; } = null;
}

public record IProducerConfig : INamedEntity
public record IProducerConfig : EntityCommonConfig, INamedEntity
{

internal ConnectionsPool Pool { get; set; }

public string Reference { get; set; }
public int MaxInFlight { get; set; } = 1_000;
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";
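With IProducer and IConsumer both deriving from the new IClosable interface, producers and consumers can be shut down through a single abstraction. A small, hypothetical helper sketch, not part of this PR:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client;

public static class EntityCleanup
{
    // Close any mix of producers and consumers uniformly via IClosable.
    public static async Task CloseAllAsync(IEnumerable<IClosable> entities)
    {
        foreach (var entity in entities)
        {
            var responseCode = await entity.Close().ConfigureAwait(false);
            // ResponseCode.Ok is the expected outcome for a clean shutdown.
            Console.WriteLine($"Closed entity with response code {responseCode}");
        }
    }
}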