Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update array pool config, allow driver to request a max sized array #764

Merged
merged 11 commits into from
Jan 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Buffers;
using FluentAssertions;
using Neo4j.Driver.Internal.IO;
using Neo4j.Driver.Internal.Util;
using Xunit;

Expand Down Expand Up @@ -43,5 +45,34 @@ public void ShouldReturnNewSessionOptions()
options1.Database.Should().Be("foo");
options2.Database.Should().Be("system");
}

[Fact]
public void ShouldConfigureReaderConfig()
{
var cb = new ConfigBuilder(new Config());
cb.WithDefaultReadBufferSize(128);
var config = cb.Build();
config.MessageReaderConfig.Should().NotBeNull();
config.MessageReaderConfig.MinBufferSize.Should().Be(128);
}

[Fact]
public void ShouldDefaultReaderConfig()
{
var cb = new ConfigBuilder(new Config());
var config = cb.Build();
config.MessageReaderConfig.Should().NotBeNull();
config.MessageReaderConfig.MinBufferSize.Should().Be(Constants.DefaultReadBufferSize);
}

[Fact]
public void ShouldUseSpecifiedConfigObject()
{
var cb = new ConfigBuilder(new Config());
cb.WithMessageReaderConfig(new MessageReaderConfig(MemoryPool<byte>.Shared));
var config = cb.Build();
config.MessageReaderConfig.Should().NotBeNull();
config.MessageReaderConfig.MemoryPool.Should().Be(MemoryPool<byte>.Shared);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) "Neo4j"
// Neo4j Sweden AB [https://neo4j.com]
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Buffers;
using FluentAssertions;
using Neo4j.Driver.Internal.IO;
using Xunit;

namespace Neo4j.Driver.Internal.Util;

public class PipeReaderMemoryPoolTests
{
[Theory]
[InlineData(512, 512, Constants.MaxReadBufferSize)]
[InlineData(1024, 1024, Constants.MaxReadBufferSize)]
[InlineData(1025, 2048, Constants.MaxReadBufferSize)]
[InlineData(2049, 4096, Constants.MaxReadBufferSize)]
[InlineData(4096, 4096, Constants.MaxReadBufferSize)]
[InlineData(4097, 8192, Constants.MaxReadBufferSize)]
[InlineData(65535, 65536, Constants.MaxReadBufferSize)]
[InlineData(65539, 131072, Constants.MaxReadBufferSize)]
[InlineData(1025, 1025, 1024)]
public void ShouldRentMemoryInPower(int size, int expectedSize, int maxReadBufferSize)
{
var pool = new PipeReaderMemoryPool(1024, maxReadBufferSize);
using var memoryOwner = pool.Rent(size);
memoryOwner.Memory.Length.Should().Be(expectedSize);
}

[Fact]
public void ShouldRentMemoryInPowerWithDefaultSize()
{
var pool = new PipeReaderMemoryPool(1024, Constants.MaxReadBufferSize);
using var memoryOwner = pool.Rent();
memoryOwner.Memory.Length.Should().Be(1024);
}

[Fact]
public void ShouldReusePooledObjects()
{
var pool = new PipeReaderMemoryPool(1024, Constants.MaxReadBufferSize);

int length;
using (var memoryOwner = pool.Rent(4321))
{
length = memoryOwner.Memory.Length;
memoryOwner.Memory.Span[0] = 1;
}

using (var memoryOwner = pool.Rent(4321))
{
memoryOwner.Memory.Length.Should().Be(length);
memoryOwner.Memory.Span[0].Should().Be(1);
}
}

/// <summary>
/// This test is to verify the behaviour of the shared pool.
/// It is an unnecessary test but it proves the behaviour to validate <see cref="ShouldNotReturnSharedPoolObjects"/>
/// </summary>
[Fact]
public void SharedPoolShouldReturnSameValue()
{
var pool = MemoryPool<byte>.Shared;
int length;
using (var memoryOwner = pool.Rent(1024))
{
length = memoryOwner.Memory.Length;
memoryOwner.Memory.Span[0] = 1;
}

using (var memoryOwner = pool.Rent(1024))
{
memoryOwner.Memory.Length.Should().Be(length);
memoryOwner.Memory.Span[0].Should().Be(1);
}
}

[Fact]
public void ShouldNotReturnSharedPoolObjects()
{
var pool = MemoryPool<byte>.Shared;
using (var memoryOwner = pool.Rent(1024))
{
memoryOwner.Memory.Length.Should().Be(1024);
memoryOwner.Memory.Span[0] = 1;
}

pool = new PipeReaderMemoryPool(1024, 2048);
using (var memoryOwner = pool.Rent(1024))
{
memoryOwner.Memory.Length.Should().Be(1024);
memoryOwner.Memory.Span[0].Should().Be(0);
}
}

[Fact]
public void CanBorrowMaxLengthArray()
{
var pool = new PipeReaderMemoryPool(1024, Constants.MaxReadBufferSize);

// 2146435071 is the max length of an array in .NET
using (var memoryOwner = pool.Rent(2146435071))
{
// when returned to the pool, as it exceeds the max size of the pool, it will be discarded
memoryOwner.Memory.Length.Should().Be(2146435071);
}
}

[Fact]
public void ShouldNotStoreLargerThanMaxReadBufferSize()
{
var pool = new PipeReaderMemoryPool(1024, 1024);

using (var memory = pool.Rent(1025))
{
memory.Memory.Length.Should().Be(1025);
memory.Memory.Span[0] = 1;
}

using (var memory = pool.Rent(1025))
{
memory.Memory.Length.Should().Be(1025);
memory.Memory.Span[0].Should().Be(0);
}
}
}
3 changes: 2 additions & 1 deletion Neo4j.Driver/Neo4j.Driver.Tests/TestDriverContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ internal static class TestDriverContext
{
static TestDriverContext()
{
MockContext = new DriverContext(new Uri("bolt://localhost:7687"), AuthTokenManagers.None, new Config());
MockContext = new DriverContext(new Uri("bolt://localhost:7687"), AuthTokenManagers.None,
new ConfigBuilder(new Config()).Build());
}

public static DriverContext MockContext { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

using System;
using System.Buffers;
using Neo4j.Driver.Internal.IO;

namespace Neo4j.Driver.Internal.Util;

Expand All @@ -25,11 +26,11 @@ internal sealed class PipeReaderMemoryPool : MemoryPool<byte>
{
private readonly int _defaultSize;
private readonly ArrayPool<byte> _pool;

public PipeReaderMemoryPool(int defaultSize)
public PipeReaderMemoryPool(int defaultBufferSize, int maxPooledBufferSize)
{
_defaultSize = defaultSize;
_pool = ArrayPool<byte>.Create();
_defaultSize = defaultBufferSize;
_pool = ArrayPool<byte>.Create(maxPooledBufferSize, 64);
}

public override int MaxBufferSize => int.MaxValue;
Expand Down
56 changes: 45 additions & 11 deletions Neo4j.Driver/Neo4j.Driver/Public/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public int MaxIdleConnectionPoolSize
/// <summary>
/// The configuration for the driver's underlying message reading from the network.
/// </summary>
public MessageReaderConfig MessageReaderConfig { get; internal set; } = new();
public MessageReaderConfig MessageReaderConfig { get; internal set; }
}

/// <summary>
Expand All @@ -247,14 +247,17 @@ public sealed class MessageReaderConfig
{
/// <summary>
/// Constructs a new instance of <see cref="MessageReaderConfig"/>.<br/>
/// The configuration for the driver's underlying message reading from the network.
/// The configuration for the driver's underlying message reading from the network.<br/>
/// Using this constructor overrides the <see cref="Config.DefaultReadBufferSize"/> and <see cref="Config.MaxReadBufferSize"/>.
/// </summary>
/// <param name="memoryPool">The memory pool for creating buffers when reading messages. The PipeReader will borrow
/// memory from the pool of at least ReadBufferSize size. The message reader can request larger memory blocks to
/// host an entire message. User code can provide an implementation for monitoring; by default, the driver will
/// allocate a new array pool that does not take advantage of shared memory pools.</param>
/// <param name="minBufferSize">The minimum buffer size to use when renting memory from the pool. The default value
/// is 65,539.</param>
/// <param name="minBufferSize">The minimum buffer size to use when renting memory from the pool.
/// <br/>The default value is 32,768.</param>
/// <param name="maxPooledBufferSize">The maximum buffer size to use when renting memory from neo4j's default pool.
/// <br/>The default is 131,072.</param>
/// <seealso cref="PipeReader"/>
/// <seealso cref="MemoryPool{T}"/>
/// <seealso cref="StreamPipeReaderOptions"/>
Expand All @@ -263,20 +266,51 @@ public sealed class MessageReaderConfig
/// the <paramref name="memoryPool"/>, this should only be used when there is complete trust over the usage of
/// shared memory buffers in the application as other components may be using the same memory pool.
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="minBufferSize"/>is less than 1.</exception>
public MessageReaderConfig(MemoryPool<byte> memoryPool = null, int minBufferSize = -1)
/// <remarks>
/// The <paramref name="memoryPool"/> will define it's own maximum pooled buffer size, but must be able to provide
/// an memory object upto the limit 2146435071 bytes. The <paramref name="maxPooledBufferSize"/> will not be observed
/// when the <paramref name="memoryPool"/> is passed..
/// </remarks>
/// <remarks>
/// Note using a small value for <paramref name="minBufferSize"/> could cause a degradation in performance.
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">
/// If <paramref name="minBufferSize"/>is less than 1 or greater than 2146435071
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// If <paramref name="maxPooledBufferSize"/> is less than <paramref name="minBufferSize"/> or greater than 2146435071
/// </exception>
public MessageReaderConfig(MemoryPool<byte> memoryPool = null, int minBufferSize = -1, int maxPooledBufferSize = -1)
{
if (minBufferSize is < -1 or 0)
const int maxArrayLength = 2146435071;
if (minBufferSize is < -1 or 0 or > maxArrayLength)
{
throw new ArgumentOutOfRangeException(nameof(minBufferSize));
throw new ArgumentOutOfRangeException(nameof(minBufferSize), minBufferSize,
"Minimum buffer size must be between 1 and 2146435071, leave as -1 to use default.");
}

MinBufferSize = minBufferSize == -1 ? Constants.DefaultReadBufferSize : MinBufferSize;
if (maxPooledBufferSize != -1 && (maxPooledBufferSize < MinBufferSize || maxPooledBufferSize > maxArrayLength))
{
throw new ArgumentOutOfRangeException(
nameof(maxPooledBufferSize),
maxPooledBufferSize,
$"Max pooled buffer size buffer size must be greater than minBufferSize({MinBufferSize}), leave as -1 to use default.");
}

DisablePipelinedMessageReader = false;
MinBufferSize = minBufferSize == -1 ? 65_535 + 4 : minBufferSize;
MemoryPool = memoryPool ?? new PipeReaderMemoryPool(MinBufferSize);
MemoryPool = memoryPool ?? new PipeReaderMemoryPool(MinBufferSize,
maxPooledBufferSize == -1 ? Constants.MaxReadBufferSize : maxPooledBufferSize);
StreamPipeReaderOptions = new(MemoryPool, MinBufferSize, leaveOpen: true);
}

internal MessageReaderConfig(Config config)
{
DisablePipelinedMessageReader = false;
MinBufferSize = config.DefaultReadBufferSize;
MemoryPool = new PipeReaderMemoryPool(config.DefaultReadBufferSize, config.MaxReadBufferSize);
StreamPipeReaderOptions = new(MemoryPool, config.DefaultReadBufferSize, leaveOpen: true);
}

/// <summary>
/// As of 5.15, the driver has migrated the underlying message reading mechanism utilizing <see cref="PipeReader"/>;
/// this optimizes the reading and memory usage of the driver, and setting this to true will revert the driver to
Expand Down
3 changes: 3 additions & 0 deletions Neo4j.Driver/Neo4j.Driver/Public/ConfigBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ internal ConfigBuilder(Config config)
/// <returns>A <see cref="Config"/> instance.</returns>
internal Config Build()
{
// Initialize the message reader config with internal constructor, it can read the default and max read buffer
// sizes. if users have configured a message reader config we will use that instead.
_config.MessageReaderConfig ??= new MessageReaderConfig(_config);
return _config;
}

Expand Down