Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send buffer changes #346

Merged
merged 30 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e90d588
Send buffer refinements
mtmk Jan 22, 2024
2604370
Command writer pipeline encapsulation
mtmk Jan 22, 2024
85972f5
Default write buffer size
mtmk Jan 22, 2024
78f0392
Revert tmp comment outs
mtmk Jan 23, 2024
abf863c
HPUB writer fix
mtmk Jan 23, 2024
d607449
Merge branch 'main' into 341-send-buffer-refinement
mtmk Jan 23, 2024
7b87f22
Use channel instead of semaphore for locks
mtmk Jan 23, 2024
0623d61
Merge branch 'main' into 341-send-buffer-refinement
mtmk Jan 26, 2024
2e15356
Reuse same pipeline avoiding data loss
mtmk Jan 26, 2024
1d11ccf
Tidy up
mtmk Jan 26, 2024
693266d
Reverted ping command
mtmk Jan 26, 2024
2ee24c0
Revert to clearing buffer
mtmk Jan 26, 2024
ddc880c
Merge branch 'main' into 341-send-buffer-refinement
mtmk Jan 26, 2024
15bf6ea
Merge branch 'main' into 341-send-buffer-refinement
mtmk Jan 28, 2024
3efe37c
Lock function
mtmk Jan 28, 2024
2404ff3
Added inlining
mtmk Jan 29, 2024
722150f
Command timeouts
mtmk Jan 30, 2024
a8a5d5d
Cancellation fixes
mtmk Jan 31, 2024
08e1095
Merge branch 'main' into 341-send-buffer-refinement
mtmk Jan 31, 2024
3787133
Derive pool rent size from buffer size option
mtmk Jan 31, 2024
45411a6
Fixed format
mtmk Jan 31, 2024
01b742b
Command timeout test
mtmk Jan 31, 2024
535d697
Test debug
mtmk Jan 31, 2024
51fa05d
Keep send buffer at message boundaries
mtmk Feb 2, 2024
54c007f
Flush buffers cleanly on dispose
mtmk Feb 2, 2024
cce3450
Fixing buffer msg boundry issue
mtmk Feb 2, 2024
d3c0288
Format fixed
mtmk Feb 2, 2024
6334b57
Buffer position
mtmk Feb 2, 2024
ad5c0b2
Handle socket exception
mtmk Feb 2, 2024
72f1213
throw away bytes in send buffer after a failed send (#368)
caleblloyd Feb 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
780 changes: 315 additions & 465 deletions src/NATS.Client.Core/Commands/CommandWriter.cs

Large diffs are not rendered by default.

208 changes: 208 additions & 0 deletions src/NATS.Client.Core/Commands/NatsPooledBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
using System.Buffers;
using System.Numerics;
using System.Runtime.CompilerServices;
using NATS.Client.Core.Internal;

namespace NATS.Client.Core.Commands;

// adapted from https://github.com/CommunityToolkit/dotnet/blob/v8.2.2/src/CommunityToolkit.HighPerformance/Buffers/ArrayPoolBufferWriter%7BT%7D.cs
internal sealed class NatsPooledBufferWriter<T> : IBufferWriter<T>, IObjectPoolNode<NatsPooledBufferWriter<T>>
{
private const int DefaultInitialMinBufferSize = 256;
private const int DefaultInitialMaxBufferSize = 65536;

private readonly ArrayPool<T> _pool;
private readonly int _size;
private T[]? _array;
private int _index;
private NatsPooledBufferWriter<T>? _next;

public NatsPooledBufferWriter(int size)
{
if (size < DefaultInitialMinBufferSize)
{
size = DefaultInitialMinBufferSize;
}

if (size > DefaultInitialMaxBufferSize)
{
size = DefaultInitialMaxBufferSize;
}

_size = size;
_pool = ArrayPool<T>.Shared;
_array = _pool.Rent(size);
_index = 0;
}

public ref NatsPooledBufferWriter<T>? NextNode => ref _next;

/// <summary>
/// Gets the data written to the underlying buffer so far, as a <see cref="ReadOnlyMemory{T}"/>.
/// </summary>
public ReadOnlyMemory<T> WrittenMemory
{
get
{
var array = _array;

if (array is null)
{
ThrowObjectDisposedException();
}

return array!.AsMemory(0, _index);
}
}

/// <summary>
/// Gets the data written to the underlying buffer so far, as a <see cref="ReadOnlySpan{T}"/>.
/// </summary>
public ReadOnlySpan<T> WrittenSpan
{
get
{
var array = _array;

if (array is null)
{
ThrowObjectDisposedException();
}

return array!.AsSpan(0, _index);
}
}

/// <summary>
/// Gets the amount of data written to the underlying buffer so far.
/// </summary>
public int WrittenCount
{
get => _index;
}

/// <inheritdoc/>
public void Advance(int count)
{
var array = _array;

if (array is null)
{
ThrowObjectDisposedException();
}

if (count < 0)
{
ThrowArgumentOutOfRangeExceptionForNegativeCount();
}

if (_index > array!.Length - count)
{
ThrowArgumentExceptionForAdvancedTooFar();
}

_index += count;
}

/// <inheritdoc/>
public Memory<T> GetMemory(int sizeHint = 0)
{
CheckBufferAndEnsureCapacity(sizeHint);

return _array.AsMemory(_index);
}

/// <inheritdoc/>
public Span<T> GetSpan(int sizeHint = 0)
{
CheckBufferAndEnsureCapacity(sizeHint);

return _array.AsSpan(_index);
}

public void Reset()
{
if (_array != null)
_pool.Return(_array);
_array = _pool.Rent(_size);
_index = 0;
}

/// <inheritdoc/>
public override string ToString()
{
// See comments in MemoryOwner<T> about this
if (typeof(T) == typeof(char) &&
_array is char[] chars)
{
return new(chars, 0, _index);
}

// Same representation used in Span<T>
return $"NatsPooledBufferWriter<{typeof(T)}>[{_index}]";
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowArgumentOutOfRangeExceptionForNegativeCount() => throw new ArgumentOutOfRangeException("count", "The count can't be a negative value.");

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowArgumentOutOfRangeExceptionForNegativeSizeHint() => throw new ArgumentOutOfRangeException("sizeHint", "The size hint can't be a negative value.");

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowArgumentExceptionForAdvancedTooFar() => throw new ArgumentException("The buffer writer has advanced too far.");

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowObjectDisposedException() => throw new ObjectDisposedException("The current buffer has already been disposed.");

/// <summary>
/// Ensures that <see cref="_array"/> has enough free space to contain a given number of new items.
/// </summary>
/// <param name="sizeHint">The minimum number of items to ensure space for in <see cref="_array"/>.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CheckBufferAndEnsureCapacity(int sizeHint)
{
var array = _array;

if (array is null)
{
ThrowObjectDisposedException();
}

if (sizeHint < 0)
{
ThrowArgumentOutOfRangeExceptionForNegativeSizeHint();
}

if (sizeHint == 0)
{
sizeHint = 1;
}

if (sizeHint > array!.Length - _index)
{
ResizeBuffer(sizeHint);
}
}

/// <summary>
/// Resizes <see cref="_array"/> to ensure it can fit the specified number of new items.
/// </summary>
/// <param name="sizeHint">The minimum number of items to ensure space for in <see cref="_array"/>.</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private void ResizeBuffer(int sizeHint)
{
var minimumSize = (uint)_index + (uint)sizeHint;

// The ArrayPool<T> class has a maximum threshold of 1024 * 1024 for the maximum length of
// pooled arrays, and once this is exceeded it will just allocate a new array every time
// of exactly the requested size. In that case, we manually round up the requested size to
// the nearest power of two, to ensure that repeated consecutive writes when the array in
// use is bigger than that threshold don't end up causing a resize every single time.
if (minimumSize > 1024 * 1024)
{
minimumSize = BitOperations.RoundUpToPowerOf2(minimumSize);
}

_pool.Resize(ref _array, (int)minimumSize);
}
}
25 changes: 25 additions & 0 deletions src/NATS.Client.Core/Commands/PriorityCommandWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using NATS.Client.Core.Internal;

namespace NATS.Client.Core.Commands;

internal sealed class PriorityCommandWriter : IAsyncDisposable
{
private int _disposed;

public PriorityCommandWriter(ObjectPool pool, ISocketConnection socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing)
{
CommandWriter = new CommandWriter(pool, opts, counter, enqueuePing, overrideCommandTimeout: Timeout.InfiniteTimeSpan);
CommandWriter.Reset(socketConnection);
}

public CommandWriter CommandWriter { get; }

public async ValueTask DisposeAsync()
{
if (Interlocked.Increment(ref _disposed) == 1)
{
// disposing command writer marks pipe writer as complete
await CommandWriter.DisposeAsync().ConfigureAwait(false);
}
}
}