Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use System.IO.Pipelines for sending #303

Merged
merged 30 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed class CommandWriter : IAsyncDisposable
private readonly PipeWriter _pipeWriter;
private readonly ProtocolWriter _protocolWriter;
private readonly ChannelWriter<QueuedCommand> _queuedCommandsWriter;
private readonly SemaphoreSlim _sem;
private readonly SemaphoreSlim _semLock;
private Task? _flushTask;
private bool _disposed;

Expand All @@ -39,12 +39,16 @@ public CommandWriter(NatsOpts opts, ConnectionStatsCounter counter, Action<PingC
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
_enqueuePing = enqueuePing;
_opts = opts;
var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: opts.WriterBufferSize, resumeWriterThreshold: opts.WriterBufferSize / 2, minimumSegmentSize: 65536, useSynchronizationContext: false));
var pipe = new Pipe(new PipeOptions(
pauseWriterThreshold: opts.WriterBufferSize, // flush will block after hitting
resumeWriterThreshold: opts.WriterBufferSize / 2, // will start flushing again after catching up
minimumSegmentSize: 65536, // larger segments allow for more productive syscall for socket.send
useSynchronizationContext: false));
PipeReader = pipe.Reader;
_pipeWriter = pipe.Writer;
_protocolWriter = new ProtocolWriter(_pipeWriter, opts.SubjectEncoding, opts.HeaderEncoding);
var channel = Channel.CreateUnbounded<QueuedCommand>(new UnboundedChannelOptions { SingleWriter = true, SingleReader = true });
_sem = new SemaphoreSlim(1);
_semLock = new SemaphoreSlim(1);
QueuedCommandsReader = channel.Reader;
_queuedCommandsWriter = channel.Writer;
}
Expand All @@ -57,7 +61,7 @@ public CommandWriter(NatsOpts opts, ConnectionStatsCounter counter, Action<PingC

public async ValueTask DisposeAsync()
{
await _sem.WaitAsync().ConfigureAwait(false);
await _semLock.WaitAsync().ConfigureAwait(false);
try
{
if (_disposed)
Expand All @@ -71,7 +75,7 @@ public async ValueTask DisposeAsync()
}
finally
{
_sem.Release();
_semLock.Release();
}
}

Expand All @@ -81,7 +85,7 @@ public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancella
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_sem.Wait(0))
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand Down Expand Up @@ -113,7 +117,7 @@ public ValueTask ConnectAsync(ClientOpts connectOpts, CancellationToken cancella
}
finally
{
_sem.Release();
_semLock.Release();
}

return ValueTask.CompletedTask;
Expand All @@ -123,7 +127,7 @@ public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellati
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_sem.Wait(0))
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand Down Expand Up @@ -156,7 +160,7 @@ public ValueTask PingAsync(PingCommand pingCommand, CancellationToken cancellati
}
finally
{
_sem.Release();
_semLock.Release();
}

return ValueTask.CompletedTask;
Expand All @@ -166,7 +170,7 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_sem.Wait(0))
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand Down Expand Up @@ -198,7 +202,7 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)
}
finally
{
_sem.Release();
_semLock.Release();
}

return ValueTask.CompletedTask;
Expand All @@ -208,7 +212,7 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_sem.Wait(0))
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand Down Expand Up @@ -240,7 +244,7 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
}
finally
{
_sem.Release();
_semLock.Release();
}

return ValueTask.CompletedTask;
Expand All @@ -250,7 +254,7 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_sem.Wait(0))
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand Down Expand Up @@ -282,7 +286,7 @@ public ValueTask SubscribeAsync(int sid, string subject, string? queueGroup, int
}
finally
{
_sem.Release();
_semLock.Release();
}

return ValueTask.CompletedTask;
Expand All @@ -292,7 +296,7 @@ public ValueTask UnsubscribeAsync(int sid, CancellationToken cancellationToken)
{
#pragma warning disable CA2016
#pragma warning disable VSTHRD103
if (!_sem.Wait(0))
if (!_semLock.Wait(0))
#pragma warning restore VSTHRD103
#pragma warning restore CA2016
{
Expand Down Expand Up @@ -324,7 +328,7 @@ public ValueTask UnsubscribeAsync(int sid, CancellationToken cancellationToken)
}
finally
{
_sem.Release();
_semLock.Release();
}

return ValueTask.CompletedTask;
Expand All @@ -334,7 +338,7 @@ private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts conne
{
if (!lockHeld)
{
if (!await _sem.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new TimeoutException();
}
Expand Down Expand Up @@ -365,15 +369,15 @@ private async ValueTask ConnectStateMachineAsync(bool lockHeld, ClientOpts conne
}
finally
{
_sem.Release();
_semLock.Release();
}
}

private async ValueTask PingStateMachineAsync(bool lockHeld, PingCommand pingCommand, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _sem.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new TimeoutException();
}
Expand Down Expand Up @@ -405,15 +409,15 @@ private async ValueTask PingStateMachineAsync(bool lockHeld, PingCommand pingCom
}
finally
{
_sem.Release();
_semLock.Release();
}
}

private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _sem.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new TimeoutException();
}
Expand Down Expand Up @@ -444,15 +448,15 @@ private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken c
}
finally
{
_sem.Release();
_semLock.Release();
}
}

private async ValueTask PublishStateMachineAsync<T>(bool lockHeld, string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _sem.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new TimeoutException();
}
Expand Down Expand Up @@ -483,15 +487,15 @@ private async ValueTask PublishStateMachineAsync<T>(bool lockHeld, string subjec
}
finally
{
_sem.Release();
_semLock.Release();
}
}

private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, string subject, string? queueGroup, int? maxMsgs, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _sem.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new TimeoutException();
}
Expand Down Expand Up @@ -522,15 +526,15 @@ private async ValueTask SubscribeStateMachineAsync(bool lockHeld, int sid, strin
}
finally
{
_sem.Release();
_semLock.Release();
}
}

private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, CancellationToken cancellationToken)
{
if (!lockHeld)
{
if (!await _sem.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
if (!await _semLock.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false))
{
throw new TimeoutException();
}
Expand Down Expand Up @@ -561,7 +565,7 @@ private async ValueTask UnsubscribeStateMachineAsync(bool lockHeld, int sid, Can
}
finally
{
_sem.Release();
_semLock.Release();
}
}

Expand Down
Loading
Loading