Skip to content

Commit

Permalink
Shared memory WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
sanych-sun committed Sep 5, 2023
1 parent f153691 commit 11cb5a5
Show file tree
Hide file tree
Showing 22 changed files with 568 additions and 175 deletions.
19 changes: 19 additions & 0 deletions src/RabbitMQ.Next/Buffers/IMemoryAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.IO;

namespace RabbitMQ.Next.Buffers;

internal interface IMemoryAccessor : IDisposable
{
int Size { get; }

ReadOnlyMemory<byte> Memory { get; }

IMemoryAccessor Next { get; }

IMemoryAccessor Append(IMemoryAccessor next);

void WriteTo(Stream stream);

void CopyTo(Span<byte> destination);
}
100 changes: 100 additions & 0 deletions src/RabbitMQ.Next/Buffers/MemoryAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System;
using System.IO;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.ObjectPool;

namespace RabbitMQ.Next.Buffers;

internal sealed class MemoryAccessor : IMemoryAccessor
{
private readonly ObjectPool<byte[]> memoryPool;
private byte[] memory;
private readonly int offset;

public MemoryAccessor(ObjectPool<byte[]> memoryPool, byte[] memory, int offset, int size)
{
ArgumentNullException.ThrowIfNull(memoryPool);
ArgumentNullException.ThrowIfNull(memory);

if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if (offset > memory.Length)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if(offset + size >= memory.Length)
{
throw new ArgumentOutOfRangeException(nameof(size));
}

this.memoryPool = memoryPool;
this.memory = memory;
this.offset = offset;
this.Size = size;
}

public void Dispose()
{
if (this.memory == null)
{
return;
}

this.memoryPool.Return(this.memory);
this.memory = null;
this.Next = null;
}

public int Size { get; }

public ReadOnlyMemory<byte> Memory
{
get
{
this.CheckDisposed();
return new(this.memory, this.offset, this.Size);
}
}

public IMemoryAccessor Next { get; private set; }

public IMemoryAccessor Append(IMemoryAccessor next)
{
if (this.Next != null)
{
throw new InvalidOperationException();
}

this.Next = next;
return next;
}

public void WriteTo(Stream stream)
{
ArgumentNullException.ThrowIfNull(stream);
this.CheckDisposed();

stream.Write(this.memory, this.offset, this.Size);
}

public void CopyTo(Span<byte> destination)
{
if (this.Size > 0)
{
this.Memory.Span.CopyTo(destination);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CheckDisposed()
{
if (this.memory == null)
{
throw new ObjectDisposedException(nameof(MemoryAccessor));
}
}
}
10 changes: 5 additions & 5 deletions src/RabbitMQ.Next/Buffers/MemoryBlockExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,25 @@ namespace RabbitMQ.Next.Buffers;

internal static class MemoryBlockExtensions
{
public static ReadOnlySequence<byte> ToSequence(this MemoryBlock source)
public static ReadOnlySequence<byte> ToSequence(this IMemoryAccessor source)
{
if (source == null || source.Length == 0)
if (source == null || source.Size == 0)
{
return ReadOnlySequence<byte>.Empty;
}

if (source.Next == null)
{
return new ReadOnlySequence<byte>(source);
return new ReadOnlySequence<byte>(source.Memory);
}

var first = new MemorySegment<byte>(source);
var first = new MemorySegment<byte>(source.Memory);
var last = first;
var current = source.Next;

while (current != null)
{
last = last.Append(current);
last = last.Append(current.Memory);
current = current.Next;
}

Expand Down
19 changes: 19 additions & 0 deletions src/RabbitMQ.Next/Buffers/MemoryPoolPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Microsoft.Extensions.ObjectPool;

namespace RabbitMQ.Next.Buffers;

internal sealed class MemoryPoolPolicy : PooledObjectPolicy<byte[]>
{
private readonly int bufferSize;

public MemoryPoolPolicy(int bufferSize)
{
this.bufferSize = bufferSize;
}

public override byte[] Create()
=> new byte[this.bufferSize];

public override bool Return(byte[] obj)
=> obj.Length == this.bufferSize;
}
193 changes: 193 additions & 0 deletions src/RabbitMQ.Next/Buffers/SharedMemory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
using System;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using Microsoft.Extensions.ObjectPool;

namespace RabbitMQ.Next.Buffers;

internal sealed class SharedMemory : IDisposable
{
private readonly ObjectPool<byte[]> memoryPool;
private int referencesCount = 1;
private byte[] memory;

public SharedMemory(ObjectPool<byte[]> memoryPool, byte[] memory)
{
ArgumentNullException.ThrowIfNull(memoryPool);
ArgumentNullException.ThrowIfNull(memory);

this.memoryPool = memoryPool;
this.memory = memory;
}

public void Dispose()
{
var refsCount = Interlocked.Decrement(ref this.referencesCount);
if (refsCount != 0)
{
return;
}

this.memoryPool.Return(this.memory);
this.memory = null;
}

public MemoryAccessor Slice(int offset, int size)
=> new(this, offset, size);

private void DisposeCheck()
{
if (this.memory == null)
{
throw new ObjectDisposedException(nameof(SharedMemory));
}
}

public readonly ref struct MemoryAccessor
{
private readonly SharedMemory owner;
private readonly int offset;

public MemoryAccessor(SharedMemory owner, int offset, int length)
{
ArgumentNullException.ThrowIfNull(owner);
owner.DisposeCheck();

if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if (offset > owner.memory.Length)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if(offset + length >= owner.memory.Length)
{
throw new ArgumentOutOfRangeException(nameof(length));
}

this.owner = owner;
this.offset = offset;
this.Length = length;
}

public int Length { get; }

public ReadOnlySpan<byte> Span
{
get
{
this.owner.DisposeCheck();
return new (this.owner.memory, this.offset, this.Length);
}
}

public MemoryAccessor Slice(int offset)
=> this.Slice(offset, this.Length - offset);

public MemoryAccessor Slice(int offset, int length)
{
if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset));
}

if (length < 0)
{
throw new ArgumentOutOfRangeException(nameof(length));
}

if (offset + length > this.Length)
{
throw new ArgumentOutOfRangeException(nameof(length));
}

return new(this.owner, this.offset + offset, length);
}

public IMemoryAccessor AsRef()
{
this.owner.DisposeCheck();
return new SharedMemoryAccessor(this.owner, this.offset, this.Length);
}
}

private sealed class SharedMemoryAccessor : IMemoryAccessor
{
private readonly int offset;
private SharedMemory owner;

public SharedMemoryAccessor(SharedMemory owner, int offset, int size)
{
this.owner = owner;
this.offset = offset;
this.Size = size;

Interlocked.Increment(ref this.owner.referencesCount);
}


public void Dispose()
{
if (this.owner == null)
{
return;
}

this.owner.Dispose();
this.owner = null;
this.Next?.Dispose();
this.Next = null;
}

public int Size { get; }

public ReadOnlyMemory<byte> Memory
{
get
{
this.CheckDisposed();
return new(this.owner.memory, this.offset, this.Size);
}
}

public IMemoryAccessor Next { get; private set; }

public IMemoryAccessor Append(IMemoryAccessor next)
{
if (this.Next != null)
{
throw new InvalidOperationException();
}

this.Next = next;
return next;
}

public void WriteTo(Stream stream)
{
ArgumentNullException.ThrowIfNull(stream);
this.CheckDisposed();

stream.Write(this.owner.memory, this.offset, this.Size);
}

public void CopyTo(Span<byte> destination)
{
this.CheckDisposed();
this.Memory.Span.CopyTo(destination);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void CheckDisposed()
{
if (this.owner == null)
{
throw new ObjectDisposedException(nameof(SharedMemoryAccessor));
}
}
}
}
Loading

0 comments on commit 11cb5a5

Please sign in to comment.