Skip to content

Commit

Permalink
Headers parsing (#63)
Browse files Browse the repository at this point in the history
Implementations of HPUB and HMSG messages.

Also includes minor code clean-up and some test improvements to reduce false positives.

* WIP: Headers parsing

* Headers tests

* Handle empty headers

* Update licence information

Co-authored-by: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com>

* Update licence information

Co-authored-by: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com>

* Moved HeaderParser to connection

Also, test resilience improvements.

* Fixed cluster reconnect tests

This was caused by recent nats-server fix.

* Bumped up the test timeout to 10 minutes

GitHub CI seems to have slowed down!?
On my local ubuntu vm runs under a minute.

* WIP: Investigating GitHub test runner

* WIP: Investigating GitHub test runner

* Fixing tests

Separated server version read.

---------

Co-authored-by: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com>
  • Loading branch information
mtmk and caleblloyd committed Jun 21, 2023
1 parent e5cd29e commit 7d0ecad
Show file tree
Hide file tree
Showing 39 changed files with 1,600 additions and 688 deletions.
14 changes: 14 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core.PublishModel",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Core.MemoryTests", "tests\NATS.Client.Core.MemoryTests\NATS.Client.Core.MemoryTests.csproj", "{B26DE6AC-A4D5-4427-8453-EE3514E4B513}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core.PublishHeaders", "sandbox\Example.Core.PublishHeaders\Example.Core.PublishHeaders.csproj", "{B0C82F24-BDEC-4420-A02A-F74E2423D755}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core.SubscribeHeaders", "sandbox\Example.Core.SubscribeHeaders\Example.Core.SubscribeHeaders.csproj", "{A96660DB-DAEB-4C57-8096-F236AC4FA927}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -115,6 +119,14 @@ Global
{B26DE6AC-A4D5-4427-8453-EE3514E4B513}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B26DE6AC-A4D5-4427-8453-EE3514E4B513}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B26DE6AC-A4D5-4427-8453-EE3514E4B513}.Release|Any CPU.Build.0 = Release|Any CPU
{B0C82F24-BDEC-4420-A02A-F74E2423D755}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B0C82F24-BDEC-4420-A02A-F74E2423D755}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B0C82F24-BDEC-4420-A02A-F74E2423D755}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B0C82F24-BDEC-4420-A02A-F74E2423D755}.Release|Any CPU.Build.0 = Release|Any CPU
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -136,6 +148,8 @@ Global
{C85BA135-3C21-4027-BE5A-849E1011DD0A} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{29F96D05-D02F-4610-A8FB-3527BF83C4A5} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{B26DE6AC-A4D5-4427-8453-EE3514E4B513} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{B0C82F24-BDEC-4420-A02A-F74E2423D755} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{A96660DB-DAEB-4C57-8096-F236AC4FA927} = {95A69671-16CA-4133-981C-CC381B7AAA30}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
6 changes: 6 additions & 0 deletions NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ASCII/@EntryIndexedValue">ASCII</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CR/@EntryIndexedValue">CR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=LF/@EntryIndexedValue">LF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.Core\NATS.Client.Core.csproj" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions sandbox/Example.Core.PublishHeaders/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// > nats sub bar.*
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

var subject = "bar.xyz";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

await using var connection = new NatsConnection(options);

for (int i = 0; i < 10; i++)
{
Print($"[PUB] Publishing to subject ({i}) '{subject}'...\n");
await connection.PublishAsync<Bar>(
subject,
new Bar { Id = i, Name = "Baz" },
new NatsPubOpts { Headers = new NatsHeaders { ["XFoo"] = $"bar{i}" } });
}

void Print(string message)
{
Console.Write($"{DateTime.Now:HH:mm:ss} {message}");
}

public record Bar
{
public int Id { get; set; }

public string? Name { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.Core\NATS.Client.Core.csproj" />
</ItemGroup>

</Project>
41 changes: 41 additions & 0 deletions sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// > nats pub bar.xyz --count=10 "my_message_{{ Count }}" -H X-Foo:Baz

using System.Text;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

var subject = "bar.*";
var options = NatsOptions.Default with { LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Error) };

Print("[CON] Connecting...\n");

await using var connection = new NatsConnection(options);

Print($"[SUB] Subscribing to subject '{subject}'...\n");

NatsSub sub = await connection.SubscribeAsync(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
{
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data.Span)}\n");
if (msg.Headers != null)
{
foreach (var (key, values) in msg.Headers)
{
foreach (var value in values)
Print($" {key}: {value}\n");
}
}
}

void Print(string message)
{
Console.Write($"{DateTime.Now:HH:mm:ss} {message}");
}

public record Bar
{
public int Id { get; set; }

public string? Name { get; set; }
}
6 changes: 6 additions & 0 deletions src/NATS.Client.Core/Commands/CommandConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ internal static class CommandConstants
// string.Join(",", Encoding.ASCII.GetBytes("PUB "))
public static ReadOnlySpan<byte> PubWithPadding => new byte[] { 80, 85, 66, 32 };

// string.Join(",", Encoding.ASCII.GetBytes("HPUB "))
public static ReadOnlySpan<byte> HPubWithPadding => new byte[] { 72, 80, 85, 66, 32 };

// string.Join(",", Encoding.ASCII.GetBytes("SUB "))
public static ReadOnlySpan<byte> SubWithPadding => new byte[] { 83, 85, 66, 32 };

Expand All @@ -24,4 +27,7 @@ internal static class CommandConstants

// string.Join(",", Encoding.ASCII.GetBytes("PONG\r\n"))
public static ReadOnlySpan<byte> PongNewLine => new byte[] { 80, 79, 78, 71, 13, 10 };

// string.Join(",", Encoding.ASCII.GetBytes("NATS/1.0\r\n"))
public static ReadOnlySpan<byte> NatsHeaders10NewLine => new byte[] { 78, 65, 84, 83, 47, 49, 46, 48, 13, 10 };
}
170 changes: 39 additions & 131 deletions src/NATS.Client.Core/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Buffers.Text;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using NATS.Client.Core.Internal;
Expand All @@ -12,6 +13,9 @@ internal sealed class ProtocolWriter
private const int NewLineLength = 2; // \r\n

private readonly FixedArrayBufferWriter _writer; // where T : IBufferWriter<byte>
private readonly FixedArrayBufferWriter _bufferHeaders = new();
private readonly FixedArrayBufferWriter _bufferPayload = new();
private readonly HeaderWriter _headerWriter = new(Encoding.UTF8);

public ProtocolWriter(FixedArrayBufferWriter writer)
{
Expand Down Expand Up @@ -46,159 +50,63 @@ public void WritePong()
}

// https://docs.nats.io/reference/reference-protocols/nats-protocol#pub
// PUB <subject> [reply-to] <#bytes>\r\n[payload]
// To omit the payload, set the payload size to 0, but the second CRLF is still required.
public void WritePublish(string subject, string? replyTo, ReadOnlySequence<byte> payload)
// PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n
public void WritePublish(string subject, string? replyTo, NatsHeaders? headers, ReadOnlySequence<byte> payload)
{
var offset = 0;
var maxLength = CommandConstants.PubWithPadding.Length
+ subject.Length + 1 // with space padding
+ (replyTo == null ? 0 : replyTo.Length + 1)
+ MaxIntStringLength
+ NewLineLength
+ (int)payload.Length
+ NewLineLength;

var writableSpan = _writer.GetSpan(maxLength);

CommandConstants.PubWithPadding.CopyTo(writableSpan);
offset += CommandConstants.PubWithPadding.Length;
// We use a separate buffer to write the headers so that we can calculate the
// size before we write to the output buffer '_writer'.
if (headers != null)
{
_bufferHeaders.Reset();
_headerWriter.Write(_bufferHeaders, headers);
}

subject.WriteASCIIBytes(writableSpan.Slice(offset));
offset += subject.Length;
writableSpan.Slice(offset)[0] = (byte)' ';
offset += 1;
// Start writing the message to buffer:
// PUP / HPUB
_writer.WriteSpan(headers == null ? CommandConstants.PubWithPadding : CommandConstants.HPubWithPadding);
_writer.WriteASCIIAndSpace(subject);

if (replyTo != null)
{
replyTo.WriteASCIIBytes(writableSpan.Slice(offset));
offset += replyTo.Length;
writableSpan.Slice(offset)[0] = (byte)' ';
offset += 1;
_writer.WriteASCIIAndSpace(replyTo);
}

if (!Utf8Formatter.TryFormat(payload.Length, writableSpan.Slice(offset), out var written))
if (headers == null)
{
throw new NatsException("Can not format integer.");
_writer.WriteNumber(payload.Length);
}

offset += written;

CommandConstants.NewLine.CopyTo(writableSpan.Slice(offset));
offset += CommandConstants.NewLine.Length;

if (payload.Length != 0)
else
{
payload.CopyTo(writableSpan.Slice(offset));
offset += (int)payload.Length;
var headersLength = _bufferHeaders.WrittenSpan.Length;
_writer.WriteNumber(CommandConstants.NatsHeaders10NewLine.Length + headersLength);
_writer.WriteSpace();
var total = CommandConstants.NatsHeaders10NewLine.Length + headersLength + payload.Length;
_writer.WriteNumber(total);
}

CommandConstants.NewLine.CopyTo(writableSpan.Slice(offset));
offset += CommandConstants.NewLine.Length;

_writer.Advance(offset);
}

public void WritePublish<T>(string subject, string? replyTo, T? value, INatsSerializer serializer)
{
var offset = 0;
var maxLengthWithoutPayload = CommandConstants.PubWithPadding.Length
+ subject.Length + 1
+ (replyTo == null ? 0 : replyTo.Length + 1)
+ MaxIntStringLength
+ NewLineLength;

var writableSpan = _writer.GetSpan(maxLengthWithoutPayload);

CommandConstants.PubWithPadding.CopyTo(writableSpan);
offset += CommandConstants.PubWithPadding.Length;

subject.WriteASCIIBytes(writableSpan.Slice(offset));
offset += subject.Length;
writableSpan.Slice(offset)[0] = (byte)' ';
offset += 1;
// End of message first line
_writer.WriteNewLine();

if (replyTo != null)
if (headers != null)
{
replyTo.WriteASCIIBytes(writableSpan.Slice(offset));
offset += replyTo.Length;
writableSpan.Slice(offset)[0] = (byte)' ';
offset += 1;
_writer.WriteSpan(CommandConstants.NatsHeaders10NewLine);
_writer.WriteSpan(_bufferHeaders.WrittenSpan);
}

// Advance for written.
_writer.Advance(offset);

// preallocate range for write #bytes(write after serialized)
var preallocatedRange = _writer.PreAllocate(MaxIntStringLength);
offset += MaxIntStringLength;

CommandConstants.NewLine.CopyTo(writableSpan.Slice(offset));
_writer.Advance(CommandConstants.NewLine.Length);

var payloadLength = serializer.Serialize(_writer, value);
var payloadLengthSpan = _writer.GetSpanInPreAllocated(preallocatedRange);
payloadLengthSpan.Fill((byte)' ');
if (!Utf8Formatter.TryFormat(payloadLength, payloadLengthSpan, out var written))
if (payload.Length != 0)
{
throw new NatsException("Can not format integer.");
_writer.WriteSequence(payload);
}

WriteConstant(CommandConstants.NewLine);
_writer.WriteNewLine();
}

public void WritePublish<T>(string subject, ReadOnlyMemory<byte> inboxPrefix, int id, T? value, INatsSerializer serializer)
public void WritePublish<T>(string subject, string? replyTo, NatsHeaders? headers, T? value, INatsSerializer serializer)
{
Span<byte> idBytes = stackalloc byte[10];
if (Utf8Formatter.TryFormat(id, idBytes, out var written))
{
idBytes = idBytes.Slice(0, written);
}

var offset = 0;
var maxLengthWithoutPayload = CommandConstants.PubWithPadding.Length
+ subject.Length + 1
+ (inboxPrefix.Length + idBytes.Length + 1) // with space
+ MaxIntStringLength
+ NewLineLength;

var writableSpan = _writer.GetSpan(maxLengthWithoutPayload);

CommandConstants.PubWithPadding.CopyTo(writableSpan);
offset += CommandConstants.PubWithPadding.Length;

subject.WriteASCIIBytes(writableSpan.Slice(offset));
offset += subject.Length;
writableSpan.Slice(offset)[0] = (byte)' ';
offset += 1;

// build reply-to
inboxPrefix.Span.CopyTo(writableSpan.Slice(offset));
offset += inboxPrefix.Length;
idBytes.CopyTo(writableSpan.Slice(offset));
offset += idBytes.Length;
writableSpan.Slice(offset)[0] = (byte)' ';
offset += 1;

// Advance for written.
_writer.Advance(offset);

// preallocate range for write #bytes(write after serialized)
var preallocatedRange = _writer.PreAllocate(MaxIntStringLength);
offset += MaxIntStringLength;

CommandConstants.NewLine.CopyTo(writableSpan.Slice(offset));
_writer.Advance(CommandConstants.NewLine.Length);

var payloadLength = serializer.Serialize(_writer, value);
var payloadLengthSpan = _writer.GetSpanInPreAllocated(preallocatedRange);
payloadLengthSpan.Fill((byte)' ');
if (!Utf8Formatter.TryFormat(payloadLength, payloadLengthSpan, out written))
{
throw new NatsException("Can not format integer.");
}

WriteConstant(CommandConstants.NewLine);
_bufferPayload.Reset();
serializer.Serialize(_bufferPayload, value);
var payload = new ReadOnlySequence<byte>(_bufferPayload.WrittenMemory);
WritePublish(subject, replyTo, headers, payload);
}

// https://docs.nats.io/reference/reference-protocols/nats-protocol#sub
Expand Down
Loading

0 comments on commit 7d0ecad

Please sign in to comment.