Skip to content

Commit

Permalink
Publisher confirms as PublishMiddleware
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Jun 21, 2023
1 parent e23325a commit e7955dd
Show file tree
Hide file tree
Showing 23 changed files with 135 additions and 239 deletions.
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next.Abstractions/Channels/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Task SendAsync<TRequest>(TRequest request, CancellationToken cancellation = defa
where TRequest : struct, IOutgoingMethod
where TResponse : struct, IIncomingMethod;

Task PublishAsync<TState>(TState contentBuilderState, string exchange, string routingKey, IMessageProperties properties, Action<TState, IBufferWriter<byte>> payload, PublishFlags flags = PublishFlags.None, CancellationToken cancellation = default);
Task<ulong> PublishAsync<TState>(TState contentBuilderState, string exchange, string routingKey, IMessageProperties properties, Action<TState, IBufferWriter<byte>> payload, PublishFlags flags = PublishFlags.None, CancellationToken cancellation = default);

Task<TMethod> WaitAsync<TMethod>(CancellationToken cancellation = default)
where TMethod : struct, IIncomingMethod;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Next.Consumer;

public interface IConsumerMiddleware
{
ValueTask InvokeAsync(IDeliveredMessage message, CancellationToken cancellation);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Channels;

namespace RabbitMQ.Next.Publisher;

public interface IPublishMiddleware
{
ValueTask InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation);
ValueTask InitAsync(IChannel channel, CancellationToken cancellation);

ValueTask<ulong> InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation);
}
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next.Publisher.Abstractions/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ public interface IPublisher : IAsyncDisposable
{
Task PublishAsync<TContent>(TContent content, Action<IMessageBuilder> propertiesBuilder = null, CancellationToken cancellation = default);

Task PublishAsync<TState, TContent>(TState state, TContent content, Action<TState, IMessageBuilder> propertiesBuilder = null, CancellationToken cancellation = default);
Task PublishAsync<TState, TContent>(TState state, TContent content, Action<TState, IMessageBuilder> propertiesBuilder, CancellationToken cancellation = default);
}
2 changes: 0 additions & 2 deletions src/RabbitMQ.Next.Publisher.Abstractions/IPublisherBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,4 @@ public interface IPublisherBuilder: ISerializationBuilder<IPublisherBuilder>
IPublisherBuilder UsePublishMiddleware(Func<IPublishMiddleware, IPublishMiddleware> middlewareFactory);

IPublisherBuilder UseReturnMiddleware(Func<IReturnMiddleware,IReturnMiddleware> middlewareFactory);

IPublisherBuilder NoConfirms();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ namespace RabbitMQ.Next.Publisher;

public static class PublisherBuilderExtensions
{
public static IPublisherBuilder UsePublishMiddleware(this IPublisherBuilder builder, Action<object, IMessageBuilder> middleware)
=> builder.UsePublishMiddleware(p => new DelegatePublishMiddleware(p, middleware));

public static IPublisherBuilder UseReturnMiddleware(this IPublisherBuilder builder, Action<IReturnedMessage> middleware)
=> builder.UseReturnMiddleware(p => new DelegateReturnMiddleware(p, middleware));
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Channels;

namespace RabbitMQ.Next.Publisher.Attributes;

Expand Down Expand Up @@ -42,7 +43,9 @@ private static IReadOnlyList<MessageAttributeBase> AsMessageAttributes(object[]
return typed;
}

public ValueTask InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation)
public ValueTask InitAsync(IChannel channel, CancellationToken cancellation) => default;

public ValueTask<ulong> InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation)
{
var type = typeof(TContent);

Expand Down
6 changes: 4 additions & 2 deletions src/RabbitMQ.Next.Publisher/ConfirmMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Channels;
using RabbitMQ.Next.Messaging;
using RabbitMQ.Next.Tasks;
using RabbitMQ.Next.Transport.Methods.Basic;

namespace RabbitMQ.Next.Publisher;
Expand All @@ -25,15 +27,15 @@ static ConfirmMessageHandler()
NegativeCompletedTcs.SetResult(false);
}

public Task<bool> WaitForConfirmAsync(ulong deliveryTag)
public Task<bool> WaitForConfirmAsync(ulong deliveryTag, CancellationToken cancellation)
{
var tcs = this.pendingConfirms.GetOrAdd(deliveryTag, _ => new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously));
if (tcs.Task.IsCompleted)
{
this.pendingConfirms.TryRemove(deliveryTag, out _);
}

return tcs.Task;
return tcs.Task.WithCancellation(cancellation);
}

public void Handle(AckMethod method, IPayload payload)
Expand Down
41 changes: 41 additions & 0 deletions src/RabbitMQ.Next.Publisher/ConfirmMessageMiddleware.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Next.Channels;
using RabbitMQ.Next.Transport.Methods.Basic;
using RabbitMQ.Next.Transport.Methods.Confirm;

namespace RabbitMQ.Next.Publisher;

internal class ConfirmMessageMiddleware : IPublishMiddleware
{
private readonly IPublishMiddleware next;
private ConfirmMessageHandler confirms;

public ConfirmMessageMiddleware(IPublishMiddleware next)
{
this.next = next;
}

public ValueTask InitAsync(IChannel channel, CancellationToken cancellation)
{
this.confirms = new ConfirmMessageHandler();
channel.WithMessageHandler<AckMethod>(this.confirms);
channel.WithMessageHandler<NackMethod>(this.confirms);

return new ValueTask(channel.SendAsync<SelectMethod, SelectOkMethod>(new SelectMethod(), cancellation));
}

public async ValueTask<ulong> InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation)
{
var deliveryTag = await this.next.InvokeAsync(content, message, cancellation);

var confirmed = await this.confirms.WaitForConfirmAsync(deliveryTag, cancellation);
if (!confirmed)
{
// todo: provide some useful info here
throw new DeliveryFailedException();
}

return deliveryTag;
}
}
38 changes: 2 additions & 36 deletions src/RabbitMQ.Next.Publisher/ConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ namespace RabbitMQ.Next.Publisher;

public static class ConnectionExtensions
{
private static readonly Func<IPublishMiddleware, IPublishMiddleware> DefaultPublishPipelineFactory = (p) => p;
private static readonly Func<IReturnMiddleware, IReturnMiddleware> DefaultReturnPipelineFactory = (p) => null;

public static IPublisher Publisher(this IConnection connection, string exchange, Action<IPublisherBuilder> builder = null)
{
if (string.IsNullOrWhiteSpace(exchange))
Expand All @@ -27,44 +24,13 @@ public static IPublisher Publisher(this IConnection connection, string exchange,
new MessageBuilderPoolPolicy(),
10);

var publishPipelineFactory = DefaultPublishPipelineFactory;
if (publisherBuilder.PublishMiddlewares.Count > 0)
{
publishPipelineFactory = (p) =>
{
var last = p;
for (var i = 0; i < publisherBuilder.PublishMiddlewares.Count; i++)
{
last = publisherBuilder.PublishMiddlewares[i]?.Invoke(last);
}
return last;
};
}

var returnPipelineFactory = DefaultReturnPipelineFactory;
if (publisherBuilder.ReturnMiddlewares.Count > 0)
{
returnPipelineFactory = (p) =>
{
var last = p;
for (var i = 0; i < publisherBuilder.ReturnMiddlewares.Count; i++)
{
last = publisherBuilder.ReturnMiddlewares[i]?.Invoke(last);
}
return last;
};
}

var publisher = new Publisher(
connection,
messagePropsPool,
publisherBuilder.Serializer,
exchange,
publisherBuilder.PublisherConfirms,
publishPipelineFactory,
returnPipelineFactory);
publisherBuilder.PublishMiddlewares,
publisherBuilder.ReturnMiddlewares);

return publisher;
}
Expand Down
36 changes: 13 additions & 23 deletions src/RabbitMQ.Next.Publisher/InternalMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,37 @@
using RabbitMQ.Next.Channels;
using RabbitMQ.Next.Messaging;
using RabbitMQ.Next.Serialization;
using RabbitMQ.Next.Tasks;

namespace RabbitMQ.Next.Publisher;

internal sealed class InternalMessagePublisher : IPublishMiddleware
{
private readonly IChannel channel;
private readonly ISerializer serializer;
private readonly ConfirmMessageHandler confirms;
private readonly string exchange;

private long lastDeliveryTag;

public InternalMessagePublisher(IChannel channel, string exchange, ISerializer serializer, ConfirmMessageHandler confirms)
private IChannel publisherChannel;

public InternalMessagePublisher(string exchange, ISerializer serializer)
{
this.channel = channel;
this.exchange = exchange;
this.serializer = serializer;
this.confirms = confirms;
}

public async ValueTask InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation = default)
public ValueTask InitAsync(IChannel channel, CancellationToken cancellation)
{
this.publisherChannel = channel;
return default;
}

public ValueTask<ulong> InvokeAsync<TContent>(TContent content, IMessageBuilder message, CancellationToken cancellation = default)
{
var flags = this.ComposePublishFlags(message);

await this.channel.PublishAsync(
return this.publisherChannel.PublishAsync(
(content, this.serializer, message),
this.exchange, message.RoutingKey, message,
(st, buffer) => st.serializer.Serialize(st.message, st.content, buffer),
flags, cancellation);

var deliveryTag = Interlocked.Increment(ref this.lastDeliveryTag);

if (this.confirms != null)
{
var confirmed = await this.confirms.WaitForConfirmAsync((ulong)deliveryTag);
if (!confirmed)
{
// todo: provide some useful info here
throw new DeliveryFailedException();
}
}
flags, cancellation).AsValueTask();
}

private PublishFlags ComposePublishFlags(IMessageBuilder message)
Expand Down
2 changes: 0 additions & 2 deletions src/RabbitMQ.Next.Publisher/MessageBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public IMessageBuilder SetContentEncoding(string value)
return this;
}

public IMessageBuilder SetSetHeader(string key, object value) => throw new NotImplementedException();

public IMessageBuilder SetHeader(string key, object value)
{
if (string.IsNullOrWhiteSpace(key))
Expand Down
Loading

0 comments on commit e7955dd

Please sign in to comment.