From 44d873b68eee2ce6618fce6ae28dfa17c3779710 Mon Sep 17 00:00:00 2001 From: Graham Watts Date: Thu, 23 Nov 2023 12:02:18 +0200 Subject: [PATCH] Check for existing key and add FASTER size tracking metrics (#367) * Create ScopedRawKeyValueStore.cs New wrapper class for a scoped `IRawKeyValueStore`. * Update IKeyValueStore.cs Add a new `ExistsAsync` method for detecting if a key exists without reading its value. * Implement ExistsAsync * Update ScopedKeyValueStore.cs Implement ExistsAsync and make class internal * Update KeyValueStoreExtensions.cs `BulkCopyToAsync`/`BulkCopyFromAsync` now include an `overwrite` parameter that controls if existing keys in the destination store should be overwritten. `CreateScopedStore` has been updated to return a `ScopedRawKeyValueStore` if the store to be wrapped implements `IRawKeyValueStore`. * Update API declarations * Implement ExistsAsync/add size tracking metrics Implements `ExistsAsync` in the FASTER key/value store. Adds functionality (based on the FASTER GitHub samples) for tracking the memory footprint of a FASTER key/value store. Adds metrics to `FasterKeyValueStore` for reporting memory usage. * Update unit tests Adds tests for `ExistsAsync` and disposes stores using `IAsyncDisposable.DisposeAsync` in preference to `IDisposable.Dispose` if defined on a store implementation. * Size tracking optimisations --- THIRD_PARTY_LICENSES.md | 29 ++ .../PublicAPI.Shipped.txt | 4 - .../PublicAPI.Unshipped.txt | 8 +- .../Services/IKeyValueStore.cs | 12 + .../Services/InMemoryKeyValueStore.cs | 7 + .../Services/KeyValueStore.cs | 21 ++ .../Services/KeyValueStoreExtensions.cs | 35 ++- .../Services/ScopedKeyValueStore.cs | 28 +- .../Services/ScopedRawKeyValueStore.cs | 44 +++ .../CacheSizeTracker.cs | 114 ++++++++ .../FasterKeyValueStore.cs | 270 ++++++++++-------- .../FasterKeyValueStoreOptions.cs | 11 + .../FasterRecord.cs | 57 ++++ .../LogSizeTracker.cs | 116 ++++++++ .../PublicAPI.Unshipped.txt | 16 +- .../ScanIteratorFunctions.cs | 75 +++++ .../SizeTrackingSpanByteFunctions.cs | 67 +++++ .../FileSystemKeyValueStore.cs | 16 ++ .../PublicAPI.Unshipped.txt | 1 + .../SqliteKeyValueStore.cs | 34 +++ .../FasterKeyValueStoreTests.cs | 6 +- .../KeyValueStoreTests.cs | 70 ++++- 22 files changed, 881 insertions(+), 160 deletions(-) create mode 100644 src/DataCore.Adapter.Abstractions/Services/ScopedRawKeyValueStore.cs create mode 100644 src/DataCore.Adapter.KeyValueStore.FASTER/CacheSizeTracker.cs create mode 100644 src/DataCore.Adapter.KeyValueStore.FASTER/FasterRecord.cs create mode 100644 src/DataCore.Adapter.KeyValueStore.FASTER/LogSizeTracker.cs create mode 100644 src/DataCore.Adapter.KeyValueStore.FASTER/ScanIteratorFunctions.cs create mode 100644 src/DataCore.Adapter.KeyValueStore.FASTER/SizeTrackingSpanByteFunctions.cs diff --git a/THIRD_PARTY_LICENSES.md b/THIRD_PARTY_LICENSES.md index 82f798c2..23be5a12 100644 --- a/THIRD_PARTY_LICENSES.md +++ b/THIRD_PARTY_LICENSES.md @@ -30,6 +30,35 @@ SOFTWARE. * * * +# Microsoft Corporation + +Some code in this repository is based on [Microsoft FASTER](https://github.com/microsoft/FASTER). + +MIT License + +Copyright (c) Microsoft Corporation. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +* * * + + # Jering.KeyValueStore Some code in this repository is based on [Jering.KeyValueStore](https://github.com/JeringTech/KeyValueStore). diff --git a/src/DataCore.Adapter.Abstractions/PublicAPI.Shipped.txt b/src/DataCore.Adapter.Abstractions/PublicAPI.Shipped.txt index d4e11947..5b1d85b4 100644 --- a/src/DataCore.Adapter.Abstractions/PublicAPI.Shipped.txt +++ b/src/DataCore.Adapter.Abstractions/PublicAPI.Shipped.txt @@ -278,10 +278,6 @@ DataCore.Adapter.Services.KVKey.KVKey() -> void DataCore.Adapter.Services.KVKey.KVKey(byte[]? value) -> void DataCore.Adapter.Services.KVKey.Length.get -> int DataCore.Adapter.Services.KVKey.Value.get -> byte[]! -DataCore.Adapter.Services.ScopedKeyValueStore -DataCore.Adapter.Services.ScopedKeyValueStore.DeleteAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask -DataCore.Adapter.Services.ScopedKeyValueStore.GetKeysAsync(DataCore.Adapter.Services.KVKey? prefix) -> System.Collections.Generic.IAsyncEnumerable! -DataCore.Adapter.Services.ScopedKeyValueStore.ScopedKeyValueStore(DataCore.Adapter.Services.KVKey prefix, DataCore.Adapter.Services.IKeyValueStore! inner) -> void DataCore.Adapter.Tags.ITagConfiguration DataCore.Adapter.Tags.ITagConfiguration.CreateTagAsync(DataCore.Adapter.IAdapterCallContext! context, DataCore.Adapter.Tags.CreateTagRequest! request, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task! DataCore.Adapter.Tags.ITagConfiguration.DeleteTagAsync(DataCore.Adapter.IAdapterCallContext! context, DataCore.Adapter.Tags.DeleteTagRequest! request, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task! diff --git a/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt b/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt index 5ba20ad8..3e759d76 100644 --- a/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt +++ b/src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt @@ -1,4 +1,5 @@ #nullable enable +abstract DataCore.Adapter.Services.KeyValueStore.ExistsAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask abstract DataCore.Adapter.Services.KeyValueStore.GetSerializer() -> DataCore.Adapter.Services.IKeyValueStoreSerializer! abstract DataCore.Adapter.Services.KeyValueStore.ReadAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask abstract DataCore.Adapter.Services.KeyValueStore.WriteAsync(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask @@ -52,6 +53,7 @@ DataCore.Adapter.Common.HostInfoBuilder.WithProperty(string! name, DataCore.Adap DataCore.Adapter.Common.HostInfoBuilder.WithVendor(DataCore.Adapter.Common.VendorInfo? vendor) -> DataCore.Adapter.Common.HostInfoBuilder! DataCore.Adapter.Common.HostInfoBuilder.WithVersion(string? version) -> DataCore.Adapter.Common.HostInfoBuilder! DataCore.Adapter.IAdapterCallContext.Services.get -> System.IServiceProvider! +DataCore.Adapter.Services.IKeyValueStore.ExistsAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask DataCore.Adapter.Services.IKeyValueStore.ReadAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask DataCore.Adapter.Services.IKeyValueStore.WriteAsync(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask DataCore.Adapter.Services.IKeyValueStoreSerializer @@ -94,15 +96,13 @@ DataCore.Adapter.Services.KeyValueStoreWriteBufferOptions.SizeLimit.get -> int DataCore.Adapter.Services.KeyValueStoreWriteBufferOptions.SizeLimit.set -> void DataCore.Adapter.Services.RawKeyValueStore DataCore.Adapter.Services.RawKeyValueStore.RawKeyValueStore(TOptions? options, Microsoft.Extensions.Logging.ILogger? logger = null) -> void -DataCore.Adapter.Services.ScopedKeyValueStore.ReadAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask -DataCore.Adapter.Services.ScopedKeyValueStore.WriteAsync(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask override DataCore.Adapter.Services.KVKey.ToString() -> string! override sealed DataCore.Adapter.Services.KeyValueStore.GetCompressionLevel() -> System.IO.Compression.CompressionLevel override sealed DataCore.Adapter.Services.KeyValueStore.GetSerializer() -> DataCore.Adapter.Services.IKeyValueStoreSerializer! static DataCore.Adapter.AdapterExtensions.CreateExtendedAdapterDescriptorBuilder(this DataCore.Adapter.IAdapter! adapter) -> DataCore.Adapter.Common.AdapterDescriptorBuilder! static DataCore.Adapter.Services.JsonKeyValueStoreSerializer.Default.get -> DataCore.Adapter.Services.IKeyValueStoreSerializer! -static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.KVKey? keyPrefix = null, bool overwrite = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.KVKey? keyPrefix = null, bool overwrite = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, System.Collections.Generic.IEnumerable! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, System.Collections.Generic.IEnumerable! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static DataCore.Adapter.Services.KeyValueStoreExtensions.GetKeysAsStringsAsync(this DataCore.Adapter.Services.IKeyValueStore! store) -> System.Collections.Generic.IAsyncEnumerable! diff --git a/src/DataCore.Adapter.Abstractions/Services/IKeyValueStore.cs b/src/DataCore.Adapter.Abstractions/Services/IKeyValueStore.cs index 8374bdec..00536a59 100644 --- a/src/DataCore.Adapter.Abstractions/Services/IKeyValueStore.cs +++ b/src/DataCore.Adapter.Abstractions/Services/IKeyValueStore.cs @@ -61,6 +61,18 @@ public interface IKeyValueStore { ValueTask ReadAsync(KVKey key); + /// + /// Tests if a key exists in the store. + /// + /// + /// The key to test. + /// + /// + /// if the key exists; otherwise, . + /// + ValueTask ExistsAsync(KVKey key); + + /// /// Deletes a value from the store. /// diff --git a/src/DataCore.Adapter.Abstractions/Services/InMemoryKeyValueStore.cs b/src/DataCore.Adapter.Abstractions/Services/InMemoryKeyValueStore.cs index 86a1cd99..f868e34b 100644 --- a/src/DataCore.Adapter.Abstractions/Services/InMemoryKeyValueStore.cs +++ b/src/DataCore.Adapter.Abstractions/Services/InMemoryKeyValueStore.cs @@ -51,6 +51,13 @@ protected override ValueTask WriteAsync(KVKey key, T value) { } + /// + protected override ValueTask ExistsAsync(KVKey key) { + var keyAsString = System.Text.Encoding.UTF8.GetString(key); + return new ValueTask(_values.ContainsKey(keyAsString)); + } + + /// protected override ValueTask DeleteAsync(KVKey key) { var keyAsString = System.Text.Encoding.UTF8.GetString(key); diff --git a/src/DataCore.Adapter.Abstractions/Services/KeyValueStore.cs b/src/DataCore.Adapter.Abstractions/Services/KeyValueStore.cs index 6194e8df..d63891e6 100644 --- a/src/DataCore.Adapter.Abstractions/Services/KeyValueStore.cs +++ b/src/DataCore.Adapter.Abstractions/Services/KeyValueStore.cs @@ -53,6 +53,15 @@ async ValueTask IKeyValueStore.WriteAsync(KVKey key, T value) { } + /// + async ValueTask IKeyValueStore.ExistsAsync(KVKey key) { + if (key.Length == 0) { + throw new ArgumentException(AbstractionsResources.Error_KeyValueStore_InvalidKey, nameof(key)); + } + return await ExistsAsync(key).ConfigureAwait(false); + } + + /// async ValueTask IKeyValueStore.DeleteAsync(KVKey key) { if (key.Length == 0) { @@ -104,6 +113,18 @@ IAsyncEnumerable IKeyValueStore.GetKeysAsync(KVKey? prefix) { protected abstract ValueTask ReadAsync(KVKey key); + /// + /// Tests if a key exists in the store. + /// + /// + /// The key to test. + /// + /// + /// if the key exists; otherwise, . + /// + protected abstract ValueTask ExistsAsync(KVKey key); + + /// /// Deletes a value from the store. /// diff --git a/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs b/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs index 7ab882c4..9a4c8b69 100644 --- a/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs +++ b/src/DataCore.Adapter.Abstractions/Services/KeyValueStoreExtensions.cs @@ -49,12 +49,8 @@ public static IKeyValueStore CreateScopedStore(this IKeyValueStore store, KVKey throw new ArgumentException(AbstractionsResources.Error_KeyValueStore_InvalidKey, nameof(prefix)); } - if (store is ScopedKeyValueStore scoped) { - // This store is already an instance of ScopedKeyValueStore. Instead of wrapping - // the scoped store and recursively applying key prefixes in every operation, we - // will wrap the inner store and concatenate the prefix for this store with the - // prefix passed to this method. - return new ScopedKeyValueStore(KeyValueStore.AddPrefix(scoped.Prefix, prefix), scoped.Inner); + if (store is IRawKeyValueStore raw) { + return new ScopedRawKeyValueStore(prefix, raw); } return new ScopedKeyValueStore(prefix, store); @@ -318,6 +314,9 @@ public static async ValueTask WriteJsonAsync(this IKeyValueStore store, /// /// The filter to apply to keys read from the source store. /// + /// + /// Specifies if existing keys in the destination store should be overwritten. + /// /// /// The cancellation token for the operation. /// @@ -330,7 +329,7 @@ public static async ValueTask WriteJsonAsync(this IKeyValueStore store, /// /// is . /// - public static async Task BulkCopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) { + public static async Task BulkCopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, KVKey? keyPrefix = null, bool overwrite = false, CancellationToken cancellationToken = default) { if (source == null) { throw new ArgumentNullException(nameof(source)); } @@ -348,6 +347,13 @@ public static async Task BulkCopyToAsync(this IRawKeyValueStore source, IRa continue; } + if (!overwrite) { + var exists = await destination.ExistsAsync(key).ConfigureAwait(false); + if (exists) { + continue; + } + } + await destination.WriteRawAsync(key, value).ConfigureAwait(false); count++; } @@ -368,6 +374,9 @@ public static async Task BulkCopyToAsync(this IRawKeyValueStore source, IRa /// /// The filter to apply to keys read from the source store. /// + /// + /// Specifies if existing keys in the destination store should be overwritten. + /// /// /// The cancellation token for the operation. /// @@ -380,8 +389,8 @@ public static async Task BulkCopyToAsync(this IRawKeyValueStore source, IRa /// /// is . /// - public static async Task BulkCopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) { - return await source.BulkCopyToAsync(destination, keyPrefix, cancellationToken).ConfigureAwait(false); + public static async Task BulkCopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, KVKey? keyPrefix = null, bool overwrite = false, CancellationToken cancellationToken = default) { + return await source.BulkCopyToAsync(destination, keyPrefix, overwrite, cancellationToken).ConfigureAwait(false); } @@ -412,6 +421,10 @@ public static async Task BulkCopyFromAsync(this IRawKeyValueStore destinati /// /// is . /// + /// + /// Unlike , will always + /// overwrite keys in the destination store. + /// public static async Task CopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, IEnumerable keys, CancellationToken cancellationToken = default) { if (source == null) { throw new ArgumentNullException(nameof(source)); @@ -469,6 +482,10 @@ public static async Task CopyToAsync(this IRawKeyValueStore source, IRawKey /// /// is . /// + /// + /// Unlike , will always + /// overwrite keys in the destination store. + /// public static async Task CopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, IEnumerable keys, CancellationToken cancellationToken = default) { return await source.CopyToAsync(destination, keys, cancellationToken).ConfigureAwait(false); } diff --git a/src/DataCore.Adapter.Abstractions/Services/ScopedKeyValueStore.cs b/src/DataCore.Adapter.Abstractions/Services/ScopedKeyValueStore.cs index ed9a3181..33ed78de 100644 --- a/src/DataCore.Adapter.Abstractions/Services/ScopedKeyValueStore.cs +++ b/src/DataCore.Adapter.Abstractions/Services/ScopedKeyValueStore.cs @@ -8,12 +8,7 @@ namespace DataCore.Adapter.Services { /// that wraps an existing and /// automatically modifies keys passed in or out of the store using a scoped prefix. /// - /// - /// You can simplify creating instances of this class by using the - /// - /// extension method. - /// - public class ScopedKeyValueStore : IKeyValueStore { + internal class ScopedKeyValueStore : IKeyValueStore { /// /// The prefix to apply to keys read from or written to the inner store. @@ -42,8 +37,18 @@ public ScopedKeyValueStore(KVKey prefix, IKeyValueStore inner) { if (prefix.Value.Length == 0) { throw new ArgumentException(AbstractionsResources.Error_KeyValueStore_InvalidKey, nameof(prefix)); } - Prefix = prefix; - Inner = inner ?? throw new ArgumentNullException(nameof(inner)); + if (inner == null) { + throw new ArgumentNullException(nameof(inner)); + } + + if (inner is ScopedKeyValueStore scoped) { + Prefix = KeyValueStore.AddPrefix(scoped.Prefix, prefix); + Inner = scoped.Inner; + } + else { + Prefix = prefix; + Inner = inner; + } } @@ -61,6 +66,13 @@ public ValueTask WriteAsync(KVKey key, T value) { } + /// + public ValueTask ExistsAsync(KVKey key) { + var k = KeyValueStore.AddPrefix(Prefix, key); + return Inner.ExistsAsync(k); + } + + /// public ValueTask DeleteAsync(KVKey key) { var k = KeyValueStore.AddPrefix(Prefix, key); diff --git a/src/DataCore.Adapter.Abstractions/Services/ScopedRawKeyValueStore.cs b/src/DataCore.Adapter.Abstractions/Services/ScopedRawKeyValueStore.cs new file mode 100644 index 00000000..5a679a77 --- /dev/null +++ b/src/DataCore.Adapter.Abstractions/Services/ScopedRawKeyValueStore.cs @@ -0,0 +1,44 @@ +using System.Threading.Tasks; + +namespace DataCore.Adapter.Services { + + /// + /// that wraps an existing and + /// automatically modifies keys passed in or out of the store using a scoped prefix. + /// + internal class ScopedRawKeyValueStore : ScopedKeyValueStore, IRawKeyValueStore { + + /// + /// The inner . + /// + internal new IRawKeyValueStore Inner => (IRawKeyValueStore) base.Inner; + + + /// + /// Creates a new object. + /// + /// + /// The key prefix for the store. + /// + /// + /// The inner to wrap. + /// + public ScopedRawKeyValueStore(KVKey prefix, IRawKeyValueStore inner) + : base(prefix, inner) { } + + + /// + public ValueTask ReadRawAsync(KVKey key) { + var k = KeyValueStore.AddPrefix(Prefix, key); + return Inner.ReadRawAsync(k); + } + + + /// + public ValueTask WriteRawAsync(KVKey key, byte[] value) { + var k = KeyValueStore.AddPrefix(Prefix, key); + return Inner.WriteRawAsync(k, value); + } + + } +} diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/CacheSizeTracker.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/CacheSizeTracker.cs new file mode 100644 index 00000000..d03e0615 --- /dev/null +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/CacheSizeTracker.cs @@ -0,0 +1,114 @@ +using System; + +using FASTER.core; + +namespace DataCore.Adapter.KeyValueStore.FASTER { + + /// + /// Tracks memory usage in a . + /// + internal class CacheSizeTracker : IDisposable { + + /// + /// Flags if the object has been disposed. + /// + private bool _disposed; + + /// + /// The underlying FASTER store. + /// + private readonly FasterKV _store; + + /// + /// The size tracker for the FASTER log. + /// + private readonly LogSizeTracker _logSizeTracker; + + /// + /// The size tracker for the FASTER read cache. + /// + private readonly LogSizeTracker? _readCacheSizeTracker; + + + /// + /// Creates a new instance. + /// + /// + /// The underlying FASTER store. + /// + public CacheSizeTracker(FasterKV store) { + _store = store; + + _logSizeTracker = new LogSizeTracker(_store.Log); + if (_store.ReadCache != null) { + _readCacheSizeTracker = new LogSizeTracker(_store.ReadCache); + } + } + + + /// + /// Gets the total in-memory size of the FASTER store. + /// + /// + /// The total in-memory size of the FASTER store, in bytes. + /// + internal long GetTotalSize() => GetIndexSize() + GetLogSize() + GetReadCacheSize(); + + /// + /// Gets the size of the FASTER index. + /// + /// + /// The size of the FASTER index, in bytes. + /// + internal long GetIndexSize() => (_store.IndexSize * 64) + (_store.OverflowBucketCount * 64); + + /// + /// Gets the size of the in-memory portion of the FASTER log. + /// + /// + /// The size of the in-memory portion of the FASTER log, in bytes. + /// + internal long GetLogSize() => _logSizeTracker.TotalMemorySize; + + /// + /// Gets the size of the FASTER read cache. + /// + /// + /// The size of the FASTER read cache, in bytes. + /// + internal long GetReadCacheSize() => _readCacheSizeTracker?.TotalMemorySize ?? 0; + + + /// + /// Updates the heap size of the FASTER store. + /// + /// + /// The change in heap size. + /// + /// + /// if the change is for the read cache; otherwise, . + /// + internal void UpdateHeapSize(int delta, bool isReadCache = false) { + if (isReadCache) { + _readCacheSizeTracker?.UpdateHeapSize(delta); + } + else { + _logSizeTracker.UpdateHeapSize(delta); + } + } + + + /// + public void Dispose() { + if (_disposed) { + return; + } + + _logSizeTracker.Dispose(); + _readCacheSizeTracker?.Dispose(); + + _disposed = true; + } + } + +} diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStore.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStore.cs index 371063ae..b927a305 100644 --- a/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStore.cs +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStore.cs @@ -1,12 +1,14 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics.Metrics; using System.IO; +using System.Linq; using System.Runtime.InteropServices; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; +using DataCore.Adapter.Diagnostics; using DataCore.Adapter.Services; using FASTER.core; @@ -16,10 +18,90 @@ namespace DataCore.Adapter.KeyValueStore.FASTER { /// - /// Default implementation. + /// implementation that uses Microsoft FASTER as its backing store. /// public sealed partial class FasterKeyValueStore : RawKeyValueStore, IAsyncDisposable { + /// + /// Active instances of . These are tracked to allow + /// metrics to be reported for each instance. + /// + private static readonly List s_instances = new List(); + + /// + /// Lock for accessing . + /// + private static readonly Nito.AsyncEx.AsyncReaderWriterLock s_instancesLock = new Nito.AsyncEx.AsyncReaderWriterLock(); + + /// + /// Creates a metric tag that identifies the instance + /// that a measurement was observed on. + /// + /// + /// The instance. + /// + /// + /// A new that can be added to the metric + /// measurement. + /// + private static KeyValuePair CreateInstanceIdTag(FasterKeyValueStore instance) => new KeyValuePair("data_core.instance_id", instance._instanceName); + + /// + /// Instrument for observing the total in-memory footprint for + /// instances. + /// + private static readonly ObservableGauge s_totalSizeGauge = Telemetry.Meter.CreateObservableGauge( + "KeyValueStore.FASTER.Size.Total", + () => { + using (s_instancesLock.ReaderLock()) { + return s_instances.Select(x => new Measurement(x._sizeTracker.GetTotalSize(), CreateInstanceIdTag(x))).ToArray(); + } + }, + "By", + "Total size of the FASTER index, in-memory log and read cache."); + + /// + /// Instrument for observing the size of the in-memory index for + /// instances. + /// + private static readonly ObservableGauge s_indexSizeGauge = Telemetry.Meter.CreateObservableGauge( + "KeyValueStore.FASTER.Size.Index", + () => { + using (s_instancesLock.ReaderLock()) { + return s_instances.Select(x => new Measurement(x._sizeTracker.GetIndexSize(), CreateInstanceIdTag(x))).ToArray(); + } + }, + "By", + "Size of the FASTER index."); + + /// + /// Instrument for observing the size of the in-memory log for + /// instances. + /// + private static readonly ObservableGauge s_logSize = Telemetry.Meter.CreateObservableGauge( + "KeyValueStore.FASTER.Size.Log", + () => { + using (s_instancesLock.ReaderLock()) { + return s_instances.Select(x => new Measurement(x._sizeTracker.GetLogSize(), CreateInstanceIdTag(x))).ToArray(); + } + }, + "By", + "Size of the in-memory portion of the FASTER log."); + + /// + /// Instrument for observing the size of the reach cache for + /// instances. + /// + private static readonly ObservableGauge s_readCacheSize = Telemetry.Meter.CreateObservableGauge( + "KeyValueStore.FASTER.Size.ReadCache", + () => { + using (s_instancesLock.ReaderLock()) { + return s_instances.Select(x => new Measurement(x._sizeTracker.GetReadCacheSize(), CreateInstanceIdTag(x))).ToArray(); + } + }, + "By", + "Size of the FASTER read cache."); + /// /// Flags if the object has been disposed. /// @@ -31,11 +113,21 @@ public sealed partial class FasterKeyValueStore : RawKeyValueStore private readonly CancellationTokenSource _disposedTokenSource = new CancellationTokenSource(); + /// + /// The name of the store instance. This name is used in metrics. + /// + private readonly string _instanceName; + /// /// The underlying FASTER store. /// private readonly FasterKV _fasterKVStore; + /// + /// The size tracker for the im-memory portion and read cache for the FASTER store. + /// + private readonly CacheSizeTracker _sizeTracker; + /// /// The FASTER device used to store the on-disk portion of the log. /// @@ -116,6 +208,10 @@ public FasterKeyValueStore(FasterKeyValueStoreOptions options, ILogger()); + _clientSessionBuilder = _fasterKVStore.For(new SizeTrackingSpanByteFunctions(_sizeTracker)); _readOnly = options.ReadOnly; if (checkpointManager != null) { @@ -164,6 +261,10 @@ public FasterKeyValueStore(FasterKeyValueStoreOptions options, ILogger /// The raw bytes, or if the key does not exist. /// - private async ValueTask ReadCoreAsync(KVKey key) { + private async ValueTask ReadCoreAsync(byte[] key) { ThrowIfDisposed(); Status status; SpanByteAndMemory spanByteAndMemory; var session = GetPooledSession(); + var keyHandle = GCHandle.Alloc(key, GCHandleType.Pinned); + try { - var keySpanByte = SpanByte.FromFixedSpan((byte[]) key); + var keySpanByte = new SpanByte(key.Length, keyHandle.AddrOfPinnedObject()); SpanByte valueSpanByte = default; var result = await session.ReadAsync(ref keySpanByte, ref valueSpanByte).ConfigureAwait(false); @@ -571,6 +674,7 @@ private async ValueTask WriteCoreAsync(byte[] key, byte[] value) { (status, spanByteAndMemory) = result.Complete(); } finally { + keyHandle.Free(); ReturnPooledSession(session); } @@ -584,6 +688,46 @@ private async ValueTask WriteCoreAsync(byte[] key, byte[] value) { } + /// + protected override async ValueTask ExistsAsync(KVKey key) { + return await ExistsCoreAsync(key).ConfigureAwait(false); + } + + + /// + /// Tests if a key exists in the store. + /// + /// + /// The key to test. + /// + /// + /// if the key exists; otherwise, . + /// + private async ValueTask ExistsCoreAsync(byte[] key) { + ThrowIfDisposed(); + + Status status; + + var session = GetPooledSession(); + var keyHandle = GCHandle.Alloc(key, GCHandleType.Pinned); + + try { + var keySpanByte = new SpanByte(key.Length, keyHandle.AddrOfPinnedObject()); + SpanByte valueSpanByte = default; + + var result = await session.ReadAsync(ref keySpanByte, ref valueSpanByte).ConfigureAwait(false); + + (status, _) = result.Complete(); + } + finally { + keyHandle.Free(); + ReturnPooledSession(session); + } + + return status.Found; + } + + /// protected override async ValueTask DeleteAsync(KVKey key) { ThrowIfDisposed(); @@ -679,6 +823,10 @@ public async ValueTask DisposeAsync() { return; } + using (await s_instancesLock.WriterLockAsync().ConfigureAwait(false)) { + s_instances.Remove(this); + } + _disposedTokenSource.Cancel(); // Record final checkpoint. @@ -732,117 +880,5 @@ public async ValueTask DisposeAsync() { [LoggerMessage(107, LogLevel.Error, "Error while performing FASTER log compaction.")] static partial void LogCompactionError(ILogger logger, Exception e); - - /// - /// implementation that allows us to use - /// FASTER's "push" key iteration: https://microsoft.github.io/FASTER/docs/fasterkv-basics/#key-iteration - /// - private struct ScanIteratorFunctions : IScanIteratorFunctions { - - /// - /// The channel to publish keys to as they are iterated over. - /// - private readonly Channel _channel; - - /// - /// Specifies if the iterator should include record values. - /// - private readonly bool _includeValues; - - /// - /// The channel reader. - /// - public ChannelReader Reader => _channel?.Reader!; - - internal ScanIteratorFunctions(bool includeValues) { - _includeValues = includeValues; - _channel = Channel.CreateUnbounded(new UnboundedChannelOptions() { - AllowSynchronousContinuations = false, - SingleReader = true, - SingleWriter = true - }); - } - - - /// - public bool OnStart(long beginAddress, long endAddress) => true; - - - /// - public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords) { - return _channel.Writer.TryWrite(new FasterRecord(key.ToByteArray(), recordMetadata, false, _includeValues ? new ReadOnlyMemory(value.ToByteArray()) : default)); - } - - - /// - public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords) { - return _channel.Writer.TryWrite(new FasterRecord(key.ToByteArray(), recordMetadata, true, _includeValues ? new ReadOnlyMemory(value.ToByteArray()) : default)); - } - - - /// - public void OnStop(bool completed, long numberOfRecords) { - _channel.Writer.TryComplete(); - } - - - /// - public void OnException(Exception exception, long numberOfRecords) { - _channel.Writer.TryComplete(exception); - } - - } - - - /// - /// A record in the FASTER store. - /// - public readonly struct FasterRecord { - - /// - /// The key. - /// - public KVKey Key { get; } - - /// - /// The metadata for the record. - /// - public RecordMetadata Metadata { get; } - - /// - /// Specifies if the record is located in the mutable portion of the FASTER log. - /// - public bool Mutable { get; } - - /// - /// The value for the record. - /// - public ReadOnlyMemory Value { get; } - - - /// - /// Creates a new instance - /// - /// - /// The key. - /// - /// - /// The metadata for the record. - /// - /// - /// Specifies if the record is located in the mutable portion of the FASTER log. - /// - /// - /// The record value. - /// - internal FasterRecord(KVKey key, RecordMetadata metadata, bool mutable, ReadOnlyMemory value) { - Key = key; - Metadata = metadata; - Mutable = mutable; - Value = value; - } - - } - } } diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStoreOptions.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStoreOptions.cs index c257b461..ebdefc65 100644 --- a/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStoreOptions.cs +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/FasterKeyValueStoreOptions.cs @@ -1,4 +1,5 @@ using System; +using System.ComponentModel.DataAnnotations; using FASTER.core; @@ -9,6 +10,16 @@ namespace DataCore.Adapter.KeyValueStore.FASTER { /// public class FasterKeyValueStoreOptions : Services.KeyValueStoreOptions { + /// + /// The name for the instance. + /// + /// + /// The name is used to identify the store in telemetry data. If not specified, a + /// default name will be used. + /// + [MaxLength(50)] + public string? Name { get; set; } + /// /// Specifies if the is read-only. /// diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/FasterRecord.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/FasterRecord.cs new file mode 100644 index 00000000..6a805e1c --- /dev/null +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/FasterRecord.cs @@ -0,0 +1,57 @@ +using System; + +using DataCore.Adapter.Services; + +using FASTER.core; + +namespace DataCore.Adapter.KeyValueStore.FASTER { + /// + /// A record in the FASTER store. + /// + public readonly struct FasterRecord { + + /// + /// The key. + /// + public KVKey Key { get; } + + /// + /// The metadata for the record. + /// + public RecordMetadata Metadata { get; } + + /// + /// Specifies if the record is located in the mutable portion of the FASTER log. + /// + public bool Mutable { get; } + + /// + /// The value for the record. + /// + public ReadOnlyMemory Value { get; } + + + /// + /// Creates a new instance + /// + /// + /// The key. + /// + /// + /// The metadata for the record. + /// + /// + /// Specifies if the record is located in the mutable portion of the FASTER log. + /// + /// + /// The record value. + /// + internal FasterRecord(KVKey key, RecordMetadata metadata, bool mutable, ReadOnlyMemory value) { + Key = key; + Metadata = metadata; + Mutable = mutable; + Value = value; + } + + } +} diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/LogSizeTracker.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/LogSizeTracker.cs new file mode 100644 index 00000000..09068c40 --- /dev/null +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/LogSizeTracker.cs @@ -0,0 +1,116 @@ +using System; +using System.Threading; + +using FASTER.core; + +namespace DataCore.Adapter.KeyValueStore.FASTER { + + /// + /// Tracks memory usage in a FASTER . + /// + internal class LogSizeTracker : IObserver>, IDisposable { + + /// + /// Flags if the object has been disposed. + /// + private bool _disposed; + + /// + /// The to track. + /// + private readonly LogAccessor _log; + + /// + /// The subscription that receives eviction notifications from the . + /// + private readonly IDisposable _subscription; + + /// + /// The size of the heap allocated by the . + /// + private int _heapSize; + + /// + /// The total size of the and heap. + /// + public long TotalMemorySize => _log.MemorySizeBytes + _heapSize; + + + /// + /// Creates a new instance. + /// + /// + /// The to track. + /// + public LogSizeTracker(LogAccessor log) { + _log = log; + _subscription = _log.SubscribeEvictions(this); + } + + + /// + public void OnCompleted() { + // No-op + } + + + /// + public void OnError(Exception error) { + // No-op + } + + + /// + public void OnNext(IFasterScanIterator iterator) { + // We are only subscribed to be notified when items are evicted from the log, so we + // will always decrement the tracked log size. + + var size = 0; + + while (iterator.GetNext(out var recordInfo, out var key, out var value)) { + size += key.TotalSize; + + // If the record has not been deleted (i.e. it has been replaced by an upsert + // operation), we need to account for the size of the value that was replaced in + // addition to the size of the key. + // + // If the record is being deleted then the size of the evicted value is already + // reported by SizeTrackingSpanByteFunctions.ConcurrentDeleter so we only need to + // deduct the size of the evicted key here. + + if (!recordInfo.Tombstone) { + size += value.TotalSize; + } + } + + UpdateHeapSize(-size); + } + + + /// + /// Updates the known heap size of the . + /// + /// + /// The change in heap size, in bytes. + /// + internal void UpdateHeapSize(int delta) { + if (delta == 0) { + return; + } + Interlocked.Add(ref _heapSize, delta); + } + + + /// + public void Dispose() { + if (_disposed) { + return; + } + + _subscription.Dispose(); + _disposed = true; + } + + } + +} diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/PublicAPI.Unshipped.txt b/src/DataCore.Adapter.KeyValueStore.FASTER/PublicAPI.Unshipped.txt index ab647d82..5dd5f8d1 100644 --- a/src/DataCore.Adapter.KeyValueStore.FASTER/PublicAPI.Unshipped.txt +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/PublicAPI.Unshipped.txt @@ -1,13 +1,15 @@ #nullable enable -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterRecord -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterRecord.FasterRecord() -> void -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterRecord.Key.get -> DataCore.Adapter.Services.KVKey -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterRecord.Metadata.get -> FASTER.core.RecordMetadata -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterRecord.Mutable.get -> bool -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.FasterRecord.Value.get -> System.ReadOnlyMemory -DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.GetRecordsAsync(DataCore.Adapter.Services.KVKey? prefix = null) -> System.Collections.Generic.IAsyncEnumerable! +DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.GetRecordsAsync(DataCore.Adapter.Services.KVKey? prefix = null) -> System.Collections.Generic.IAsyncEnumerable! DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStore.TakeIncrementalCheckpointAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.EnableRawWrites.get -> bool DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.EnableRawWrites.set -> void DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.IncrementalCheckpointInterval.get -> System.TimeSpan? DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.IncrementalCheckpointInterval.set -> void +DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.Name.get -> string? +DataCore.Adapter.KeyValueStore.FASTER.FasterKeyValueStoreOptions.Name.set -> void +DataCore.Adapter.KeyValueStore.FASTER.FasterRecord +DataCore.Adapter.KeyValueStore.FASTER.FasterRecord.FasterRecord() -> void +DataCore.Adapter.KeyValueStore.FASTER.FasterRecord.Key.get -> DataCore.Adapter.Services.KVKey +DataCore.Adapter.KeyValueStore.FASTER.FasterRecord.Metadata.get -> FASTER.core.RecordMetadata +DataCore.Adapter.KeyValueStore.FASTER.FasterRecord.Mutable.get -> bool +DataCore.Adapter.KeyValueStore.FASTER.FasterRecord.Value.get -> System.ReadOnlyMemory diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/ScanIteratorFunctions.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/ScanIteratorFunctions.cs new file mode 100644 index 00000000..bcc4dce7 --- /dev/null +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/ScanIteratorFunctions.cs @@ -0,0 +1,75 @@ +using System; +using System.Threading.Channels; + +using FASTER.core; + +namespace DataCore.Adapter.KeyValueStore.FASTER { + + /// + /// implementation that allows us to use + /// FASTER's "push" key iteration: https://microsoft.github.io/FASTER/docs/fasterkv-basics/#key-iteration + /// + internal struct ScanIteratorFunctions : IScanIteratorFunctions { + + /// + /// The channel to publish keys to as they are iterated over. + /// + private readonly Channel _channel; + + /// + /// Specifies if the iterator should include record values. + /// + private readonly bool _includeValues; + + /// + /// The channel reader. + /// + public ChannelReader Reader => _channel?.Reader!; + + + /// + /// Creates a new instance. + /// + /// + /// Specifies if the iterator should include record values. + /// + internal ScanIteratorFunctions(bool includeValues) { + _includeValues = includeValues; + _channel = Channel.CreateUnbounded(new UnboundedChannelOptions() { + AllowSynchronousContinuations = false, + SingleReader = true, + SingleWriter = true + }); + } + + + /// + public bool OnStart(long beginAddress, long endAddress) => true; + + + /// + public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords) { + return _channel.Writer.TryWrite(new FasterRecord(key.ToByteArray(), recordMetadata, false, _includeValues ? new ReadOnlyMemory(value.ToByteArray()) : default)); + } + + + /// + public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords) { + return _channel.Writer.TryWrite(new FasterRecord(key.ToByteArray(), recordMetadata, true, _includeValues ? new ReadOnlyMemory(value.ToByteArray()) : default)); + } + + + /// + public void OnStop(bool completed, long numberOfRecords) { + _channel.Writer.TryComplete(); + } + + + /// + public void OnException(Exception exception, long numberOfRecords) { + _channel.Writer.TryComplete(exception); + } + + } + +} diff --git a/src/DataCore.Adapter.KeyValueStore.FASTER/SizeTrackingSpanByteFunctions.cs b/src/DataCore.Adapter.KeyValueStore.FASTER/SizeTrackingSpanByteFunctions.cs new file mode 100644 index 00000000..9b3476d7 --- /dev/null +++ b/src/DataCore.Adapter.KeyValueStore.FASTER/SizeTrackingSpanByteFunctions.cs @@ -0,0 +1,67 @@ +using FASTER.core; + +namespace DataCore.Adapter.KeyValueStore.FASTER { + + /// + /// Extends to notify a + /// instance about upserts, deletes and copies to the FASTER read cache. + /// + internal sealed class SizeTrackingSpanByteFunctions : SpanByteFunctions { + + /// + /// The that tracks memory usage. + /// + private readonly CacheSizeTracker _sizeTracker; + + + /// + /// Creates a new instance. + /// + /// + /// The that tracks memory usage. + /// + public SizeTrackingSpanByteFunctions(CacheSizeTracker sizeTracker) { + _sizeTracker = sizeTracker; + } + + + /// + public override bool ConcurrentWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo) { + var delta = src.TotalSize - dst.TotalSize; + if (base.ConcurrentWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo)) { + _sizeTracker.UpdateHeapSize(delta); + return true; + } + + return false; + } + + + /// + public override void PostSingleWriter(ref SpanByte key, ref SpanByte input, ref SpanByte src, ref SpanByte dst, ref SpanByteAndMemory output, ref UpsertInfo upsertInfo, WriteReason reason) { + var delta = key.TotalSize + src.TotalSize; + base.PostSingleWriter(ref key, ref input, ref src, ref dst, ref output, ref upsertInfo, reason); + _sizeTracker.UpdateHeapSize(delta, reason == WriteReason.CopyToReadCache); + } + + + /// + public override bool ConcurrentDeleter(ref SpanByte key, ref SpanByte value, ref DeleteInfo deleteInfo) { + var delta = value.TotalSize; + if (base.ConcurrentDeleter(ref key, ref value, ref deleteInfo)) { + _sizeTracker.UpdateHeapSize(-delta); + if (deleteInfo.RecordInfo.Invalid) { + // Record was marked as invalid. FASTER example code indicates that this means + // that the record was not inserted, so deduct the size of the key from the + // heap as well. + _sizeTracker.UpdateHeapSize(-key.TotalSize); + } + return true; + } + + return false; + } + + } + +} diff --git a/src/DataCore.Adapter.KeyValueStore.FileSystem/FileSystemKeyValueStore.cs b/src/DataCore.Adapter.KeyValueStore.FileSystem/FileSystemKeyValueStore.cs index ebdb0000..ca7b8e8f 100644 --- a/src/DataCore.Adapter.KeyValueStore.FileSystem/FileSystemKeyValueStore.cs +++ b/src/DataCore.Adapter.KeyValueStore.FileSystem/FileSystemKeyValueStore.cs @@ -451,6 +451,22 @@ protected override async ValueTask WriteAsync(KVKey key, T value) { } + /// + protected override async ValueTask ExistsAsync(KVKey key) { + if (UseWriteBuffer) { + var readResult = await _writeBuffer!.ReadAsync(key).ConfigureAwait(false); + if (readResult.Found) { + return readResult.Value != null; + } + } + + await _indexLoader.Value.ConfigureAwait(false); + using (await _lock.ReaderLockAsync().ConfigureAwait(false)) { + return TryGetFileNameForKey(key, out _); + } + } + + /// protected override async ValueTask DeleteAsync(KVKey key) { if (UseWriteBuffer) { diff --git a/src/DataCore.Adapter.KeyValueStore.Sqlite/PublicAPI.Unshipped.txt b/src/DataCore.Adapter.KeyValueStore.Sqlite/PublicAPI.Unshipped.txt index d80fd437..cf23bc37 100644 --- a/src/DataCore.Adapter.KeyValueStore.Sqlite/PublicAPI.Unshipped.txt +++ b/src/DataCore.Adapter.KeyValueStore.Sqlite/PublicAPI.Unshipped.txt @@ -12,6 +12,7 @@ DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStoreWriteBufferOptions.Enab DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStoreWriteBufferOptions.Enabled.set -> void DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStoreWriteBufferOptions.SqliteKeyValueStoreWriteBufferOptions() -> void override DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStore.AllowRawWrites.get -> bool +override DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStore.ExistsAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask override DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStore.ReadAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask override DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStore.ReadRawAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask override DataCore.Adapter.KeyValueStore.Sqlite.SqliteKeyValueStore.WriteAsync(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask diff --git a/src/DataCore.Adapter.KeyValueStore.Sqlite/SqliteKeyValueStore.cs b/src/DataCore.Adapter.KeyValueStore.Sqlite/SqliteKeyValueStore.cs index 36399d49..50ac1fea 100644 --- a/src/DataCore.Adapter.KeyValueStore.Sqlite/SqliteKeyValueStore.cs +++ b/src/DataCore.Adapter.KeyValueStore.Sqlite/SqliteKeyValueStore.cs @@ -346,6 +346,40 @@ private async ValueTask WriteCoreAsync(SqliteConnection connection, SqliteTransa } + /// + protected override async ValueTask ExistsAsync(KVKey key) { + if (_disposed) { + throw new ObjectDisposedException(GetType().FullName); + } + + using (await _lock.ReaderLockAsync().ConfigureAwait(false)) { + if (_disposed) { + return false; + } + + // Check pending writes first. + if (UseWriteBuffer) { + var pendingValue = await _writeBuffer!.ReadAsync(key, _disposedTokenSource.Token).ConfigureAwait(false); + if (pendingValue.Found) { + return pendingValue.Value != null; + } + } + + using (var connection = new SqliteConnection(_connectionString)) { + connection.Open(); + + using (var command = connection.CreateCommand()) { + command.CommandText = "SELECT COUNT(*) FROM kvstore WHERE key = $key"; + command.Parameters.AddWithValue("$key", ConvertBytesToHexString(key)); + + var count = Convert.ToInt32(await command.ExecuteScalarAsync(_disposedTokenSource.Token).ConfigureAwait(false)); + return count != 0; + } + } + } + } + + /// protected override async ValueTask DeleteAsync(KVKey key) { if (_disposed) { diff --git a/test/DataCore.Adapter.Tests/FasterKeyValueStoreTests.cs b/test/DataCore.Adapter.Tests/FasterKeyValueStoreTests.cs index dfdf861c..17dd5b2a 100644 --- a/test/DataCore.Adapter.Tests/FasterKeyValueStoreTests.cs +++ b/test/DataCore.Adapter.Tests/FasterKeyValueStoreTests.cs @@ -18,7 +18,11 @@ namespace DataCore.Adapter.Tests { [TestClass] public class FasterKeyValueStoreTests : KeyValueStoreTests { protected override FasterKeyValueStore CreateStore(CompressionLevel compressionLevel, bool enableRawWrites = false) { - return new FasterKeyValueStore(new FasterKeyValueStoreOptions() { CompressionLevel = compressionLevel, EnableRawWrites = enableRawWrites }); + return new FasterKeyValueStore(new FasterKeyValueStoreOptions() { + Name = TestContext.TestName, + CompressionLevel = compressionLevel, + EnableRawWrites = enableRawWrites + }); } diff --git a/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs b/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs index 9bb3434a..6255fb64 100644 --- a/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs +++ b/test/DataCore.Adapter.Tests/KeyValueStoreTests.cs @@ -31,7 +31,10 @@ public async Task ShouldWriteValueToStore(CompressionLevel compressionLevel) { await store.WriteAsync(TestContext.TestName, now); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -67,6 +70,26 @@ public async Task ShouldUpdateValue(CompressionLevel compressionLevel) { } + [TestMethod] + public async Task KeyShouldExistInStore() { + var now = DateTime.UtcNow; + + var store = CreateStore(CompressionLevel.NoCompression); + try { + await store.WriteAsync(TestContext.TestName, now); + Assert.IsTrue(await store.ExistsAsync(TestContext.TestName)); + } + finally { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { + disposable.Dispose(); + } + } + } + + [DataTestMethod] [DataRow(CompressionLevel.NoCompression)] [DataRow(CompressionLevel.Fastest)] @@ -85,7 +108,10 @@ public async Task ShouldReadValueFromStore(CompressionLevel compressionLevel) { Assert.AreEqual(now, value); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -116,7 +142,10 @@ public async Task ShouldDeleteValueFromStore(CompressionLevel compressionLevel) Assert.AreEqual(default(DateTime), value2); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -147,7 +176,10 @@ public async Task ShouldListKeys(CompressionLevel compressionLevel) { } } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -179,7 +211,10 @@ public async Task ScopedStoreShouldAddKeyPrefix(CompressionLevel compressionLeve Assert.AreEqual(now, value2); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -206,7 +241,10 @@ public async Task ScopedStoreShouldRemoveKeyPrefix(CompressionLevel compressionL Assert.IsTrue(keys.Contains(TestContext.TestName)); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -246,7 +284,10 @@ public async Task ShouldHandleParallelWrites(CompressionLevel compressionLevel) } } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -277,7 +318,10 @@ public async Task ShouldWriteRawValueToStore(CompressionLevel compressionLevel) Assert.AreEqual(now, deserialized); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -308,7 +352,10 @@ public async Task ShouldReadRawValueFromStore(CompressionLevel compressionLevel) Assert.IsTrue(raw.SequenceEqual(raw2)); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } } @@ -330,7 +377,10 @@ public async Task RawWriteShouldThrowException() { await Assert.ThrowsExceptionAsync(async () => await rawStore.WriteRawAsync(TestContext.TestName, raw)); } finally { - if (store is IDisposable disposable) { + if (store is IAsyncDisposable asyncDisposable) { + await asyncDisposable.DisposeAsync(); + } + else if (store is IDisposable disposable) { disposable.Dispose(); } }