Skip to content

Commit

Permalink
Channel frame handling optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Sep 2, 2023
1 parent 8161e71 commit f153691
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 146 deletions.
5 changes: 5 additions & 0 deletions src/RabbitMQ.Next.Abstractions/Methods/IHasContentMethod.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace RabbitMQ.Next.Methods;

public interface IHasContentMethod
{
}
100 changes: 22 additions & 78 deletions src/RabbitMQ.Next/Channels/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal sealed class Channel : IChannelInternal
private readonly ObjectPool<LazyMessageProperties> messagePropertiesPool;
private readonly ObjectPool<MessageBuilder> messageBuilderPool;
private readonly TaskCompletionSource<bool> channelCompletion;
private readonly Dictionary<uint, IMessageHandlerInternal> methodHandlers = new();
private readonly Dictionary<uint, IFrameHandler> methodHandlers = new();

private ulong lastDeliveryTag;

Expand Down Expand Up @@ -57,9 +57,14 @@ public Channel(ChannelWriter<MemoryBlock> socketWriter, ObjectPool<MemoryBlock>
public IDisposable WithMessageHandler<TMethod>(IMessageHandler<TMethod> handler)
where TMethod : struct, IIncomingMethod
{
var parser = MethodRegistry.GetParser<TMethod>();
IFrameHandler frameHandler =
typeof(IHasContentMethod).IsAssignableFrom(typeof(TMethod))
? new MethodWithContentFrameHandler<TMethod>(handler, parser, this.memoryPool, this.messagePropertiesPool)
: new MethodFrameHandler<TMethod>(handler, parser, this.memoryPool);

var methodId = (uint)MethodRegistry.GetMethodId<TMethod>();
var wrapped = new MessageHandlerWrapper<TMethod>(handler);
this.methodHandlers.Add(methodId, wrapped);
this.methodHandlers.Add(methodId, frameHandler);

return new Disposer(() => this.methodHandlers.Remove(methodId));
}
Expand Down Expand Up @@ -199,92 +204,31 @@ private void ValidateState()

// Incoming frame processing state
// TODO: consider removing to specialized class
private FrameType expectedFrameType = FrameType.Method;
private MethodId currentMethodId = MethodId.Unknown;
private MemoryBlock methodFrame;
private MemoryBlock contentHeader;
private long pendingContentSize;
private MemoryBlock contentBodyHead;
private MemoryBlock contentBodyTail;
private IFrameHandler currentFrameHandler;


public void PushFrame(FrameType type, MemoryBlock payload)
{
if (type != this.expectedFrameType)
{
throw new InvalidOperationException($"Expected frame type is {this.expectedFrameType} but got {type}");
}

this.expectedFrameType = type switch
{
FrameType.Method => this.ParseMethodFrame(payload),
FrameType.ContentHeader => this.ParseContentHeaderFrame(payload),
FrameType.ContentBody => this.ParseContentBodyFrame(payload),
_ => throw new ArgumentOutOfRangeException(nameof(type), type, $"Non supported frame type: {type}"),
};

if (this.expectedFrameType == FrameType.None)
if (this.currentFrameHandler == null)
{
// do not wait for frames anymore, can execute the method
if (this.methodHandlers.TryGetValue((uint)this.currentMethodId, out var handler))
if (type != FrameType.Method)
{
PayloadAccessor payloadAccessor = null;

if (this.contentHeader != null)
{
payloadAccessor = new PayloadAccessor(this.messagePropertiesPool, this.memoryPool, this.contentHeader, this.contentBodyHead);
}

handler.ProcessMessage(this.methodFrame, payloadAccessor);
throw new InvalidOperationException($"Unexpected {type} frame, when Method frame was expected");
}
else

((ReadOnlySpan<byte>)payload.Buffer).Read(out uint methodId);

if (!this.methodHandlers.TryGetValue(methodId, out this.currentFrameHandler))
{
// TODO: should throw on unhandled methods?
// TODO: throw channel exception here?
throw new InvalidOperationException();
}

this.memoryPool.Return(this.methodFrame);

// reset state
this.expectedFrameType = FrameType.Method;
this.currentMethodId = MethodId.Unknown;
this.methodFrame = null;
this.contentHeader = null;
this.pendingContentSize = 0;
this.contentBodyHead = null;
this.contentBodyTail = null;
}
}

private FrameType ParseMethodFrame(MemoryBlock payload)
{
((ReadOnlyMemory<byte>)payload).GetMethodId(out this.currentMethodId);
this.methodFrame = payload;

return (MethodRegistry.HasContent(this.currentMethodId)) ? FrameType.ContentHeader : FrameType.None;
}

private FrameType ParseContentHeaderFrame(MemoryBlock payload)
{
((ReadOnlySpan<byte>)payload)[4..] // skip 2 obsolete shorts
.Read(out ulong contentSize);

this.pendingContentSize = (long)contentSize;
this.contentHeader = payload;
return FrameType.ContentBody;
}

private FrameType ParseContentBodyFrame(MemoryBlock payload)
{
if (this.contentBodyHead == null)
{
this.contentBodyHead = payload;
this.contentBodyTail = payload;
}
else
var expectedFrame = this.currentFrameHandler.AcceptFrame(type, payload);
if (expectedFrame == FrameType.Method)
{
this.contentBodyTail = this.contentBodyTail.Append(payload);
this.currentFrameHandler = null;
}

this.pendingContentSize -= payload.Length;
return (this.pendingContentSize > 0) ? FrameType.ContentBody : FrameType.None;
}
}
12 changes: 12 additions & 0 deletions src/RabbitMQ.Next/Channels/IFrameHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using RabbitMQ.Next.Buffers;
using RabbitMQ.Next.Transport;

namespace RabbitMQ.Next.Channels;

internal interface IFrameHandler
{
void Release(Exception ex);

FrameType AcceptFrame(FrameType type, MemoryBlock payload);
}
11 changes: 0 additions & 11 deletions src/RabbitMQ.Next/Channels/IMessageHandlerInternal.cs

This file was deleted.

32 changes: 0 additions & 32 deletions src/RabbitMQ.Next/Channels/MessageHandlerWrapper.cs

This file was deleted.

45 changes: 45 additions & 0 deletions src/RabbitMQ.Next/Channels/MethodFrameHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Next.Buffers;
using RabbitMQ.Next.Methods;
using RabbitMQ.Next.Transport;
using RabbitMQ.Next.Transport.Methods;

namespace RabbitMQ.Next.Channels;

internal class MethodFrameHandler<TMethod> : IFrameHandler
where TMethod: struct, IIncomingMethod
{
private readonly IMessageHandler<TMethod> wrapped;
private readonly IMethodParser<TMethod> parser;
private readonly ObjectPool<MemoryBlock> memoryPool;

public MethodFrameHandler(IMessageHandler<TMethod> wrapped, IMethodParser<TMethod> parser, ObjectPool<MemoryBlock> memoryPool)
{
this.wrapped = wrapped;
this.parser = parser;
this.memoryPool = memoryPool;
}

public void Release(Exception ex = null)
{
this.wrapped.Release(ex);
}

public FrameType AcceptFrame(FrameType type, MemoryBlock payload)
{
if (type != FrameType.Method)
{
throw new InvalidOperationException($"Unexpected {type} frame, when Method frame was expected");
}

ReadOnlySpan<byte> data = payload;
data = data.Read(out uint _);

var methodArgs = this.parser.Parse(data);
this.memoryPool.Return(payload);
this.wrapped.Handle(methodArgs, null);

return FrameType.Method;
}
}
111 changes: 111 additions & 0 deletions src/RabbitMQ.Next/Channels/MethodWithContentFrameHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Next.Buffers;
using RabbitMQ.Next.Methods;
using RabbitMQ.Next.Transport;
using RabbitMQ.Next.Transport.Methods;

namespace RabbitMQ.Next.Channels;

internal class MethodWithContentFrameHandler<TMethod> : IFrameHandler
where TMethod: struct, IIncomingMethod
{
private readonly IMessageHandler<TMethod> wrapped;
private readonly IMethodParser<TMethod> parser;
private readonly ObjectPool<MemoryBlock> memoryPool;
private readonly ObjectPool<LazyMessageProperties> messagePropertiesPool;

private FrameType expectedFrameType = FrameType.Method;
private MemoryBlock methodFrame;
private MemoryBlock contentHeader;
private long pendingContentSize;
private MemoryBlock contentBodyHead;
private MemoryBlock contentBodyTail;

public MethodWithContentFrameHandler(IMessageHandler<TMethod> wrapped, IMethodParser<TMethod> parser, ObjectPool<MemoryBlock> memoryPool, ObjectPool<LazyMessageProperties> messagePropertiesPool)
{
this.wrapped = wrapped;
this.parser = parser;
this.memoryPool = memoryPool;
this.messagePropertiesPool = messagePropertiesPool;
}

public void Release(Exception ex = null)
{
this.wrapped.Release(ex);
}

public FrameType AcceptFrame(FrameType type, MemoryBlock payload)
{
if (type != this.expectedFrameType)
{
throw new InvalidOperationException($"Expected frame type is {this.expectedFrameType} but got {type}");
}

this.expectedFrameType = type switch
{
FrameType.Method => this.ParseMethodFrame(payload),
FrameType.ContentHeader => this.ParseContentHeaderFrame(payload),
FrameType.ContentBody => this.ParseContentBodyFrame(payload),
_ => throw new ArgumentOutOfRangeException(nameof(type), type, $"Non supported frame type: {type}"),
};

if (this.expectedFrameType == FrameType.None)
{
PayloadAccessor payloadAccessor = null;

if (this.contentHeader != null)
{
payloadAccessor = new PayloadAccessor(this.messagePropertiesPool, this.memoryPool, this.contentHeader, this.contentBodyHead);
}

var methodArgs = this.parser.Parse(((ReadOnlySpan<byte>)this.methodFrame).Read(out uint _));

this.wrapped.Handle(methodArgs, payloadAccessor);

this.memoryPool.Return(this.methodFrame);

// reset state
this.expectedFrameType = FrameType.Method;
this.methodFrame = null;
this.contentHeader = null;
this.pendingContentSize = 0;
this.contentBodyHead = null;
this.contentBodyTail = null;
}

return this.expectedFrameType;
}

private FrameType ParseMethodFrame(MemoryBlock payload)
{
this.methodFrame = payload;
return FrameType.ContentHeader;
}

private FrameType ParseContentHeaderFrame(MemoryBlock payload)
{
((ReadOnlySpan<byte>)payload)[4..] // skip 2 obsolete shorts
.Read(out ulong contentSize);

this.pendingContentSize = (long)contentSize;
this.contentHeader = payload;
return FrameType.ContentBody;
}

private FrameType ParseContentBodyFrame(MemoryBlock payload)
{
if (this.contentBodyHead == null)
{
this.contentBodyHead = payload;
this.contentBodyTail = payload;
}
else
{
this.contentBodyTail = this.contentBodyTail.Append(payload);
}

this.pendingContentSize -= payload.Length;
return (this.pendingContentSize > 0) ? FrameType.ContentBody : FrameType.None;
}
}
9 changes: 0 additions & 9 deletions src/RabbitMQ.Next/Transport/Framing.Method.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,4 @@ public static Span<byte> WriteMethodArgs<TMethod>(this Span<byte> buffer, TMetho
buffer = buffer.Write((uint)method.MethodId);
return formatter.Write(buffer, method);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ReadOnlyMemory<byte> GetMethodId(this ReadOnlyMemory<byte> data, out MethodId methodId)
{
data.Span.Read(out uint method);
methodId = (MethodId) method;

return data[sizeof(uint)..];
}
}
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next/Transport/Methods/Basic/DeliverMethod.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace RabbitMQ.Next.Transport.Methods.Basic;

public readonly struct DeliverMethod : IIncomingMethod
public readonly struct DeliverMethod : IIncomingMethod, IHasContentMethod
{
public DeliverMethod(string exchange, string routingKey, string consumerTag, ulong deliveryTag, bool redelivered)
{
Expand Down
2 changes: 1 addition & 1 deletion src/RabbitMQ.Next/Transport/Methods/Basic/GetOkMethod.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace RabbitMQ.Next.Transport.Methods.Basic;

public readonly struct GetOkMethod : IIncomingMethod
public readonly struct GetOkMethod : IIncomingMethod, IHasContentMethod
{
public GetOkMethod(string exchange, string routingKey, ulong deliveryTag, bool redelivered, uint messageCount)
{
Expand Down
Loading

0 comments on commit f153691

Please sign in to comment.