Skip to content

Commit

Permalink
Protect default memory pool for bolt (#758)
Browse files Browse the repository at this point in the history
* The driver should use it's own pool by default

* fix docs

* Remove ability to disable pipelined message reader

* make pool a readonly field instead of get property
  • Loading branch information
thelonelyvulpes committed Dec 14, 2023
1 parent da06979 commit acb8ff6
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 20 deletions.
37 changes: 17 additions & 20 deletions Neo4j.Driver/Neo4j.Driver/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,65 +232,62 @@ public int MaxIdleConnectionPoolSize
/// By default the driver allows the collection of this telemetry.
/// </summary>
public bool TelemetryDisabled { get; set; }

/// <summary>
/// The configuration for the driver's underlying message reading from the network.
/// </summary>
public MessageReaderConfig MessageReaderConfig { get; internal set; } = MessageReaderConfig.Default;
public MessageReaderConfig MessageReaderConfig { get; internal set; } = new();
}

/// <summary>
/// The configuration for the driver's underlying message reading from the network.
/// </summary>
public sealed class MessageReaderConfig
{
internal static MessageReaderConfig Default { get; } = new();

/// <summary>
/// Constructs a new instance of <see cref="MessageReaderConfig"/>.<br/>
/// The configuration for the driver's underlying message reading from the network.
/// </summary>
/// <param name="disablePipelinedMessageReader">As of 5.15, the driver has migrated the underlying message reading
/// mechanism utilizing <see cref="PipeReader"/>; this optimizes the reading and memory usage of the driver, and
/// setting this to true will revert the driver to the legacy message reader.</param>
/// <param name="memoryPool">The memory pool for creating buffers when reading messages. The PipeReader will borrow
/// memory from the pool of at least ReadBufferSize size. The message reader can request larger memory blocks to
/// host an entire message. User code can provide an implementation for monitoring; by default, the driver will use
/// .NET's <see cref="System.Buffers.MemoryPool{Byte}.Shared"/> pool.</param>
/// host an entire message. User code can provide an implementation for monitoring; by default, the driver will
/// allocate a new array pool that does not take advantage of shared memory pools.</param>
/// <param name="minBufferSize">The minimum buffer size to use when renting memory from the pool. The default value
/// is 65,539.</param>
/// <seealso cref="PipeReader"/>
/// <seealso cref="MemoryPool{T}"/>
/// <seealso cref="StreamPipeReaderOptions"/>
/// <remarks>
/// To optimize the memory usage of the driver pass .NET's shared memory pool(<see cref="MemoryPool{T}.Shared"/>) as
/// the <paramref name="memoryPool"/>, this should only be used when there is complete trust over the usage of
/// shared memory buffers in the application as other components may be using the same memory pool.
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="minBufferSize"/>is less than 1.</exception>
public MessageReaderConfig(bool disablePipelinedMessageReader = false, MemoryPool<byte> memoryPool = null, int minBufferSize = -1)
public MessageReaderConfig(MemoryPool<byte> memoryPool = null, int minBufferSize = -1)
{
DisablePipelinedMessageReader = disablePipelinedMessageReader;
if (disablePipelinedMessageReader)
{
return;
}
MemoryPool = memoryPool ?? MemoryPool<byte>.Shared;
if (minBufferSize is < -1 or 0)
{
throw new ArgumentOutOfRangeException(nameof(minBufferSize));
}

DisablePipelinedMessageReader = false;
MinBufferSize = minBufferSize == -1 ? 65_535 + 4 : minBufferSize;
MemoryPool = memoryPool ?? new PipeReaderMemoryPool(MinBufferSize);
StreamPipeReaderOptions = new(MemoryPool, MinBufferSize, leaveOpen: true);
}

/// <summary>
/// As of 5.15, the driver has migrated the underlying message reading mechanism utilizing <see cref="PipeReader"/>;
/// this optimizes the reading and memory usage of the driver, and setting this to true will revert the driver to
/// the legacy message reader.
/// </summary>
public bool DisablePipelinedMessageReader { get; }
internal bool DisablePipelinedMessageReader { get; }

/// <summary>
/// The memory pool for creating buffers when reading messages. The PipeReader will borrow memory from the pool of
/// at least <see cref="MinBufferSize"/> size. The message reader can request larger memory blocks to host
/// an entire message. User code can provide an implementation for monitoring; by default, the driver will use
/// .NET's <see cref="MemoryPool{Byte}.Shared"/> pool.
/// an entire message. User code can provide an implementation for monitoring; by default, the driver will allocate
/// a new array pool that does not take advantage of shared memory pools.
/// </summary>
public MemoryPool<byte> MemoryPool { get; }

Expand Down
94 changes: 94 additions & 0 deletions Neo4j.Driver/Neo4j.Driver/Internal/Util/PipeReaderMemoryPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) "Neo4j"
// Neo4j Sweden AB [https://neo4j.com]
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Buffers;

namespace Neo4j.Driver.Internal;

/// <summary>
/// Simple memory pool based on the .NET's Pool.
/// </summary>
internal sealed class PipeReaderMemoryPool : MemoryPool<byte>
{
private readonly int _defaultSize;
private readonly ArrayPool<byte> _pool;

public PipeReaderMemoryPool(int defaultSize)
{
_defaultSize = defaultSize;
_pool = ArrayPool<byte>.Create();
}

public override int MaxBufferSize => int.MaxValue;

public override IMemoryOwner<byte> Rent(int minimumBufferSize = -1)
{
if (minimumBufferSize == -1)
{
minimumBufferSize = _defaultSize;
}

if (minimumBufferSize < 0 || minimumBufferSize > MaxBufferSize)
{
throw new ArgumentOutOfRangeException(nameof(minimumBufferSize), minimumBufferSize, "requested size is invalid");
}

return new PooledMemory(minimumBufferSize, _pool);
}

protected override void Dispose(bool disposing)
{
}

private sealed class PooledMemory : IMemoryOwner<byte>
{
private byte[] _array;
private readonly ArrayPool<byte> _pool;

public PooledMemory(int size, ArrayPool<byte> pool)
{
_array = pool.Rent(size);
_pool = pool;
}

public Memory<byte> Memory
{
get
{
var array = _array;
if (array == null)
{
throw new ObjectDisposedException(nameof(PooledMemory));
}

return new Memory<byte>(array);
}
}

public void Dispose()
{
var array = _array;

if (array == null)
{
return;
}

_array = null;
_pool.Return(array);
}
}
}

0 comments on commit acb8ff6

Please sign in to comment.