From bfba1e748083ce8ec9c6ecf99ba355c1afbfee1c Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 24 Mar 2024 00:05:58 +0000 Subject: [PATCH 1/8] Request reply benchmarks --- sandbox/MicroBenchmark/MicroBenchmark.csproj | 1 + sandbox/MicroBenchmark/RequestReplyBench.cs | 31 ++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 sandbox/MicroBenchmark/RequestReplyBench.cs diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index a46099bf1..859e55d7d 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -18,6 +18,7 @@ + diff --git a/sandbox/MicroBenchmark/RequestReplyBench.cs b/sandbox/MicroBenchmark/RequestReplyBench.cs new file mode 100644 index 000000000..9a88a38d9 --- /dev/null +++ b/sandbox/MicroBenchmark/RequestReplyBench.cs @@ -0,0 +1,31 @@ +using BenchmarkDotNet.Attributes; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; + +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. + +namespace MicroBenchmark; + +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class RequestReplyBench +{ + private readonly StreamConfig _streamConfig = new("EVENTS", new[] { "events.>" }); + + private NatsConnection _nats; + private NatsJSContext _js; + + [GlobalSetup] + public async Task SetupAsync() + { + _nats = new NatsConnection(); + await _nats.ConnectAsync(); + _js = new NatsJSContext(_nats); + } + + [Benchmark] + public async Task RequestAsync() => + await _js.CreateStreamAsync(_streamConfig); +} From 7c05535fa7db0477b3475d4c13efc0657cbda246 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Mon, 25 Mar 2024 20:24:48 +0000 Subject: [PATCH 2/8] wip --- sandbox/MicroBenchmark/MicroBenchmark.csproj | 2 +- sandbox/MicroBenchmark/RequestReplyBench.cs | 51 +++++++++- .../Internal/NatsReadProtocolProcessor.cs | 54 ++++++++--- .../Internal/SubscriptionManager.cs | 7 +- .../NatsConnection.RequestReply.cs | 96 +++++++++++++++++++ src/NATS.Client.Core/NatsConnection.cs | 12 ++- src/NATS.Client.JetStream/NatsJSContext.cs | 95 ++++++++++++++++++ 7 files changed, 298 insertions(+), 19 deletions(-) diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 859e55d7d..6ee79ebd5 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -13,7 +13,7 @@ - + diff --git a/sandbox/MicroBenchmark/RequestReplyBench.cs b/sandbox/MicroBenchmark/RequestReplyBench.cs index 9a88a38d9..3fa1b4749 100644 --- a/sandbox/MicroBenchmark/RequestReplyBench.cs +++ b/sandbox/MicroBenchmark/RequestReplyBench.cs @@ -1,4 +1,5 @@ using BenchmarkDotNet.Attributes; +using NATS.Client; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; @@ -12,10 +13,12 @@ namespace MicroBenchmark; [PlainExporter] public class RequestReplyBench { - private readonly StreamConfig _streamConfig = new("EVENTS", new[] { "events.>" }); + private readonly StreamConfig _streamConfig = new("benchreqrep", new[] { "benchreqrep.>" }); + private readonly byte[] _data = new byte[128]; private NatsConnection _nats; private NatsJSContext _js; + private IJetStream _jetStream; [GlobalSetup] public async Task SetupAsync() @@ -23,9 +26,51 @@ public async Task SetupAsync() _nats = new NatsConnection(); await _nats.ConnectAsync(); _js = new NatsJSContext(_nats); + + await Task.Delay(1_000); + try + { + await _js.DeleteStreamAsync("benchreqrep"); + } + catch + { + // ignored + } + + await Task.Delay(1_000); + await _js.CreateStreamAsync(_streamConfig); + + ConnectionFactory connectionFactory = new ConnectionFactory(); + Options opts = ConnectionFactory.GetDefaultOptions("localhost"); + var conn = connectionFactory.CreateConnection(opts); + _jetStream = conn.CreateJetStreamContext(); + } + + [GlobalCleanup] + public async Task CleanupAsync() + { + await Task.Delay(1_000); + try + { + await _js.DeleteStreamAsync("benchreqrep"); + } + catch + { + // ignored + } + + await Task.Delay(1_000); } [Benchmark] - public async Task RequestAsync() => - await _js.CreateStreamAsync(_streamConfig); + public async Task RequestAsync() => + await _js.PublishAsync("benchreqrep.x", _data); + + [Benchmark] + public async Task RequestAsync2() => + await _js.PublishAsync2("benchreqrep.x", _data); + + [Benchmark] + public async Task RequestAsyncV1() => + await _jetStream.PublishAsync(subject: "benchreqrep.x", data: _data); } diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index fd77311d6..711fd32a4 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -174,7 +174,7 @@ private async Task ReadLoopAsync() } var msgHeader = buffer.Slice(0, positionBeforePayload.Value); - var (subject, sid, payloadLength, replyTo) = ParseMessageHeader(msgHeader); + var (subject, sid, payloadLength, replyTo, responseId) = ParseMessageHeader(msgHeader); if (payloadLength == 0) { @@ -192,7 +192,7 @@ private async Task ReadLoopAsync() buffer = buffer.Slice(buffer.GetPosition(3, positionBeforePayload.Value)); } - await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, ReadOnlySequence.Empty).ConfigureAwait(false); + await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, ReadOnlySequence.Empty, responseId).ConfigureAwait(false); } else { @@ -213,7 +213,7 @@ private async Task ReadLoopAsync() buffer = buffer.Slice(buffer.GetPosition(2, payloadSlice.End)); // payload + \r\n - await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, payloadSlice).ConfigureAwait(false); + await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, payloadSlice, responseId).ConfigureAwait(false); } } else if (code == ServerOpCodes.HMsg) @@ -237,7 +237,7 @@ private async Task ReadLoopAsync() _logger.LogTrace(NatsLogEvents.Protocol, "HMSG trace dump: {MsgHeader}", msgHeader.Dump()); } - var (subject, sid, replyTo, headersLength, totalLength) = ParseHMessageHeader(msgHeader); + var (subject, sid, replyTo, headersLength, totalLength, responseId) = ParseHMessageHeader(msgHeader); if (_trace) { @@ -271,7 +271,7 @@ private async Task ReadLoopAsync() var headerSlice = totalSlice.Slice(0, headersLength); var payloadSlice = totalSlice.Slice(headersLength, payloadLength); - await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, headerSlice, payloadSlice) + await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, headerSlice, payloadSlice, responseId) .ConfigureAwait(false); } else @@ -427,7 +427,7 @@ private async ValueTask> DispatchCommandAsync(int code, R // https://docs.nats.io/reference/reference-protocols/nats-protocol#msg // MSG [reply-to] <#bytes>\r\n[payload] - private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(ReadOnlySpan msgHeader) + private (string subject, int sid, int payloadLength, string? replyTo, long? responseId) ParseMessageHeader(ReadOnlySpan msgHeader) { msgHeader = msgHeader.Slice(4); msgHeader.Split(out var subjectBytes, out msgHeader); @@ -436,11 +436,25 @@ private async ValueTask> DispatchCommandAsync(int code, R var subject = Encoding.ASCII.GetString(subjectBytes); + // Parse: _INBOX.NUID.ID + long? responseId = null; + if (subjectBytes.StartsWith(_connection.InboxPrefixBytes.Span)) + { + var lastIndex = subjectBytes.LastIndexOf((byte)'.'); + if (lastIndex != -1) + { + if (Utf8Parser.TryParse(subjectBytes.Slice(lastIndex + 1), out long id, out _)) + { + responseId = id; + } + } + } + if (msgHeader.Length == 0) { var sid = GetInt32(sidBytes); var size = GetInt32(replyToOrSizeBytes); - return (subject, sid, size, null); + return (subject, sid, size, null, responseId); } else { @@ -450,11 +464,11 @@ private async ValueTask> DispatchCommandAsync(int code, R var sid = GetInt32(sidBytes); var payloadLength = GetInt32(bytesSlice); var replyTo = Encoding.ASCII.GetString(replyToBytes); - return (subject, sid, payloadLength, replyTo); + return (subject, sid, payloadLength, replyTo, responseId); } } - private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(in ReadOnlySequence msgHeader) + private (string subject, int sid, int payloadLength, string? replyTo, long? responseId) ParseMessageHeader(in ReadOnlySequence msgHeader) { if (msgHeader.IsSingleSegment) { @@ -479,7 +493,7 @@ private async ValueTask> DispatchCommandAsync(int code, R // https://docs.nats.io/reference/reference-protocols/nats-protocol#hmsg // HMSG [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n - private (string subject, int sid, string? replyTo, int headersLength, int totalLength) ParseHMessageHeader(ReadOnlySpan msgHeader) + private (string subject, int sid, string? replyTo, int headersLength, int totalLength, long? responseId) ParseHMessageHeader(ReadOnlySpan msgHeader) { // 'HMSG' literal msgHeader.Split(out _, out msgHeader); @@ -489,6 +503,20 @@ private async ValueTask> DispatchCommandAsync(int code, R msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader); msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader); + // Parse: _INBOX.NUID.ID + long? responseId = null; + if (subjectBytes.StartsWith(_connection.InboxPrefixBytes.Span)) + { + var lastIndex = subjectBytes.LastIndexOf((byte)'.'); + if (lastIndex != -1) + { + if (Utf8Parser.TryParse(subjectBytes.Slice(lastIndex + 1), out long id, out _)) + { + responseId = id; + } + } + } + var subject = Encoding.ASCII.GetString(subjectBytes); var sid = GetInt32(sidBytes); @@ -497,7 +525,7 @@ private async ValueTask> DispatchCommandAsync(int code, R { var headersLength = GetInt32(replyToOrHeaderLenBytes); var totalLen = GetInt32(headerLenOrTotalLenBytes); - return (subject, sid, null, headersLength, totalLen); + return (subject, sid, null, headersLength, totalLen, responseId); } // There is more data because of the reply-to field @@ -511,11 +539,11 @@ private async ValueTask> DispatchCommandAsync(int code, R var lastBytes = msgHeader; var totalLen = GetInt32(lastBytes); - return (subject, sid, replyTo, headerLen, totalLen); + return (subject, sid, replyTo, headerLen, totalLen, responseId); } } - private (string subject, int sid, string? replyTo, int headersLength, int totalLength) ParseHMessageHeader(in ReadOnlySequence msgHeader) + private (string subject, int sid, string? replyTo, int headersLength, int totalLength, long? responseId) ParseHMessageHeader(in ReadOnlySequence msgHeader) { if (msgHeader.IsSingleSegment) { diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index e363d0068..982328557 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -198,7 +198,7 @@ public ISubscriptionManager GetManagerFor(string subject) return this; } - private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken) + public async Task EnsureMuxInboxSubscribedAsync(CancellationToken cancellationToken) { if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel) { @@ -227,6 +227,11 @@ await SubscribeInternalAsync( _inboxSubLock.Release(); } } + } + + private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken) + { + await EnsureMuxInboxSubscribedAsync(cancellationToken).ConfigureAwait(false); await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false); } diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 76a3d1b0c..3c15627dc 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -1,3 +1,4 @@ +using System.Buffers; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; @@ -74,6 +75,101 @@ public async ValueTask> RequestAsync( throw new NatsNoReplyException(); } + // + // + public async ValueTask> RequestAsync2( + string subject, + TRequest? data, + NatsHeaders? headers = default, + INatsSerialize? requestSerializer = default, + INatsDeserialize? replySerializer = default, + NatsPubOpts? requestOpts = default, + NatsSubOpts? replyOpts = default, + CancellationToken cancellationToken = default) + { + if (Telemetry.HasListeners()) + { + using var activity = Telemetry.StartSendActivity($"{SpanDestinationName(subject)} {Telemetry.Constants.RequestReplyActivityName}", this, subject, null); + try + { + replyOpts = SetReplyOptsDefaults(replyOpts); + await using var sub1 = await RequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) + .ConfigureAwait(false); + + if (await sub1.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + if (sub1.Msgs.TryRead(out var msg)) + { + return msg; + } + } + + throw new NatsNoReplyException(); + } + catch (Exception e) + { + Telemetry.SetException(activity, e); + throw; + } + } + + // RequestManager + await SubscriptionManager.EnsureMuxInboxSubscribedAsync(cancellationToken).ConfigureAwait(false); + var req = new ReqCmd(); + req.Tcs = new TaskCompletionSource>>(TaskCreationOptions.RunContinuationsAsynchronously); + var id = Interlocked.Increment(ref _requestId); + lock (_requests) + { + _requests.Add(id, req); + } + var replyTo = $"{InboxPrefix}.{id}"; + + await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); + + var msgBytes = await req.Tcs.Task.ConfigureAwait(false); + using var memoryOwner = msgBytes.Data; + var replyData = (replySerializer ?? NatsDefaultSerializer.Default) + .Deserialize(new ReadOnlySequence(memoryOwner.Memory)); + return new NatsMsg( + msgBytes.Subject, + msgBytes.ReplyTo, + msgBytes.Size, + msgBytes.Headers, + replyData, + msgBytes.Connection); + } + + internal void SetRequestReply(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, long id) + { + ReqCmd req; + lock (_requests) + { + if (!_requests.Remove(id, out req)) + { + return; + } + } + + var natsMsg = NatsMsg>.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + this, + HeaderParser, + NatsDefaultSerializer>.Default); + + req.Tcs.TrySetResult(natsMsg); + } + + long _requestId; + Dictionary _requests = new(); + + internal class ReqCmd + { + public TaskCompletionSource>> Tcs; + } + /// public async IAsyncEnumerable> RequestManyAsync( string subject, diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index ef7dad6b3..b65b2352d 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Diagnostics; +using System.Text; using System.Threading.Channels; using Microsoft.Extensions.Logging; using NATS.Client.Core.Commands; @@ -81,6 +82,7 @@ public NatsConnection(NatsOpts opts) Counter = new ConnectionStatsCounter(); CommandWriter = new CommandWriter(this, _pool, Opts, Counter, EnqueuePing); InboxPrefix = NewInbox(opts.InboxPrefix); + InboxPrefixBytes = Encoding.ASCII.GetBytes(InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); _logger = opts.LoggerFactory.CreateLogger(); _clientOpts = ClientOpts.Create(Opts); @@ -122,6 +124,8 @@ public NatsConnectionState ConnectionState public INatsServerInfo? ServerInfo => WritableServerInfo; // server info is set when received INFO + internal readonly ReadOnlyMemory InboxPrefixBytes; + internal NatsHeaderParser HeaderParser { get; } internal SubscriptionManager SubscriptionManager { get; } @@ -208,8 +212,14 @@ internal string SpanDestinationName(string subject) internal NatsStats GetStats() => Counter.ToStats(); - internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer) + internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, long? responseId) { + if (responseId is { } id) + { + SetRequestReply(subject, replyTo, sid, headersBuffer, payloadBuffer, id); + return default; + } + return SubscriptionManager.PublishToClientHandlersAsync(subject, replyTo, sid, headersBuffer, payloadBuffer); } diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index f16c71166..09ce9563b 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -173,6 +173,101 @@ public async ValueTask PublishAsync( throw new NatsJSPublishNoResponseException(); } + public async ValueTask PublishAsync2( + string subject, + T? data, + INatsSerialize? serializer = default, + NatsJSPubOpts? opts = default, + NatsHeaders? headers = default, + CancellationToken cancellationToken = default) + { + if (opts != null) + { + if (opts.MsgId != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Msg-Id"] = opts.MsgId; + } + + if (opts.ExpectedLastMsgId != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Msg-Id"] = opts.ExpectedLastMsgId; + } + + if (opts.ExpectedStream != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Stream"] = opts.ExpectedStream; + } + + if (opts.ExpectedLastSequence != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Sequence"] = opts.ExpectedLastSequence.ToString(); + } + + if (opts.ExpectedLastSubjectSequence != null) + { + headers ??= new NatsHeaders(); + headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString(); + } + } + + opts ??= NatsJSPubOpts.Default; + var retryMax = opts.RetryAttempts; + var retryWait = opts.RetryWaitBetweenAttempts; + + for (var i = 0; i < retryMax; i++) + { + try + { + var msg = await Connection.RequestAsync2( + subject: subject, + data: data, + headers: headers, + requestSerializer: serializer, + replySerializer: NatsJSJsonSerializer.Default, + requestOpts: opts, + replyOpts: new NatsSubOpts + { + // It's important to set the timeout here so that the subscription can be + // stopped if the server doesn't respond or more likely case is that if there + // is a reconnect to the cluster between the request and waiting for a response, + // without the timeout the publish call will hang forever since the server + // which received the request won't be there to respond anymore. + Timeout = Connection.Opts.RequestTimeout, + + // If JetStream is disabled, a no responders error will be returned + // No responders error might also happen when reconnecting to cluster + ThrowIfNoResponders = true, + }, + cancellationToken) + .ConfigureAwait(false); + + if (msg.Data == null) + { + throw new NatsJSException("No response data received"); + } + + return msg.Data; + } + catch (NatsNoRespondersException) + { + } + + if (i < retryMax) + { + _logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax); + await Task.Delay(retryWait, cancellationToken); + } + } + + // We throw a specific exception here for convenience so that the caller doesn't + // have to check for the exception message etc. + throw new NatsJSPublishNoResponseException(); + } + internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) { ArgumentNullException.ThrowIfNull(name, paramName); From 624c72849e988c675548840d98e906f3b41b2de0 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 27 Mar 2024 08:50:57 +0000 Subject: [PATCH 3/8] wip --- sandbox/MicroBenchmark/RequestReplyBench.cs | 73 +++++++--- src/NATS.Client.Core/Commands/PingCommand.cs | 58 ++++++++ .../Internal/DebuggingExtensions.cs | 2 +- .../Internal/RequestManager.cs | 70 ++++++++++ .../NatsConnection.RequestReply.cs | 131 +++++++++--------- src/NATS.Client.Core/NatsConnection.cs | 5 +- .../NatsJSContext.Streams.cs | 12 ++ src/NATS.Client.JetStream/NatsJSContext.cs | 131 +++++++----------- 8 files changed, 307 insertions(+), 175 deletions(-) create mode 100644 src/NATS.Client.Core/Internal/RequestManager.cs diff --git a/sandbox/MicroBenchmark/RequestReplyBench.cs b/sandbox/MicroBenchmark/RequestReplyBench.cs index 3fa1b4749..d48f794f0 100644 --- a/sandbox/MicroBenchmark/RequestReplyBench.cs +++ b/sandbox/MicroBenchmark/RequestReplyBench.cs @@ -3,6 +3,7 @@ using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; +using StreamInfo = NATS.Client.JetStream.StreamInfo; #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. @@ -14,11 +15,20 @@ namespace MicroBenchmark; public class RequestReplyBench { private readonly StreamConfig _streamConfig = new("benchreqrep", new[] { "benchreqrep.>" }); + private readonly StreamConfig _streamConfig2 = new("benchreqrepV2", new[] { "benchreqrepV2.>" }) + { + NumReplicas = 1, + Discard = StreamConfigDiscard.Old, + DuplicateWindow = TimeSpan.Zero, + }; + private readonly byte[] _data = new byte[128]; private NatsConnection _nats; private NatsJSContext _js; private IJetStream _jetStream; + private IJetStreamManagement _jetStreamManagement; + private StreamConfiguration _streamConfiguration; [GlobalSetup] public async Task SetupAsync() @@ -27,50 +37,67 @@ public async Task SetupAsync() await _nats.ConnectAsync(); _js = new NatsJSContext(_nats); - await Task.Delay(1_000); - try - { - await _js.DeleteStreamAsync("benchreqrep"); - } - catch - { - // ignored - } + // await CleanupAsync(); - await Task.Delay(1_000); await _js.CreateStreamAsync(_streamConfig); ConnectionFactory connectionFactory = new ConnectionFactory(); Options opts = ConnectionFactory.GetDefaultOptions("localhost"); var conn = connectionFactory.CreateConnection(opts); _jetStream = conn.CreateJetStreamContext(); + _jetStreamManagement = conn.CreateJetStreamManagementContext(); + _streamConfiguration = StreamConfiguration.Builder() + .WithName("benchreqrepV1") + .WithStorageType(StorageType.File) + .WithSubjects("benchreqrepv1.>") + .Build(); } - [GlobalCleanup] + // [GlobalCleanup] public async Task CleanupAsync() { await Task.Delay(1_000); - try - { - await _js.DeleteStreamAsync("benchreqrep"); - } - catch - { - // ignored - } - + await DeleteStreamAsync("benchreqrep"); + await DeleteStreamAsync("benchreqrepV1"); + await DeleteStreamAsync("benchreqrepV2"); await Task.Delay(1_000); } [Benchmark] - public async Task RequestAsync() => + public async Task JSPublishAsync() => await _js.PublishAsync("benchreqrep.x", _data); [Benchmark] - public async Task RequestAsync2() => + public async Task JSPublishAsync2() => await _js.PublishAsync2("benchreqrep.x", _data); + // + // [Benchmark] + // public async Task CreateStreamAsync() => + // await _js.CreateStreamAsync(_streamConfig2); + + [Benchmark] - public async Task RequestAsyncV1() => + public async Task JSPublishAsyncV1() => await _jetStream.PublishAsync(subject: "benchreqrep.x", data: _data); + // + // [Benchmark] + // public StreamInfo? CreateStreamV1() => + // _jetStreamManagement.AddStream(_streamConfiguration); + // + // [Benchmark] + // public async Task CreateStreamAsync2() => + // await _js.CreateStreamAsync2(_streamConfig2); + + private async Task DeleteStreamAsync(string name) + { + try + { + await _js.DeleteStreamAsync(stream: name); + } + catch + { + // ignored + } + } } diff --git a/src/NATS.Client.Core/Commands/PingCommand.cs b/src/NATS.Client.Core/Commands/PingCommand.cs index a8c6236d4..d22cfacf2 100644 --- a/src/NATS.Client.Core/Commands/PingCommand.cs +++ b/src/NATS.Client.Core/Commands/PingCommand.cs @@ -54,3 +54,61 @@ public TimeSpan GetResult(short token) public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags); } + +internal class RequestCommand : IValueTaskSource>>, IObjectPoolNode +{ + private readonly ObjectPool? _pool; + private ManualResetValueTaskSourceCore>> _core; + private RequestCommand? _next; + + public RequestCommand(ObjectPool? pool) + { + _pool = pool; + _core = new ManualResetValueTaskSourceCore>> + { + RunContinuationsAsynchronously = true, + }; + } + + public static RequestCommand Rent(ObjectPool pool) + { + RequestCommand result; + if (!pool.TryRent(out result!)) + { + result = new RequestCommand(pool); + } + + return result; + } + + public ref RequestCommand? NextNode => ref _next; + + public void SetResult(NatsMsg> msg) => _core.SetResult(msg); + + public void SetCanceled() => _core.SetException(new OperationCanceledException()); + + public void Reset() + { + _core.Reset(); + } + + public ValueTask>> RunAsync() => new(this, _core.Version); + + public NatsMsg> GetResult(short token) + { + var result = _core.GetResult(token); + + if (_pool is not null) + { + Reset(); + _pool.Return(this); + } + + return result; + } + + public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); + + public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) + => _core.OnCompleted(continuation, state, token, flags); +} diff --git a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs index efdcc987d..ca6750523 100644 --- a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs +++ b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs @@ -3,7 +3,7 @@ namespace NATS.Client.Core.Internal; -internal static class DebuggingExtensions +public static class DebuggingExtensions { public static string Dump(this ReadOnlySequence buffer) { diff --git a/src/NATS.Client.Core/Internal/RequestManager.cs b/src/NATS.Client.Core/Internal/RequestManager.cs new file mode 100644 index 000000000..5cae1671e --- /dev/null +++ b/src/NATS.Client.Core/Internal/RequestManager.cs @@ -0,0 +1,70 @@ +using System.Buffers; +using NATS.Client.Core.Commands; + +namespace NATS.Client.Core.Internal; + +public class RequestManager +{ + private readonly NatsConnection _connection; + private readonly string _inboxPrefix; + + long _requestId; + private readonly Dictionary _requests = new(); + + // internal class ReqCmd + // { + // public TaskCompletionSource>> Tcs; + // } + + public RequestManager(NatsConnection connection, string inboxPrefix) + { + _connection = connection; + _inboxPrefix = inboxPrefix; + } + + internal async Task<(RequestCommand, string)> NewRequestAsync(CancellationToken cancellationToken) + { + // RequestManager + await _connection.SubscriptionManager.EnsureMuxInboxSubscribedAsync(cancellationToken).ConfigureAwait(false); + var req = RequestCommand.Rent(_connection.ObjectPool); + // req.Tcs = new TaskCompletionSource>>(TaskCreationOptions.RunContinuationsAsynchronously); + // var tcs = new TaskCompletionSource>>(TaskCreationOptions.RunContinuationsAsynchronously); + + var id = Interlocked.Increment(ref _requestId); + lock (_requests) + { + // _requests.Add(id, req); + _requests.Add(id, req); + } + + var replyTo = NatsConnection.NewInbox(_inboxPrefix, id); // $"{InboxPrefix}.{id}"; + + return (req, replyTo); + } + + internal void SetRequestReply(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, long id) + { + // ReqCmd req; + RequestCommand tcs; + lock (_requests) + { + // if (!_requests.Remove(id, out req)) + if (!_requests.Remove(id, out tcs)) + { + return; + } + } + + var natsMsg = NatsMsg>.Build( + subject, + replyTo, + headersBuffer, + payloadBuffer, + _connection, + _connection.HeaderParser, + NatsDefaultSerializer>.Default); + + // req.Tcs.TrySetResult(natsMsg); + tcs.SetResult(natsMsg); + } +} diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 3c15627dc..4d25ae425 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -1,7 +1,9 @@ using System.Buffers; +using System.Buffers.Text; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -87,89 +89,33 @@ public async ValueTask> RequestAsync2( NatsSubOpts? replyOpts = default, CancellationToken cancellationToken = default) { - if (Telemetry.HasListeners()) - { - using var activity = Telemetry.StartSendActivity($"{SpanDestinationName(subject)} {Telemetry.Constants.RequestReplyActivityName}", this, subject, null); - try - { - replyOpts = SetReplyOptsDefaults(replyOpts); - await using var sub1 = await RequestSubAsync(subject, data, headers, requestSerializer, replySerializer, requestOpts, replyOpts, cancellationToken) - .ConfigureAwait(false); + var (tcs, replyTo) = await RequestManager.NewRequestAsync(cancellationToken).ConfigureAwait(false); - if (await sub1.Msgs.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) - { - if (sub1.Msgs.TryRead(out var msg)) - { - return msg; - } - } + await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); - throw new NatsNoReplyException(); - } - catch (Exception e) - { - Telemetry.SetException(activity, e); - throw; - } - } + // var msgBytes = await req.Tcs.Task.ConfigureAwait(false); - // RequestManager - await SubscriptionManager.EnsureMuxInboxSubscribedAsync(cancellationToken).ConfigureAwait(false); - var req = new ReqCmd(); - req.Tcs = new TaskCompletionSource>>(TaskCreationOptions.RunContinuationsAsynchronously); - var id = Interlocked.Increment(ref _requestId); - lock (_requests) + var msgBytes = await tcs.RunAsync().ConfigureAwait(false); + + if (msgBytes.Headers?.Code == 503) { - _requests.Add(id, req); + throw new NatsNoRespondersException(); } - var replyTo = $"{InboxPrefix}.{id}"; - - await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); - var msgBytes = await req.Tcs.Task.ConfigureAwait(false); using var memoryOwner = msgBytes.Data; - var replyData = (replySerializer ?? NatsDefaultSerializer.Default) - .Deserialize(new ReadOnlySequence(memoryOwner.Memory)); + + // var replyData = (replySerializer ?? NatsDefaultSerializer.Default) + // .Deserialize(new ReadOnlySequence(memoryOwner.Memory)); + return new NatsMsg( msgBytes.Subject, msgBytes.ReplyTo, msgBytes.Size, msgBytes.Headers, - replyData, + default, // replyData, msgBytes.Connection); } - internal void SetRequestReply(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, long id) - { - ReqCmd req; - lock (_requests) - { - if (!_requests.Remove(id, out req)) - { - return; - } - } - - var natsMsg = NatsMsg>.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - this, - HeaderParser, - NatsDefaultSerializer>.Default); - - req.Tcs.TrySetResult(natsMsg); - } - - long _requestId; - Dictionary _requests = new(); - - internal class ReqCmd - { - public TaskCompletionSource>> Tcs; - } - /// public async IAsyncEnumerable> RequestManyAsync( string subject, @@ -230,6 +176,55 @@ string Throw() } } + [SkipLocalsInit] + internal static string NewInbox(ReadOnlySpan prefix, long id) + { + Span buffer = stackalloc char[64]; + Span idBuffer = stackalloc byte[32]; + + if (!Utf8Formatter.TryFormat(id, idBuffer, out var idLength)) + { + return Throw(); + } + + var separatorLength = prefix.Length > 0 ? 1u : 0u; + var totalLength = (uint)prefix.Length + (uint)idLength + separatorLength; + if (totalLength <= buffer.Length) + { + buffer = buffer.Slice(0, (int)totalLength); + } + else + { + buffer = new char[totalLength]; + } + + var totalPrefixLength = (uint)prefix.Length + separatorLength; + if ((uint)buffer.Length > totalPrefixLength && (uint)buffer.Length > (uint)prefix.Length) + { + prefix.CopyTo(buffer); + buffer[prefix.Length] = '.'; + var remaining = buffer.Slice((int)totalPrefixLength); + + var bs = idBuffer.Slice(0, idLength); + for (var index = 0; index < bs.Length; index++) + { + var b = bs[index]; + remaining[index] = (char)b; + } + + return new string(buffer); + } + + return Throw(); + + [DoesNotReturn] + string Throw() + { + Debug.Fail("Must not happen"); + throw new InvalidOperationException("This should never be raised!"); + } + } + private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts) { var opts = replyOpts ?? ReplyOptsDefault; diff --git a/src/NATS.Client.Core/NatsConnection.cs b/src/NATS.Client.Core/NatsConnection.cs index b65b2352d..9e0fb5aea 100644 --- a/src/NATS.Client.Core/NatsConnection.cs +++ b/src/NATS.Client.Core/NatsConnection.cs @@ -84,6 +84,7 @@ public NatsConnection(NatsOpts opts) InboxPrefix = NewInbox(opts.InboxPrefix); InboxPrefixBytes = Encoding.ASCII.GetBytes(InboxPrefix); SubscriptionManager = new SubscriptionManager(this, InboxPrefix); + RequestManager = new RequestManager(this, InboxPrefix); _logger = opts.LoggerFactory.CreateLogger(); _clientOpts = ClientOpts.Create(Opts); HeaderParser = new NatsHeaderParser(opts.HeaderEncoding); @@ -130,6 +131,8 @@ public NatsConnectionState ConnectionState internal SubscriptionManager SubscriptionManager { get; } + internal RequestManager RequestManager { get; } + internal CommandWriter CommandWriter { get; } internal string InboxPrefix { get; } @@ -216,7 +219,7 @@ internal ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, { if (responseId is { } id) { - SetRequestReply(subject, replyTo, sid, headersBuffer, payloadBuffer, id); + RequestManager.SetRequestReply(subject, replyTo, sid, headersBuffer, payloadBuffer, id); return default; } diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 53d61cbdd..ae09980fe 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -27,6 +27,18 @@ public async ValueTask CreateStreamAsync( return new NatsJSStream(this, response); } + public async ValueTask CreateStreamAsync2( + StreamConfig config, + CancellationToken cancellationToken = default) + { + ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); + var response = await JSRequestResponseAsync2( + subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", + config, + cancellationToken); + return null;//new NatsJSStream(this, response); + } + /// /// Deletes a stream. /// diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 09ce9563b..43a19c7a2 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -181,91 +181,20 @@ public async ValueTask PublishAsync2( NatsHeaders? headers = default, CancellationToken cancellationToken = default) { - if (opts != null) - { - if (opts.MsgId != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Msg-Id"] = opts.MsgId; - } - - if (opts.ExpectedLastMsgId != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Last-Msg-Id"] = opts.ExpectedLastMsgId; - } - - if (opts.ExpectedStream != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Stream"] = opts.ExpectedStream; - } - - if (opts.ExpectedLastSequence != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Last-Sequence"] = opts.ExpectedLastSequence.ToString(); - } - - if (opts.ExpectedLastSubjectSequence != null) - { - headers ??= new NatsHeaders(); - headers["Nats-Expected-Last-Subject-Sequence"] = opts.ExpectedLastSubjectSequence.ToString(); - } - } - opts ??= NatsJSPubOpts.Default; - var retryMax = opts.RetryAttempts; - var retryWait = opts.RetryWaitBetweenAttempts; - - for (var i = 0; i < retryMax; i++) - { - try - { - var msg = await Connection.RequestAsync2( - subject: subject, - data: data, - headers: headers, - requestSerializer: serializer, - replySerializer: NatsJSJsonSerializer.Default, - requestOpts: opts, - replyOpts: new NatsSubOpts - { - // It's important to set the timeout here so that the subscription can be - // stopped if the server doesn't respond or more likely case is that if there - // is a reconnect to the cluster between the request and waiting for a response, - // without the timeout the publish call will hang forever since the server - // which received the request won't be there to respond anymore. - Timeout = Connection.Opts.RequestTimeout, - - // If JetStream is disabled, a no responders error will be returned - // No responders error might also happen when reconnecting to cluster - ThrowIfNoResponders = true, - }, - cancellationToken) - .ConfigureAwait(false); - - if (msg.Data == null) - { - throw new NatsJSException("No response data received"); - } - - return msg.Data; - } - catch (NatsNoRespondersException) - { - } - if (i < retryMax) - { - _logger.LogDebug(NatsJSLogEvents.PublishNoResponseRetry, "No response received, retrying {RetryCount}/{RetryMax}", i + 1, retryMax); - await Task.Delay(retryWait, cancellationToken); - } - } + await Connection.RequestAsync2( + subject: subject, + data: data, + headers: headers, + requestSerializer: serializer, + replySerializer: NatsJSJsonSerializer.Default, + requestOpts: opts, + replyOpts: default, + cancellationToken) + .ConfigureAwait(false); - // We throw a specific exception here for convenience so that the caller doesn't - // have to check for the exception message etc. - throw new NatsJSPublishNoResponseException(); + return default; } internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) @@ -298,6 +227,18 @@ internal async ValueTask JSRequestResponseAsync( return response.Response!; } + internal async ValueTask JSRequestResponseAsync2( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + var response = await JSRequestAsync2(subject, request, cancellationToken); + response.EnsureSuccess(); + return response.Response!; + } + internal async ValueTask> JSRequestAsync( string subject, TRequest? request, @@ -352,6 +293,32 @@ internal async ValueTask> JSRequestAsync> JSRequestAsync2( + string subject, + TRequest? request, + CancellationToken cancellationToken = default) + where TRequest : class + where TResponse : class + { + if (request != null) + { + // TODO: Can't validate using JSON serializer context at the moment. + // Validator.ValidateObject(request, new ValidationContext(request)); + } + + var msg = await Connection.RequestAsync2( + subject: subject, + data: request, + headers: default, + replyOpts: default, // new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, + requestSerializer: NatsJSJsonSerializer.Default, + replySerializer: NatsJSErrorAwareJsonSerializer.Default, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + return new NatsJSResponse(msg.Data, default); + } + [DoesNotReturn] private static void ThrowInvalidStreamNameException(string? paramName) => throw new ArgumentException("Stream name cannot contain ' ', '.'", paramName); From ca4f240b39be7a1342f02cd38928d97b375806aa Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 5 May 2024 16:59:10 +0100 Subject: [PATCH 4/8] Revert experimental code --- src/NATS.Client.Core/Commands/PingCommand.cs | 58 ------------ .../Internal/DebuggingExtensions.cs | 2 +- .../Internal/NatsReadProtocolProcessor.cs | 54 +++-------- .../Internal/SubscriptionManager.cs | 7 +- .../NatsConnection.RequestReply.cs | 91 ------------------- .../NatsJSContext.Streams.cs | 12 --- src/NATS.Client.JetStream/NatsJSContext.cs | 62 ------------- 7 files changed, 15 insertions(+), 271 deletions(-) diff --git a/src/NATS.Client.Core/Commands/PingCommand.cs b/src/NATS.Client.Core/Commands/PingCommand.cs index d22cfacf2..a8c6236d4 100644 --- a/src/NATS.Client.Core/Commands/PingCommand.cs +++ b/src/NATS.Client.Core/Commands/PingCommand.cs @@ -54,61 +54,3 @@ public TimeSpan GetResult(short token) public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags); } - -internal class RequestCommand : IValueTaskSource>>, IObjectPoolNode -{ - private readonly ObjectPool? _pool; - private ManualResetValueTaskSourceCore>> _core; - private RequestCommand? _next; - - public RequestCommand(ObjectPool? pool) - { - _pool = pool; - _core = new ManualResetValueTaskSourceCore>> - { - RunContinuationsAsynchronously = true, - }; - } - - public static RequestCommand Rent(ObjectPool pool) - { - RequestCommand result; - if (!pool.TryRent(out result!)) - { - result = new RequestCommand(pool); - } - - return result; - } - - public ref RequestCommand? NextNode => ref _next; - - public void SetResult(NatsMsg> msg) => _core.SetResult(msg); - - public void SetCanceled() => _core.SetException(new OperationCanceledException()); - - public void Reset() - { - _core.Reset(); - } - - public ValueTask>> RunAsync() => new(this, _core.Version); - - public NatsMsg> GetResult(short token) - { - var result = _core.GetResult(token); - - if (_pool is not null) - { - Reset(); - _pool.Return(this); - } - - return result; - } - - public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token); - - public void OnCompleted(Action continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) - => _core.OnCompleted(continuation, state, token, flags); -} diff --git a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs index ca6750523..efdcc987d 100644 --- a/src/NATS.Client.Core/Internal/DebuggingExtensions.cs +++ b/src/NATS.Client.Core/Internal/DebuggingExtensions.cs @@ -3,7 +3,7 @@ namespace NATS.Client.Core.Internal; -public static class DebuggingExtensions +internal static class DebuggingExtensions { public static string Dump(this ReadOnlySequence buffer) { diff --git a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs index 711fd32a4..fd77311d6 100644 --- a/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs +++ b/src/NATS.Client.Core/Internal/NatsReadProtocolProcessor.cs @@ -174,7 +174,7 @@ private async Task ReadLoopAsync() } var msgHeader = buffer.Slice(0, positionBeforePayload.Value); - var (subject, sid, payloadLength, replyTo, responseId) = ParseMessageHeader(msgHeader); + var (subject, sid, payloadLength, replyTo) = ParseMessageHeader(msgHeader); if (payloadLength == 0) { @@ -192,7 +192,7 @@ private async Task ReadLoopAsync() buffer = buffer.Slice(buffer.GetPosition(3, positionBeforePayload.Value)); } - await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, ReadOnlySequence.Empty, responseId).ConfigureAwait(false); + await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, ReadOnlySequence.Empty).ConfigureAwait(false); } else { @@ -213,7 +213,7 @@ private async Task ReadLoopAsync() buffer = buffer.Slice(buffer.GetPosition(2, payloadSlice.End)); // payload + \r\n - await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, payloadSlice, responseId).ConfigureAwait(false); + await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, null, payloadSlice).ConfigureAwait(false); } } else if (code == ServerOpCodes.HMsg) @@ -237,7 +237,7 @@ private async Task ReadLoopAsync() _logger.LogTrace(NatsLogEvents.Protocol, "HMSG trace dump: {MsgHeader}", msgHeader.Dump()); } - var (subject, sid, replyTo, headersLength, totalLength, responseId) = ParseHMessageHeader(msgHeader); + var (subject, sid, replyTo, headersLength, totalLength) = ParseHMessageHeader(msgHeader); if (_trace) { @@ -271,7 +271,7 @@ private async Task ReadLoopAsync() var headerSlice = totalSlice.Slice(0, headersLength); var payloadSlice = totalSlice.Slice(headersLength, payloadLength); - await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, headerSlice, payloadSlice, responseId) + await _connection.PublishToClientHandlersAsync(subject, replyTo, sid, headerSlice, payloadSlice) .ConfigureAwait(false); } else @@ -427,7 +427,7 @@ private async ValueTask> DispatchCommandAsync(int code, R // https://docs.nats.io/reference/reference-protocols/nats-protocol#msg // MSG [reply-to] <#bytes>\r\n[payload] - private (string subject, int sid, int payloadLength, string? replyTo, long? responseId) ParseMessageHeader(ReadOnlySpan msgHeader) + private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(ReadOnlySpan msgHeader) { msgHeader = msgHeader.Slice(4); msgHeader.Split(out var subjectBytes, out msgHeader); @@ -436,25 +436,11 @@ private async ValueTask> DispatchCommandAsync(int code, R var subject = Encoding.ASCII.GetString(subjectBytes); - // Parse: _INBOX.NUID.ID - long? responseId = null; - if (subjectBytes.StartsWith(_connection.InboxPrefixBytes.Span)) - { - var lastIndex = subjectBytes.LastIndexOf((byte)'.'); - if (lastIndex != -1) - { - if (Utf8Parser.TryParse(subjectBytes.Slice(lastIndex + 1), out long id, out _)) - { - responseId = id; - } - } - } - if (msgHeader.Length == 0) { var sid = GetInt32(sidBytes); var size = GetInt32(replyToOrSizeBytes); - return (subject, sid, size, null, responseId); + return (subject, sid, size, null); } else { @@ -464,11 +450,11 @@ private async ValueTask> DispatchCommandAsync(int code, R var sid = GetInt32(sidBytes); var payloadLength = GetInt32(bytesSlice); var replyTo = Encoding.ASCII.GetString(replyToBytes); - return (subject, sid, payloadLength, replyTo, responseId); + return (subject, sid, payloadLength, replyTo); } } - private (string subject, int sid, int payloadLength, string? replyTo, long? responseId) ParseMessageHeader(in ReadOnlySequence msgHeader) + private (string subject, int sid, int payloadLength, string? replyTo) ParseMessageHeader(in ReadOnlySequence msgHeader) { if (msgHeader.IsSingleSegment) { @@ -493,7 +479,7 @@ private async ValueTask> DispatchCommandAsync(int code, R // https://docs.nats.io/reference/reference-protocols/nats-protocol#hmsg // HMSG [reply-to] <#header bytes> <#total bytes>\r\n[headers]\r\n\r\n[payload]\r\n - private (string subject, int sid, string? replyTo, int headersLength, int totalLength, long? responseId) ParseHMessageHeader(ReadOnlySpan msgHeader) + private (string subject, int sid, string? replyTo, int headersLength, int totalLength) ParseHMessageHeader(ReadOnlySpan msgHeader) { // 'HMSG' literal msgHeader.Split(out _, out msgHeader); @@ -503,20 +489,6 @@ private async ValueTask> DispatchCommandAsync(int code, R msgHeader.Split(out var replyToOrHeaderLenBytes, out msgHeader); msgHeader.Split(out var headerLenOrTotalLenBytes, out msgHeader); - // Parse: _INBOX.NUID.ID - long? responseId = null; - if (subjectBytes.StartsWith(_connection.InboxPrefixBytes.Span)) - { - var lastIndex = subjectBytes.LastIndexOf((byte)'.'); - if (lastIndex != -1) - { - if (Utf8Parser.TryParse(subjectBytes.Slice(lastIndex + 1), out long id, out _)) - { - responseId = id; - } - } - } - var subject = Encoding.ASCII.GetString(subjectBytes); var sid = GetInt32(sidBytes); @@ -525,7 +497,7 @@ private async ValueTask> DispatchCommandAsync(int code, R { var headersLength = GetInt32(replyToOrHeaderLenBytes); var totalLen = GetInt32(headerLenOrTotalLenBytes); - return (subject, sid, null, headersLength, totalLen, responseId); + return (subject, sid, null, headersLength, totalLen); } // There is more data because of the reply-to field @@ -539,11 +511,11 @@ private async ValueTask> DispatchCommandAsync(int code, R var lastBytes = msgHeader; var totalLen = GetInt32(lastBytes); - return (subject, sid, replyTo, headerLen, totalLen, responseId); + return (subject, sid, replyTo, headerLen, totalLen); } } - private (string subject, int sid, string? replyTo, int headersLength, int totalLength, long? responseId) ParseHMessageHeader(in ReadOnlySequence msgHeader) + private (string subject, int sid, string? replyTo, int headersLength, int totalLength) ParseHMessageHeader(in ReadOnlySequence msgHeader) { if (msgHeader.IsSingleSegment) { diff --git a/src/NATS.Client.Core/Internal/SubscriptionManager.cs b/src/NATS.Client.Core/Internal/SubscriptionManager.cs index 982328557..e363d0068 100644 --- a/src/NATS.Client.Core/Internal/SubscriptionManager.cs +++ b/src/NATS.Client.Core/Internal/SubscriptionManager.cs @@ -198,7 +198,7 @@ public ISubscriptionManager GetManagerFor(string subject) return this; } - public async Task EnsureMuxInboxSubscribedAsync(CancellationToken cancellationToken) + private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken) { if (Interlocked.CompareExchange(ref _inboxSub, _inboxSubSentinel, _inboxSubSentinel) == _inboxSubSentinel) { @@ -227,11 +227,6 @@ await SubscribeInternalAsync( _inboxSubLock.Release(); } } - } - - private async ValueTask SubscribeInboxAsync(NatsSubBase sub, CancellationToken cancellationToken) - { - await EnsureMuxInboxSubscribedAsync(cancellationToken).ConfigureAwait(false); await InboxSubBuilder.RegisterAsync(sub).ConfigureAwait(false); } diff --git a/src/NATS.Client.Core/NatsConnection.RequestReply.cs b/src/NATS.Client.Core/NatsConnection.RequestReply.cs index 4d25ae425..76a3d1b0c 100644 --- a/src/NATS.Client.Core/NatsConnection.RequestReply.cs +++ b/src/NATS.Client.Core/NatsConnection.RequestReply.cs @@ -1,9 +1,6 @@ -using System.Buffers; -using System.Buffers.Text; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; using NATS.Client.Core.Internal; namespace NATS.Client.Core; @@ -77,45 +74,6 @@ public async ValueTask> RequestAsync( throw new NatsNoReplyException(); } - // - // - public async ValueTask> RequestAsync2( - string subject, - TRequest? data, - NatsHeaders? headers = default, - INatsSerialize? requestSerializer = default, - INatsDeserialize? replySerializer = default, - NatsPubOpts? requestOpts = default, - NatsSubOpts? replyOpts = default, - CancellationToken cancellationToken = default) - { - var (tcs, replyTo) = await RequestManager.NewRequestAsync(cancellationToken).ConfigureAwait(false); - - await PublishAsync(subject, data, headers, replyTo, requestSerializer, requestOpts, cancellationToken).ConfigureAwait(false); - - // var msgBytes = await req.Tcs.Task.ConfigureAwait(false); - - var msgBytes = await tcs.RunAsync().ConfigureAwait(false); - - if (msgBytes.Headers?.Code == 503) - { - throw new NatsNoRespondersException(); - } - - using var memoryOwner = msgBytes.Data; - - // var replyData = (replySerializer ?? NatsDefaultSerializer.Default) - // .Deserialize(new ReadOnlySequence(memoryOwner.Memory)); - - return new NatsMsg( - msgBytes.Subject, - msgBytes.ReplyTo, - msgBytes.Size, - msgBytes.Headers, - default, // replyData, - msgBytes.Connection); - } - /// public async IAsyncEnumerable> RequestManyAsync( string subject, @@ -176,55 +134,6 @@ string Throw() } } - [SkipLocalsInit] - internal static string NewInbox(ReadOnlySpan prefix, long id) - { - Span buffer = stackalloc char[64]; - Span idBuffer = stackalloc byte[32]; - - if (!Utf8Formatter.TryFormat(id, idBuffer, out var idLength)) - { - return Throw(); - } - - var separatorLength = prefix.Length > 0 ? 1u : 0u; - var totalLength = (uint)prefix.Length + (uint)idLength + separatorLength; - if (totalLength <= buffer.Length) - { - buffer = buffer.Slice(0, (int)totalLength); - } - else - { - buffer = new char[totalLength]; - } - - var totalPrefixLength = (uint)prefix.Length + separatorLength; - if ((uint)buffer.Length > totalPrefixLength && (uint)buffer.Length > (uint)prefix.Length) - { - prefix.CopyTo(buffer); - buffer[prefix.Length] = '.'; - var remaining = buffer.Slice((int)totalPrefixLength); - - var bs = idBuffer.Slice(0, idLength); - for (var index = 0; index < bs.Length; index++) - { - var b = bs[index]; - remaining[index] = (char)b; - } - - return new string(buffer); - } - - return Throw(); - - [DoesNotReturn] - string Throw() - { - Debug.Fail("Must not happen"); - throw new InvalidOperationException("This should never be raised!"); - } - } - private NatsSubOpts SetReplyOptsDefaults(NatsSubOpts? replyOpts) { var opts = replyOpts ?? ReplyOptsDefault; diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index ae09980fe..53d61cbdd 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -27,18 +27,6 @@ public async ValueTask CreateStreamAsync( return new NatsJSStream(this, response); } - public async ValueTask CreateStreamAsync2( - StreamConfig config, - CancellationToken cancellationToken = default) - { - ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); - var response = await JSRequestResponseAsync2( - subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", - config, - cancellationToken); - return null;//new NatsJSStream(this, response); - } - /// /// Deletes a stream. /// diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 43a19c7a2..f16c71166 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -173,30 +173,6 @@ public async ValueTask PublishAsync( throw new NatsJSPublishNoResponseException(); } - public async ValueTask PublishAsync2( - string subject, - T? data, - INatsSerialize? serializer = default, - NatsJSPubOpts? opts = default, - NatsHeaders? headers = default, - CancellationToken cancellationToken = default) - { - opts ??= NatsJSPubOpts.Default; - - await Connection.RequestAsync2( - subject: subject, - data: data, - headers: headers, - requestSerializer: serializer, - replySerializer: NatsJSJsonSerializer.Default, - requestOpts: opts, - replyOpts: default, - cancellationToken) - .ConfigureAwait(false); - - return default; - } - internal static void ThrowIfInvalidStreamName([NotNull] string? name, [CallerArgumentExpression("name")] string? paramName = null) { ArgumentNullException.ThrowIfNull(name, paramName); @@ -227,18 +203,6 @@ internal async ValueTask JSRequestResponseAsync( return response.Response!; } - internal async ValueTask JSRequestResponseAsync2( - string subject, - TRequest? request, - CancellationToken cancellationToken = default) - where TRequest : class - where TResponse : class - { - var response = await JSRequestAsync2(subject, request, cancellationToken); - response.EnsureSuccess(); - return response.Response!; - } - internal async ValueTask> JSRequestAsync( string subject, TRequest? request, @@ -293,32 +257,6 @@ internal async ValueTask> JSRequestAsync> JSRequestAsync2( - string subject, - TRequest? request, - CancellationToken cancellationToken = default) - where TRequest : class - where TResponse : class - { - if (request != null) - { - // TODO: Can't validate using JSON serializer context at the moment. - // Validator.ValidateObject(request, new ValidationContext(request)); - } - - var msg = await Connection.RequestAsync2( - subject: subject, - data: request, - headers: default, - replyOpts: default, // new NatsSubOpts { Timeout = Connection.Opts.RequestTimeout }, - requestSerializer: NatsJSJsonSerializer.Default, - replySerializer: NatsJSErrorAwareJsonSerializer.Default, - cancellationToken: cancellationToken) - .ConfigureAwait(false); - - return new NatsJSResponse(msg.Data, default); - } - [DoesNotReturn] private static void ThrowInvalidStreamNameException(string? paramName) => throw new ArgumentException("Stream name cannot contain ' ', '.'", paramName); From 50cb0a161d700b288d336421a4348ed19feef12b Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 5 May 2024 17:01:07 +0100 Subject: [PATCH 5/8] Revert experimental code --- .../Internal/RequestManager.cs | 70 ------------------- 1 file changed, 70 deletions(-) delete mode 100644 src/NATS.Client.Core/Internal/RequestManager.cs diff --git a/src/NATS.Client.Core/Internal/RequestManager.cs b/src/NATS.Client.Core/Internal/RequestManager.cs deleted file mode 100644 index 5cae1671e..000000000 --- a/src/NATS.Client.Core/Internal/RequestManager.cs +++ /dev/null @@ -1,70 +0,0 @@ -using System.Buffers; -using NATS.Client.Core.Commands; - -namespace NATS.Client.Core.Internal; - -public class RequestManager -{ - private readonly NatsConnection _connection; - private readonly string _inboxPrefix; - - long _requestId; - private readonly Dictionary _requests = new(); - - // internal class ReqCmd - // { - // public TaskCompletionSource>> Tcs; - // } - - public RequestManager(NatsConnection connection, string inboxPrefix) - { - _connection = connection; - _inboxPrefix = inboxPrefix; - } - - internal async Task<(RequestCommand, string)> NewRequestAsync(CancellationToken cancellationToken) - { - // RequestManager - await _connection.SubscriptionManager.EnsureMuxInboxSubscribedAsync(cancellationToken).ConfigureAwait(false); - var req = RequestCommand.Rent(_connection.ObjectPool); - // req.Tcs = new TaskCompletionSource>>(TaskCreationOptions.RunContinuationsAsynchronously); - // var tcs = new TaskCompletionSource>>(TaskCreationOptions.RunContinuationsAsynchronously); - - var id = Interlocked.Increment(ref _requestId); - lock (_requests) - { - // _requests.Add(id, req); - _requests.Add(id, req); - } - - var replyTo = NatsConnection.NewInbox(_inboxPrefix, id); // $"{InboxPrefix}.{id}"; - - return (req, replyTo); - } - - internal void SetRequestReply(string subject, string? replyTo, int sid, in ReadOnlySequence? headersBuffer, in ReadOnlySequence payloadBuffer, long id) - { - // ReqCmd req; - RequestCommand tcs; - lock (_requests) - { - // if (!_requests.Remove(id, out req)) - if (!_requests.Remove(id, out tcs)) - { - return; - } - } - - var natsMsg = NatsMsg>.Build( - subject, - replyTo, - headersBuffer, - payloadBuffer, - _connection, - _connection.HeaderParser, - NatsDefaultSerializer>.Default); - - // req.Tcs.TrySetResult(natsMsg); - tcs.SetResult(natsMsg); - } -} From 42dd7658317e1be5c12b441a29fd678b2b42f377 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 5 May 2024 18:14:45 +0100 Subject: [PATCH 6/8] Revert MicroBenchmark project --- sandbox/MicroBenchmark/MicroBenchmark.csproj | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 6ee79ebd5..a46099bf1 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -13,12 +13,11 @@ - + - From 9fd5286e734813782abd60bb8f2fc10ac4781932 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 5 May 2024 18:15:24 +0100 Subject: [PATCH 7/8] Simplified req-rep benchmark --- sandbox/MicroBenchmark/RequestReplyBench.cs | 92 ++++----------------- 1 file changed, 16 insertions(+), 76 deletions(-) diff --git a/sandbox/MicroBenchmark/RequestReplyBench.cs b/sandbox/MicroBenchmark/RequestReplyBench.cs index d48f794f0..c507a2d86 100644 --- a/sandbox/MicroBenchmark/RequestReplyBench.cs +++ b/sandbox/MicroBenchmark/RequestReplyBench.cs @@ -1,9 +1,5 @@ using BenchmarkDotNet.Attributes; -using NATS.Client; using NATS.Client.Core; -using NATS.Client.JetStream; -using NATS.Client.JetStream.Models; -using StreamInfo = NATS.Client.JetStream.StreamInfo; #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. @@ -14,90 +10,34 @@ namespace MicroBenchmark; [PlainExporter] public class RequestReplyBench { - private readonly StreamConfig _streamConfig = new("benchreqrep", new[] { "benchreqrep.>" }); - private readonly StreamConfig _streamConfig2 = new("benchreqrepV2", new[] { "benchreqrepV2.>" }) - { - NumReplicas = 1, - Discard = StreamConfigDiscard.Old, - DuplicateWindow = TimeSpan.Zero, - }; - - private readonly byte[] _data = new byte[128]; - private NatsConnection _nats; - private NatsJSContext _js; - private IJetStream _jetStream; - private IJetStreamManagement _jetStreamManagement; - private StreamConfiguration _streamConfiguration; + private CancellationTokenSource _cts; + private Task _subscription; [GlobalSetup] public async Task SetupAsync() { _nats = new NatsConnection(); await _nats.ConnectAsync(); - _js = new NatsJSContext(_nats); - - // await CleanupAsync(); - - await _js.CreateStreamAsync(_streamConfig); - - ConnectionFactory connectionFactory = new ConnectionFactory(); - Options opts = ConnectionFactory.GetDefaultOptions("localhost"); - var conn = connectionFactory.CreateConnection(opts); - _jetStream = conn.CreateJetStreamContext(); - _jetStreamManagement = conn.CreateJetStreamManagementContext(); - _streamConfiguration = StreamConfiguration.Builder() - .WithName("benchreqrepV1") - .WithStorageType(StorageType.File) - .WithSubjects("benchreqrepv1.>") - .Build(); + _cts = new CancellationTokenSource(); + _subscription = Task.Run(async () => + { + await foreach (var msg in _nats.SubscribeAsync("req_rep_bench", cancellationToken: _cts.Token)) + { + await msg.ReplyAsync(0xBEEF); + } + }); } - // [GlobalCleanup] + [GlobalCleanup] public async Task CleanupAsync() { - await Task.Delay(1_000); - await DeleteStreamAsync("benchreqrep"); - await DeleteStreamAsync("benchreqrepV1"); - await DeleteStreamAsync("benchreqrepV2"); - await Task.Delay(1_000); + await _cts.CancelAsync(); + await _subscription; + await _nats.DisposeAsync(); } [Benchmark] - public async Task JSPublishAsync() => - await _js.PublishAsync("benchreqrep.x", _data); - - [Benchmark] - public async Task JSPublishAsync2() => - await _js.PublishAsync2("benchreqrep.x", _data); - // - // [Benchmark] - // public async Task CreateStreamAsync() => - // await _js.CreateStreamAsync(_streamConfig2); - - - - [Benchmark] - public async Task JSPublishAsyncV1() => - await _jetStream.PublishAsync(subject: "benchreqrep.x", data: _data); - // - // [Benchmark] - // public StreamInfo? CreateStreamV1() => - // _jetStreamManagement.AddStream(_streamConfiguration); - // - // [Benchmark] - // public async Task CreateStreamAsync2() => - // await _js.CreateStreamAsync2(_streamConfig2); - - private async Task DeleteStreamAsync(string name) - { - try - { - await _js.DeleteStreamAsync(stream: name); - } - catch - { - // ignored - } - } + public async Task> RequestReplyAsync() => + await _nats.RequestAsync("req_rep_bench", 0xDEAD); } From e7bbeb948de8e975fe79405e64e0d24e8d6cdc75 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sun, 5 May 2024 18:25:13 +0100 Subject: [PATCH 8/8] Check return value --- sandbox/MicroBenchmark/RequestReplyBench.cs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sandbox/MicroBenchmark/RequestReplyBench.cs b/sandbox/MicroBenchmark/RequestReplyBench.cs index c507a2d86..e8710c5ab 100644 --- a/sandbox/MicroBenchmark/RequestReplyBench.cs +++ b/sandbox/MicroBenchmark/RequestReplyBench.cs @@ -38,6 +38,11 @@ public async Task CleanupAsync() } [Benchmark] - public async Task> RequestReplyAsync() => - await _nats.RequestAsync("req_rep_bench", 0xDEAD); + public async Task RequestReplyAsync() + { + var reply = await _nats.RequestAsync("req_rep_bench", 0xDEAD); + var result = reply.Data; + ArgumentOutOfRangeException.ThrowIfNotEqual(0xBEEF, result); + return result; + } }