Skip to content

Commit

Permalink
Use GCHandle.Alloc() to pin key and value bytes (#366)
Browse files Browse the repository at this point in the history
As suggested [here](microsoft/FASTER#885 (comment)), `GCHandle.Alloc` can be used to pin the key and value bytes during an asynchronous FASTER upsert. This avoids the need to copy the bytes into a pinned `Memory<byte>`.

The commit also seals the `FasterKeyValueStore` class, removes the `IDisposable` implementation and simplifies the implementation of `IAsyncDisposable.DisposeAsync`.
  • Loading branch information
wazzamatazz authored Nov 22, 2023
1 parent 101bfc1 commit 5fd07c0
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 127 deletions.
132 changes: 35 additions & 97 deletions src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStore.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand All @@ -18,7 +18,7 @@ namespace DataCore.Adapter.KeyValueStore.FASTER {
/// <summary>
/// Default <see cref="IKeyValueStore"/> implementation.
/// </summary>
public partial class FasterKeyValueStore : RawKeyValueStore<FasterKeyValueStoreOptions>, IDisposable, IAsyncDisposable {
public sealed partial class FasterKeyValueStore : RawKeyValueStore<FasterKeyValueStoreOptions>, IAsyncDisposable {

/// <summary>
/// Flags if the object has been disposed.
Expand Down Expand Up @@ -331,6 +331,7 @@ public async ValueTask<bool> TakeFullCheckpointAsync(CancellationToken cancellat
}



/// <summary>
/// Takes an incremental checkpoint of the FASTER log.
/// </summary>
Expand Down Expand Up @@ -500,43 +501,30 @@ private async ValueTask WriteCoreAsync(byte[] key, byte[] value) {

var session = GetPooledSession();

// Rent some memory that we will copy the key and value bytes into. We can then pin
// the memory for the duration of the upsert, as required when using SpanByte:
// https://github.com/microsoft/FASTER/pull/349

using (var memoryOwner = MemoryPool<byte>.Shared.Rent(key.Length + value.Length)) {
if (memoryOwner == null) {
throw new InvalidOperationException(Resources.Error_UnableToRentSharedMemory);
}

var memory = memoryOwner.Memory;
// We need to pin the key and value bytes for the duration of the upsert, as required
// when using SpanByte: https://github.com/microsoft/FASTER/pull/349

try {
for (var i = 0; i < key.Length; i++) {
memory.Span[i] = key[i];
}
var keyHandle = GCHandle.Alloc(key, GCHandleType.Pinned);
var valueHandle = GCHandle.Alloc(value, GCHandleType.Pinned);

for (var i = 0; i < value.Length; i++) {
memory.Span[key.Length + i] = value[i];
}
try {
var keySpanByte = new SpanByte(key.Length, keyHandle.AddrOfPinnedObject());
var valueSpanByte = new SpanByte(value.Length, valueHandle.AddrOfPinnedObject());

using (memory.Pin()) {
var keySpanByte = SpanByte.FromFixedSpan(memory.Slice(0, key.Length).Span);
var valueSpanByte = SpanByte.FromFixedSpan(memory.Slice(key.Length, value.Length).Span);
var result = await session.UpsertAsync(ref keySpanByte, ref valueSpanByte).ConfigureAwait(false);

var result = await session.UpsertAsync(ref keySpanByte, ref valueSpanByte).ConfigureAwait(false);
while (result.Status.IsPending) {
result = await result.CompleteAsync().ConfigureAwait(false);
}

while (result.Status.IsPending) {
result = await result.CompleteAsync().ConfigureAwait(false);
}
}
// Mark the cache as dirty.
Interlocked.Exchange(ref _fullCheckpointIsRequired, 1);
}
finally {
keyHandle.Free();
valueHandle.Free();

// Mark the cache as dirty.
Interlocked.Exchange(ref _fullCheckpointIsRequired, 1);
}
finally {
ReturnPooledSession(session);
}
ReturnPooledSession(session);
}
}

Expand Down Expand Up @@ -685,26 +673,23 @@ private async IAsyncEnumerable<FasterRecord> GetRecordsCoreAsync(KVKey? prefix,
}


/// <inheritdoc/>
public void Dispose() {
Dispose(true);
GC.SuppressFinalize(this);
}


/// <inheritdoc/>
public async ValueTask DisposeAsync() {
await DisposeAsyncCore().ConfigureAwait(false);
Dispose(false);
}

if (_disposed) {
return;
}

/// <summary>
/// Disposes of managed resources.
/// </summary>
private void DisposeCommon() {
_disposedTokenSource.Cancel();
_disposedTokenSource.Dispose();

// Record final checkpoint.
if (_canRecordCheckpoints) {
try {
await TakeFullCheckpointAsync(default).ConfigureAwait(false);
}
catch (Exception e) {
LogErrorWhileCreatingFullCheckpoint(Logger, e);
}
}

// Dispose of all sessions.
while (_sessionPool.TryDequeue(out var session)) {
Expand All @@ -716,60 +701,13 @@ private void DisposeCommon() {

// Dispose of underlying log device.
_logDevice.Dispose();
}


/// <summary>
/// Disposes of resources.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> if the object is being disposed, or <see langword="false"/>
/// if it is being finalized.
/// </param>
protected virtual void Dispose(bool disposing) {
if (_disposed) {
return;
}

if (disposing) {
if (_canRecordCheckpoints) {
using (var @lock = new ManualResetEventSlim()) {
_ = Task.Run(async () => {
try {
await TakeFullCheckpointAsync().ConfigureAwait(false);
}
catch (Exception e) {
LogErrorWhileCreatingFullCheckpoint(Logger, e);
}
finally {
@lock.Set();
}
});
@lock.Wait();
}

}
DisposeCommon();
}
_disposedTokenSource.Dispose();

_disposed = true;
}


/// <summary>
/// Asynchronously disposes of resources.
/// </summary>
/// <returns>
/// A <see cref="ValueTask"/> that will asynchronously dispose of resources.
/// </returns>
protected virtual async ValueTask DisposeAsyncCore() {
if (_canRecordCheckpoints) {
await TakeFullCheckpointAsync(default).ConfigureAwait(false);
}
DisposeCommon();
}


[LoggerMessage(100, LogLevel.Information, "Checkpoint management is disabled; backup and restore of data will not be performed.")]
static partial void LogCheckpointManagerIsDisabled(ILogger logger);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#nullable enable
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.Dispose() -> void
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.DisposeAsync() -> System.Threading.Tasks.ValueTask
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterKeyValueStore(DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions! options, Microsoft.Extensions.Logging.ILogger<DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore!>? logger = null) -> void
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.TakeFullCheckpointAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<bool>
Expand All @@ -26,8 +25,4 @@ DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.ReadOnly.get ->
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.ReadOnly.set -> void
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.SegmentSizeBits.get -> int
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.SegmentSizeBits.set -> void
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.DeleteAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<bool>
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.GetKeysAsync(DataCore.Adapter.Services.KVKey? prefix) -> System.Collections.Generic.IAsyncEnumerable<DataCore.Adapter.Services.KVKey>!
static DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.CreateLocalStorageCheckpointManager(string! path, bool removeOutdated = true) -> FASTER.core.ICheckpointManager!
virtual DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.Dispose(bool disposing) -> void
virtual DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.DisposeAsyncCore() -> System.Threading.Tasks.ValueTask
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,3 @@ DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.EnableRawWrites
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.EnableRawWrites.set -> void
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.IncrementalCheckpointInterval.get -> System.TimeSpan?
DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.IncrementalCheckpointInterval.set -> void
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.AllowRawWrites.get -> bool
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.ReadAsync<T>(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<T?>
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.ReadRawAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<byte[]?>
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.WriteAsync<T>(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask
override DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.WriteRawAsync(DataCore.Adapter.Services.KVKey key, byte[]! value) -> System.Threading.Tasks.ValueTask
6 changes: 3 additions & 3 deletions test/DataCore.Adapter.Tests/FasterKeyValueStoreTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public async Task ShouldPersistStateBetweenRestarts() {
try {
var now = DateTime.UtcNow;

using (var store1 = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store1 = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
})) {

Expand All @@ -39,7 +39,7 @@ public async Task ShouldPersistStateBetweenRestarts() {
// checkpoint manager.
}

using (var store2 = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store2 = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
})) {
var readResult = await ((IKeyValueStore) store2).ReadAsync<DateTime>(TestContext.TestName);
Expand All @@ -57,7 +57,7 @@ public async Task ShouldNotCreateCheckpointUnlessDirty() {
var tmpPath = new DirectoryInfo(Path.Combine(Path.GetTempPath(), nameof(FasterKeyValueStoreTests), Guid.NewGuid().ToString()));

try {
using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
})) {

Expand Down
12 changes: 6 additions & 6 deletions test/DataCore.Adapter.Tests/SnapshotTagValueManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public async Task ShouldReturnCachedValuesByTagName() {
var val1 = new TagValueBuilder().WithValue(99.999).Build();
var val2 = new TagValueBuilder().WithValue(Guid.NewGuid().ToString()).Build();

using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
}))
using (var stvm = ActivatorUtilities.CreateInstance<SnapshotTagValueManager>(AssemblyInitializer.ApplicationServices, new SnapshotTagValueManagerOptions() {
Expand Down Expand Up @@ -68,7 +68,7 @@ public async Task ShouldReturnCachedValuesByTagId() {
var val1 = new TagValueBuilder().WithValue(99.999).Build();
var val2 = new TagValueBuilder().WithValue(Guid.NewGuid().ToString()).Build();

using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
}))
using (var stvm = ActivatorUtilities.CreateInstance<SnapshotTagValueManager>(AssemblyInitializer.ApplicationServices, new SnapshotTagValueManagerOptions() {
Expand Down Expand Up @@ -114,7 +114,7 @@ public async Task ShouldReturnCachedValuesByTagNameAfterRestore() {
var val1 = new TagValueBuilder().WithValue(99.999).Build();
var val2 = new TagValueBuilder().WithValue(Guid.NewGuid().ToString()).Build();

using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
}))
using (var stvm = ActivatorUtilities.CreateInstance<SnapshotTagValueManager>(AssemblyInitializer.ApplicationServices, new SnapshotTagValueManagerOptions() {
Expand All @@ -125,7 +125,7 @@ public async Task ShouldReturnCachedValuesByTagNameAfterRestore() {
}


using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
}))
using (var stvm = ActivatorUtilities.CreateInstance<SnapshotTagValueManager>(AssemblyInitializer.ApplicationServices, new SnapshotTagValueManagerOptions() {
Expand Down Expand Up @@ -168,7 +168,7 @@ public async Task ShouldReturnCachedValuesByTagIdAfterRestore() {
var val1 = new TagValueBuilder().WithValue(99.999).Build();
var val2 = new TagValueBuilder().WithValue(Guid.NewGuid().ToString()).Build();

using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
}))
using (var stvm = ActivatorUtilities.CreateInstance<SnapshotTagValueManager>(AssemblyInitializer.ApplicationServices, new SnapshotTagValueManagerOptions() {
Expand All @@ -179,7 +179,7 @@ public async Task ShouldReturnCachedValuesByTagIdAfterRestore() {
}


using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
await using (var store = new FasterKeyValueStore(new FasterKeyValueStoreOptions() {
CheckpointManagerFactory = () => FasterKeyValueStore.CreateLocalStorageCheckpointManager(tmpPath.FullName)
}))
using (var stvm = ActivatorUtilities.CreateInstance<SnapshotTagValueManager>(AssemblyInitializer.ApplicationServices, new SnapshotTagValueManagerOptions() {
Expand Down
Loading

0 comments on commit 5fd07c0

Please sign in to comment.