Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix issue 868 #878

Merged
merged 5 commits into from Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 24 additions & 5 deletions projects/Apigen/apigen/Apigen.cs
Expand Up @@ -855,9 +855,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
EmitLine("");
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
EmitLine(" {");
var lastWasBitClass = false;
foreach (AmqpField f in m.m_Fields)
{
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
string mangleClass = MangleClass(ResolveDomain(f.Domain));
if (mangleClass != "Bit")
{
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
lastWasBitClass = false;
}
}
else
{
lastWasBitClass = true;
}

EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
}
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
}
EmitLine(" }");
EmitLine("");
Expand Down Expand Up @@ -933,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
EmitLine(" {");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
EmitLine(" result.ReadArgumentsFrom(ref reader);");
EmitLine(" return result;");
EmitLine(" }");
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Expand Up @@ -26,6 +26,7 @@
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>..\..\packages</PackageOutputPath>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">
Expand Down
70 changes: 40 additions & 30 deletions projects/RabbitMQ.Client/client/impl/Command.cs
Expand Up @@ -40,9 +40,7 @@

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
Expand All @@ -57,11 +55,6 @@ class Command : IDisposable
private const int EmptyFrameSize = 8;
private readonly bool _returnBufferOnDispose;

static Command()
{
CheckEmptyFrameSize();
}

internal Command(MethodBase method) : this(method, null, null, false)
{
}
Expand All @@ -80,38 +73,55 @@ public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte>

internal MethodBase Method { get; private set; }

public static void CheckEmptyFrameSize()
internal void Transmit(ushort channelNumber, Connection connection)
{
var f = new EmptyOutboundFrame();
byte[] b = new byte[f.GetMinimumBufferSize()];
f.WriteTo(b);
long actualLength = f.ByteCount;
int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize);
var size = GetMaxSize(maxBodyPayloadBytes);
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(size), 0, size);
bollhals marked this conversation as resolved.
Show resolved Hide resolved
var span = memory.Span;

var offset = Framing.Method.WriteTo(span, channelNumber, Method);
if (Method.HasContent)
{
int remainingBodyBytes = Body.Length;
offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, Header, remainingBodyBytes);
var bodySpan = Body.Span;
while (remainingBodyBytes > 0)
{
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
offset += Framing.BodySegment.WriteTo(span.Slice(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
remainingBodyBytes -= frameSize;
}
}

if (offset != size)
{
throw new InvalidOperationException($"Serialized to wrong size, expect {size}, offset {offset}");
}

if (EmptyFrameSize != actualLength)
connection.Write(memory);
}

private int GetMaxSize(int maxPayloadBytes)
{
if (!Method.HasContent)
{
string message =
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
EmptyFrameSize,
actualLength);
throw new ProtocolViolationException(message);
return Framing.Method.FrameSize + Method.GetRequiredBufferSize();
}

return Framing.Method.FrameSize + Method.GetRequiredBufferSize() +
Framing.Header.FrameSize + Header.GetRequiredPayloadBufferSize() +
Framing.BodySegment.FrameSize * GetBodyFrameCount(maxPayloadBytes) + Body.Length;
}

internal void Transmit(int channelNumber, Connection connection)
private int GetBodyFrameCount(int maxPayloadBytes)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
if (Method.HasContent)
if (maxPayloadBytes == int.MaxValue)
{
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
{
int remaining = Body.Length - offset;
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
}
return 1;
}

return (Body.Length + maxPayloadBytes - 1) / maxPayloadBytes;
}

public void Dispose()
Expand Down
9 changes: 6 additions & 3 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Expand Up @@ -81,16 +81,19 @@ public Command HandleFrame(in InboundFrame f)
{
throw new UnexpectedFrameException(f.Type);
}
m_method = m_protocol.DecodeMethodFrom(f.Payload);
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
return CompletedCommand();
case AssemblyState.ExpectingContentHeader:
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f.Type);
}
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));

ReadOnlySpan<byte> span = f.Payload.Span;
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
m_header.ReadFrom(span.Slice(12));
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(f.Type);
Expand Down
10 changes: 6 additions & 4 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Expand Up @@ -39,6 +39,7 @@
//---------------------------------------------------------------------------

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Net;
Expand All @@ -62,7 +63,6 @@ internal sealed class Connection : IConnection
private readonly object _eventLock = new object();

///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();

private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);

Expand Down Expand Up @@ -902,7 +902,9 @@ public void HeartbeatWriteTimerCallback(object state)
{
if (!_closed)
{
WriteFrame(_heartbeatFrame);
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(Client.Impl.Framing.Heartbeat.FrameSize), 0, Client.Impl.Framing.Heartbeat.FrameSize);
bollhals marked this conversation as resolved.
Show resolved Hide resolved
Client.Impl.Framing.Heartbeat.Payload.CopyTo(memory.Span);
Write(memory);
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
}
}
Expand Down Expand Up @@ -939,9 +941,9 @@ public override string ToString()
return string.Format("Connection({0},{1})", _id, Endpoint);
}

public void WriteFrame(OutboundFrame f)
public void Write(Memory<byte> memory)
{
_frameHandler.WriteFrame(f);
_frameHandler.Write(memory);
}

public void UpdateSecret(string newSecret, string reason)
Expand Down
26 changes: 2 additions & 24 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs
Expand Up @@ -41,8 +41,6 @@
using System;
using System.Text;

using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
{
abstract class ContentHeaderBase : IContentHeader
Expand All @@ -67,35 +65,15 @@ public virtual object Clone()
///<summary>
/// Fill this instance from the given byte buffer stream.
///</summary>
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
internal void ReadFrom(ReadOnlySpan<byte> span)
{
// Skipping the first two bytes since they arent used (weight - not currently used)
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span);
ReadPropertiesFrom(ref reader);
return bodySize;
}

internal abstract void ReadPropertiesFrom(ref ContentHeaderPropertyReader reader);
internal abstract void WritePropertiesTo(ref ContentHeaderPropertyWriter writer);

private const ushort ZERO = 0;

internal int WriteTo(Memory<byte> memory, ulong bodySize)
{
NetworkOrderSerializer.WriteUInt16(memory.Span, ZERO); // Weight - not used
NetworkOrderSerializer.WriteUInt64(memory.Slice(2).Span, bodySize);

ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
WritePropertiesTo(ref writer);
return 10 + writer.Offset;
}
public int GetRequiredBufferSize()
{
// The first 10 bytes are the Weight (2 bytes) + body size (8 bytes)
return 10 + GetRequiredPayloadBufferSize();
}

public abstract int GetRequiredPayloadBufferSize();
}
}