Skip to content

Commit

Permalink
Do not create consumerTag string in HandleBasicDeliver when conumer e…
Browse files Browse the repository at this point in the history
…xists in the dictionary
  • Loading branch information
zavarkog committed Jun 24, 2022
1 parent a6ac5c4 commit 4b67b23
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 18 deletions.
5 changes: 3 additions & 2 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class ConsumerDispatcherBase
private protected IConsumerDispatcher _dispatcher;
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
protected readonly string _consumerTag = "ConsumerTag";
protected static readonly byte[] _consumerTagBytes = Encoding.UTF8.GetBytes("ConsumerTag");
protected readonly ulong _deliveryTag = 500UL;
protected static readonly byte[] _exchange = Encoding.UTF8.GetBytes("Exchange");
protected static readonly byte[] _routingKey = Encoding.UTF8.GetBytes("RoutingKey");
Expand Down Expand Up @@ -42,7 +43,7 @@ public void AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand All @@ -60,7 +61,7 @@ public void ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
_dispatcher.HandleBasicDeliver(_consumerTagBytes, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
Expand Down
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/client/framing/BasicDeliver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,22 @@
//---------------------------------------------------------------------------

using System;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client.Framing.Impl
{
internal readonly struct BasicDeliver : IAmqpMethod
{
public readonly string _consumerTag;
public readonly ReadOnlyMemory<byte> _consumerTag;
public readonly ulong _deliveryTag;
public readonly bool _redelivered;
public readonly ReadOnlyMemory<byte> _exchange;
public readonly ReadOnlyMemory<byte> _routingKey;

public BasicDeliver(ReadOnlyMemory<byte> data)
{
int offset = WireFormatting.ReadShortstr(data.Span, out _consumerTag);
int offset = WireFormatting.ReadShortMemory(data, out _consumerTag);
offset += WireFormatting.ReadLonglong(data.Span.Slice(offset), out _deliveryTag);
offset += WireFormatting.ReadBits(data.Span.Slice(offset), out _redelivered);
offset += WireFormatting.ReadShortMemory(data.Slice(offset), out _exchange);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,53 +1,88 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Util;

namespace RabbitMQ.Client.ConsumerDispatching
{
#nullable enable
internal abstract class ConsumerDispatcherBase
{
private static readonly FallbackConsumer fallbackConsumer = new FallbackConsumer();
private readonly Dictionary<string, IBasicConsumer> _consumers;
private readonly Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> _consumers;

public IBasicConsumer? DefaultConsumer { get; set; }

protected ConsumerDispatcherBase()
{
_consumers = new Dictionary<string, IBasicConsumer>();
_consumers = new Dictionary<ReadOnlyMemory<byte>, (IBasicConsumer, string)>(MemoryOfByteEqualityComparer.Instance);
}

protected void AddConsumer(IBasicConsumer consumer, string tag)
{
lock (_consumers)
{
_consumers[tag] = consumer;
var tagBytes = Encoding.UTF8.GetBytes(tag);

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

This is a new byte[] allocation, but hopefully AddConsumer is called more infrequently than the Dispatch method

_consumers[tagBytes] = (consumer, tag);
}
}

protected IBasicConsumer GetConsumerOrDefault(string tag)
protected (IBasicConsumer consumer, string consumerTag) GetConsumerOrDefault(ReadOnlyMemory<byte> tag)
{
lock (_consumers)
{
return _consumers.TryGetValue(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
if (_consumers.TryGetValue(tag, out var consumerPair))
{
return consumerPair;
}

#if NETCOREAPP

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

Create the string only when no consumer exists. Since earlier it was always created, this is not slower than earlier.
But how can that happen that there is no consumer for the specified consumer tag?

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

Why not if #NET?

This comment has been minimized.

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

But of course it would be good to use #if NETSTANDARD and #if NET6_0_OR_GREATER everywhere... nothing else... currently there are #if NETCOREAPP and #if NET452 too in several files

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

Yeah sorry don't want to derail this then. But really this should be aligned holistically. But definitely not the concern of this PR

var consumerTag = Encoding.UTF8.GetString(tag.Span);
#else
string consumerTag;
unsafe
{
fixed (byte* bytes = tag.Span)
{
consumerTag = Encoding.UTF8.GetString(bytes, tag.Length);
}
}
#endif

return (GetDefaultOrFallbackConsumer(), consumerTag);
}
}

public IBasicConsumer GetAndRemoveConsumer(string tag)
{
lock (_consumers)
{
return _consumers.Remove(tag, out var consumer) ? consumer : GetDefaultOrFallbackConsumer();
var utf8 = Encoding.UTF8;
#if NETCOREAPP

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

I think you can get the pool also for netstandard by referencing the buffers package

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

True,

How about this?

        public IBasicConsumer GetAndRemoveConsumer(string tag)
        {
            lock (_consumers)
            {
                var utf8 = Encoding.UTF8;
                var pool = ArrayPool<byte>.Shared;
                var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length));
#if NETSTANDARD
                int count = utf8.GetBytes(tag, 0, tag.Length, buf, 0);
#else
                int count = utf8.GetBytes(tag, buf);
#endif
                var memory = buf.AsMemory(0, count);
                var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer();
                pool.Return(buf);
                return result;
            }
        }

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

Lgtm

var pool = ArrayPool<byte>.Shared;

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

How about stack allocation for smallish consumer tags?

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

_consumer.Remove needs a ReadOnlyMemory (since it is a key in the dictionary), can't pass a stack allocated ROS :(
It is not possible to lookup/remove item from dictionary by a span key.
Hopefully the framework guys will solve it somehow later... for example:
dotnet/runtime#27229

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

Nice. Wasn't aware of that! Every day something new 😅

var buf = pool.Rent(utf8.GetMaxByteCount(tag.Length));
int count = utf8.GetBytes(tag, buf);
var memory = buf.AsMemory(0, count);
#else
var memory = utf8.GetBytes(tag).AsMemory();

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

Netstandard version allocates a byte[]. This can be removed if needed, but as I saw, the netstandard version is doing similar thing, for example in SocketFrameHandler it allocates an array while net6 using stackalloc.

#endif
var result = _consumers.Remove(memory, out var consumerPair) ? consumerPair.consumer : GetDefaultOrFallbackConsumer();
#if NETCOREAPP
pool.Return(buf);
#endif
return result;
}
}

public Task ShutdownAsync(ShutdownEventArgs reason)
{
lock (_consumers)
{
foreach (KeyValuePair<string, IBasicConsumer> pair in _consumers)
foreach (KeyValuePair<ReadOnlyMemory<byte>, (IBasicConsumer consumer, string consumerTag)> pair in _consumers)
{
ShutdownConsumer(pair.Value, reason);
ShutdownConsumer(pair.Value.consumer, reason);
}
_consumers.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ public void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag)
}
}

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
public void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag, ulong deliveryTag, bool redelivered,
ReadOnlyMemory<byte> exchange, ReadOnlyMemory<byte> routingKey, in ReadOnlyBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
{
if (!IsShutdown)
{
_writer.TryWrite(new WorkStruct(GetConsumerOrDefault(consumerTag), consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
var consumerPair = GetConsumerOrDefault(consumerTag);
_writer.TryWrite(new WorkStruct(consumerPair.consumer, consumerPair.consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal interface IConsumerDispatcher

void HandleBasicConsumeOk(IBasicConsumer consumer, string consumerTag);

void HandleBasicDeliver(string consumerTag,
void HandleBasicDeliver(ReadOnlyMemory<byte> consumerTag,
ulong deliveryTag,
bool redelivered,
ReadOnlyMemory<byte> exchange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
using System.Collections;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

using RabbitMQ.Client.Exceptions;

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

This change is a mistake. I'll revert it.
However the empty lines between the usings are inconsistent in this library. Sometimes you have empty line after the System usings, sometimes not.

using RabbitMQ.Util;

Expand Down
34 changes: 34 additions & 0 deletions projects/RabbitMQ.Client/util/MemoryOfByteEqualityComparer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;

namespace RabbitMQ.Util;

public sealed class MemoryOfByteEqualityComparer : IEqualityComparer<ReadOnlyMemory<byte>>
{
public static MemoryOfByteEqualityComparer Instance { get; } = new MemoryOfByteEqualityComparer();

public bool Equals(ReadOnlyMemory<byte> left, ReadOnlyMemory<byte> right)
{
return left.Span.SequenceEqual(right.Span);
}

public int GetHashCode(ReadOnlyMemory<byte> value)
{
#if NETSTANDARD
unchecked
{
int hashCode = 0;
foreach (byte item in value.Span)

This comment has been minimized.

Copy link
@zgabi

zgabi Jun 24, 2022

Owner

This one should be fixed. Calculating the hash per bytes are very inefficient. If this commit is basically OK for you I'll make (/find) a better hash for netstandard version.

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

If thought there are extension methods available for netstandard. Was it memory extensions or something? I have to check

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jun 24, 2022

Ah they are for netstandard 2.1 😔

{
hashCode = (hashCode * 397) ^ item.GetHashCode();
}

return hashCode;
}
#else
HashCode result = default;
result.AddBytes(value.Span);
return result.ToHashCode();
#endif
}
}

0 comments on commit 4b67b23

Please sign in to comment.