Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ Consider a Producer instance like a long-lived object, do not create one to send
| ConnectionClosedHandler | Event when the client is disconnected | It is an event |
| MaxInFlight | Max Number of messages before send | 1000 |

Producer with a reference name stores the sequence id on the server.
It is possible to retrieve the id using `producer.GetLastPublishingId()`
or more generic `system.QuerySequence("reference", "my_stream")`

### Publish Messages

#### Standard publish
Expand Down
17 changes: 10 additions & 7 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private Client(ClientParameters parameters)
_heartBeatHandler = new HeartBeatHandler(
SendHeartBeat,
Close,
(int) parameters.Heartbeat.TotalSeconds);
(int)parameters.Heartbeat.TotalSeconds);
IsClosed = false;
}

Expand Down Expand Up @@ -330,7 +330,7 @@ private uint NextCorrelationId()

private async Task HandleClosed(string reason)
{
IsClosed = true;
InternalClose();
await OnConnectionClosed(reason);
}

Expand Down Expand Up @@ -451,7 +451,7 @@ private void HandleCorrelatedCommand(ushort tag, ref ReadOnlySequence<byte> fram
case CloseResponse.Key:
CloseResponse.Read(frame, out var closeResponse);
HandleCorrelatedResponse(closeResponse);
IsClosed = true;
InternalClose();
break;
case HeartBeatHandler.Key:
_heartBeatHandler.UpdateHeartBeat();
Expand Down Expand Up @@ -484,30 +484,33 @@ private async ValueTask<bool> SendHeartBeat()
return await Publish(new HeartBeatRequest());
}

private void InternalClose()
{
_heartBeatHandler.Close();
IsClosed = true;
}

public async Task<CloseResponse> Close(string reason)
{
if (IsClosed)
{
return new CloseResponse(0, ResponseCode.Ok);
}

_heartBeatHandler.Close();

// TODO LRB timeout
var result =
await Request<CloseRequest, CloseResponse>(corr => new CloseRequest(corr, reason),
TimeSpan.FromSeconds(30));

try
{
InternalClose();
connection.Dispose();
}
catch (Exception e)
{
LogEventSource.Log.LogError($"An error occurred while calling {nameof(connection.Dispose)}.", e);
}

IsClosed = true;
return result;
}

Expand Down
12 changes: 12 additions & 0 deletions RabbitMQ.Stream.Client/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ private async Task SemaphoreWait()
}
}

/// <summary>
/// GetLastPublishingId
/// </summary>
/// <returns>The last sequence id stored by the producer.</returns>
public async Task<ulong> GetLastPublishingId()
{
var response = await client.QueryPublisherSequence(config.Reference, config.Stream);
ClientExceptions.MaybeThrowException(response.ResponseCode,
$"GetLastPublishingId stream: {config.Stream}, reference: {config.Reference}");
return response.Sequence;
}

public async ValueTask Send(ulong publishingId, Message message)
{
await SemaphoreWait();
Expand Down
4 changes: 1 addition & 3 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ RabbitMQ.Stream.Client.Producer
RabbitMQ.Stream.Client.Producer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
RabbitMQ.Stream.Client.Producer.ConfirmFrames.get -> int
RabbitMQ.Stream.Client.Producer.Dispose() -> void
RabbitMQ.Stream.Client.Producer.GetLastPublishingId() -> System.Threading.Tasks.Task<ulong>
RabbitMQ.Stream.Client.Producer.IncomingFrames.get -> int
RabbitMQ.Stream.Client.Producer.MessagesSent.get -> int
RabbitMQ.Stream.Client.Producer.PendingCount.get -> int
Expand Down Expand Up @@ -624,9 +625,6 @@ RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.StreamNotAvailable = 6 -> Rab
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.TimeoutError = 2 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.UndefinedError = 200 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.ConfirmationStatus.WaitForConfirmation = 0 -> RabbitMQ.Stream.Client.Reliable.ConfirmationStatus
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy.GetPublishingId() -> ulong
RabbitMQ.Stream.Client.Reliable.IPublishingIdStrategy.InitPublishingId() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string connectionInfo) -> void
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string connectionInfo) -> bool
Expand Down
18 changes: 0 additions & 18 deletions RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs

This file was deleted.

55 changes: 13 additions & 42 deletions RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,6 @@

namespace RabbitMQ.Stream.Client.Reliable;

internal class AutoPublishingId : IPublishingIdStrategy
{
private ulong _lastPublishingId = 0;
private readonly ReliableProducerConfig _reliableProducerConfig;

public ulong GetPublishingId()
{
return ++_lastPublishingId;
}

public AutoPublishingId(ReliableProducerConfig reliableProducerConfig)
{
_reliableProducerConfig = reliableProducerConfig;
}

public async Task InitPublishingId()
{
try
{
_lastPublishingId =
await _reliableProducerConfig.StreamSystem.QuerySequence(_reliableProducerConfig.Reference,
_reliableProducerConfig.Stream);
}
catch (Exception)
{
_lastPublishingId = 0;
}
}
}

public record ReliableProducerConfig
{
public StreamSystem StreamSystem { get; set; }
Expand All @@ -63,14 +33,13 @@ public record ReliableProducerConfig
public class ReliableProducer : ReliableBase
{
private Producer _producer;
private readonly AutoPublishingId _autoPublishingId;
private ulong _publishingId;
private readonly ReliableProducerConfig _reliableProducerConfig;
private readonly ConfirmationPipe _confirmationPipe;

private ReliableProducer(ReliableProducerConfig reliableProducerConfig)
{
_reliableProducerConfig = reliableProducerConfig;
_autoPublishingId = new AutoPublishingId(_reliableProducerConfig);
_confirmationPipe = new ConfirmationPipe(reliableProducerConfig.ConfirmationHandler);
_confirmationPipe.Start();
}
Expand All @@ -85,10 +54,6 @@ public static async Task<ReliableProducer> CreateReliableProducer(ReliableProduc
protected override async Task GetNewReliable(bool boot)
{
await SemaphoreSlim.WaitAsync();
if (boot)
{
await _autoPublishingId.InitPublishingId();
}

try
{
Expand Down Expand Up @@ -124,6 +89,12 @@ protected override async Task GetNewReliable(bool boot)
}
});
_reliableProducerConfig.ReconnectStrategy.WhenConnected(ToString());
if (boot)
{
// Init the publishing id
Interlocked.Exchange(ref _publishingId,
await _producer.GetLastPublishingId());
}
}

catch (CreateProducerException ce)
Expand Down Expand Up @@ -170,8 +141,8 @@ public override async Task Close()

public async ValueTask Send(Message message)
{
var pid = _autoPublishingId.GetPublishingId();
_confirmationPipe.AddUnConfirmedMessage(pid, message);
Interlocked.Increment(ref _publishingId);
_confirmationPipe.AddUnConfirmedMessage(_publishingId, message);
await SemaphoreSlim.WaitAsync();
try
{
Expand All @@ -182,7 +153,7 @@ public async ValueTask Send(Message message)
// on the _waitForConfirmation list. The user will get Timeout Error
if (!(_inReconnection))
{
await _producer.Send(pid, message);
await _producer.Send(_publishingId, message);
}
}

Expand All @@ -198,14 +169,14 @@ public async ValueTask Send(Message message)

public async ValueTask Send(List<Message> messages, CompressionType compressionType)
{
var pid = _autoPublishingId.GetPublishingId();
_confirmationPipe.AddUnConfirmedMessage(pid, messages);
Interlocked.Increment(ref _publishingId);
_confirmationPipe.AddUnConfirmedMessage(_publishingId, messages);
await SemaphoreSlim.WaitAsync();
try
{
if (!_inReconnection)
{
await _producer.Send(pid, messages, compressionType);
await _producer.Send(_publishingId, messages, compressionType);
}
}

Expand Down
3 changes: 1 addition & 2 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private async Task MayBeReconnectLocator()
{
if (client.IsClosed)
{
client = await Client.Create(clientParameters with
client = await Client.Create(client.Parameters with
{
ClientProvidedName = clientParameters.ClientProvidedName
});
Expand Down Expand Up @@ -197,7 +197,6 @@ public async Task<ulong> QuerySequence(string reference, string stream)
{
await MayBeReconnectLocator();
MaybeThrowQueryException(reference, stream);

var response = await client.QueryPublisherSequence(reference, stream);
ClientExceptions.MaybeThrowException(response.ResponseCode,
$"QuerySequence stream: {stream}, reference: {reference}");
Expand Down
19 changes: 8 additions & 11 deletions Tests/ProducerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,10 @@ public async Task ProducerShouldRaiseAnExceptionIfStreamOrBatchSizeAreNotValid()
var system = await StreamSystem.Create(config);

await Assert.ThrowsAsync<CreateProducerException>(() => system.CreateProducer(
new ProducerConfig
{
Reference = "producer",
Stream = "",
}));
new ProducerConfig { Reference = "producer", Stream = "", }));

await Assert.ThrowsAsync<CreateProducerException>(() => system.CreateProducer(
new ProducerConfig
{
Reference = "producer",
Stream = "TEST",
MessagesBufferSize = -1,
}));
new ProducerConfig { Reference = "producer", Stream = "TEST", MessagesBufferSize = -1, }));

await system.Close();
}
Expand Down Expand Up @@ -275,6 +266,12 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
// sequence start from zero
Assert.True(resAfter == (NumberOfMessages - 1));

var producer = await system.CreateProducer(new ProducerConfig()
{
Stream = stream,
Reference = ProducerName
});
Assert.True(await producer.GetLastPublishingId() == (NumberOfMessages - 1));
await system.DeleteStream(stream);
await system.Close();
}
Expand Down