Skip to content

Commit

Permalink
Check for existing key and add FASTER size tracking metrics (#367)
Browse files Browse the repository at this point in the history
* Create ScopedRawKeyValueStore.cs

New wrapper class for a scoped `IRawKeyValueStore`.

* Update IKeyValueStore.cs

Add a new `ExistsAsync` method for detecting if a key exists without reading its value.

* Implement ExistsAsync

* Update ScopedKeyValueStore.cs

Implement ExistsAsync and make class internal

* Update KeyValueStoreExtensions.cs

`BulkCopyToAsync`/`BulkCopyFromAsync` now include an `overwrite` parameter that controls if existing keys in the destination store should be overwritten.

`CreateScopedStore` has been updated to return a `ScopedRawKeyValueStore` if the store to be wrapped implements `IRawKeyValueStore`.

* Update API declarations

* Implement ExistsAsync/add size tracking metrics

Implements `ExistsAsync` in the FASTER key/value store.

Adds functionality (based on the FASTER GitHub samples) for tracking the memory footprint of a FASTER key/value store.

Adds metrics to `FasterKeyValueStore` for reporting memory usage.

* Update unit tests

Adds tests for `ExistsAsync` and disposes stores using `IAsyncDisposable.DisposeAsync` in preference to `IDisposable.Dispose` if defined on a store implementation.

* Size tracking optimisations
  • Loading branch information
wazzamatazz committed Nov 23, 2023
1 parent 5fd07c0 commit 44d873b
Show file tree
Hide file tree
Showing 22 changed files with 881 additions and 160 deletions.
29 changes: 29 additions & 0 deletions THIRD_PARTY_LICENSES.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,35 @@ SOFTWARE.
* * *


# Microsoft Corporation

Some code in this repository is based on [Microsoft FASTER](https://github.com/microsoft/FASTER).

MIT License

Copyright (c) Microsoft Corporation. All rights reserved.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

* * *


# Jering.KeyValueStore

Some code in this repository is based on [Jering.KeyValueStore](https://github.com/JeringTech/KeyValueStore).
Expand Down
4 changes: 0 additions & 4 deletions src/DataCore.Adapter.Abstractions/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,6 @@ DataCore.Adapter.Services.KVKey.KVKey() -> void
DataCore.Adapter.Services.KVKey.KVKey(byte[]? value) -> void
DataCore.Adapter.Services.KVKey.Length.get -> int
DataCore.Adapter.Services.KVKey.Value.get -> byte[]!
DataCore.Adapter.Services.ScopedKeyValueStore
DataCore.Adapter.Services.ScopedKeyValueStore.DeleteAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<bool>
DataCore.Adapter.Services.ScopedKeyValueStore.GetKeysAsync(DataCore.Adapter.Services.KVKey? prefix) -> System.Collections.Generic.IAsyncEnumerable<DataCore.Adapter.Services.KVKey>!
DataCore.Adapter.Services.ScopedKeyValueStore.ScopedKeyValueStore(DataCore.Adapter.Services.KVKey prefix, DataCore.Adapter.Services.IKeyValueStore! inner) -> void
DataCore.Adapter.Tags.ITagConfiguration
DataCore.Adapter.Tags.ITagConfiguration.CreateTagAsync(DataCore.Adapter.IAdapterCallContext! context, DataCore.Adapter.Tags.CreateTagRequest! request, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<DataCore.Adapter.Tags.TagDefinition!>!
DataCore.Adapter.Tags.ITagConfiguration.DeleteTagAsync(DataCore.Adapter.IAdapterCallContext! context, DataCore.Adapter.Tags.DeleteTagRequest! request, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task<bool>!
Expand Down
8 changes: 4 additions & 4 deletions src/DataCore.Adapter.Abstractions/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#nullable enable
abstract DataCore.Adapter.Services.KeyValueStore.ExistsAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<bool>
abstract DataCore.Adapter.Services.KeyValueStore.GetSerializer() -> DataCore.Adapter.Services.IKeyValueStoreSerializer!
abstract DataCore.Adapter.Services.KeyValueStore.ReadAsync<T>(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<T?>
abstract DataCore.Adapter.Services.KeyValueStore.WriteAsync<T>(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask
Expand Down Expand Up @@ -52,6 +53,7 @@ DataCore.Adapter.Common.HostInfoBuilder.WithProperty(string! name, DataCore.Adap
DataCore.Adapter.Common.HostInfoBuilder.WithVendor(DataCore.Adapter.Common.VendorInfo? vendor) -> DataCore.Adapter.Common.HostInfoBuilder!
DataCore.Adapter.Common.HostInfoBuilder.WithVersion(string? version) -> DataCore.Adapter.Common.HostInfoBuilder!
DataCore.Adapter.IAdapterCallContext.Services.get -> System.IServiceProvider!
DataCore.Adapter.Services.IKeyValueStore.ExistsAsync(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<bool>
DataCore.Adapter.Services.IKeyValueStore.ReadAsync<T>(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<T?>
DataCore.Adapter.Services.IKeyValueStore.WriteAsync<T>(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask
DataCore.Adapter.Services.IKeyValueStoreSerializer
Expand Down Expand Up @@ -94,15 +96,13 @@ DataCore.Adapter.Services.KeyValueStoreWriteBufferOptions.SizeLimit.get -> int
DataCore.Adapter.Services.KeyValueStoreWriteBufferOptions.SizeLimit.set -> void
DataCore.Adapter.Services.RawKeyValueStore<TOptions>
DataCore.Adapter.Services.RawKeyValueStore<TOptions>.RawKeyValueStore(TOptions? options, Microsoft.Extensions.Logging.ILogger? logger = null) -> void
DataCore.Adapter.Services.ScopedKeyValueStore.ReadAsync<T>(DataCore.Adapter.Services.KVKey key) -> System.Threading.Tasks.ValueTask<T?>
DataCore.Adapter.Services.ScopedKeyValueStore.WriteAsync<T>(DataCore.Adapter.Services.KVKey key, T value) -> System.Threading.Tasks.ValueTask
override DataCore.Adapter.Services.KVKey.ToString() -> string!
override sealed DataCore.Adapter.Services.KeyValueStore<TOptions>.GetCompressionLevel() -> System.IO.Compression.CompressionLevel
override sealed DataCore.Adapter.Services.KeyValueStore<TOptions>.GetSerializer() -> DataCore.Adapter.Services.IKeyValueStoreSerializer!
static DataCore.Adapter.AdapterExtensions.CreateExtendedAdapterDescriptorBuilder(this DataCore.Adapter.IAdapter! adapter) -> DataCore.Adapter.Common.AdapterDescriptorBuilder!
static DataCore.Adapter.Services.JsonKeyValueStoreSerializer.Default.get -> DataCore.Adapter.Services.IKeyValueStoreSerializer!
static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.KVKey? keyPrefix = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.KVKey? keyPrefix = null, bool overwrite = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.BulkCopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.KVKey? keyPrefix = null, bool overwrite = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyFromAsync(this DataCore.Adapter.Services.IRawKeyValueStore! destination, DataCore.Adapter.Services.IRawKeyValueStore! source, System.Collections.Generic.IEnumerable<DataCore.Adapter.Services.KVKey>! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.CopyToAsync(this DataCore.Adapter.Services.IRawKeyValueStore! source, DataCore.Adapter.Services.IRawKeyValueStore! destination, System.Collections.Generic.IEnumerable<DataCore.Adapter.Services.KVKey>! keys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<int>!
static DataCore.Adapter.Services.KeyValueStoreExtensions.GetKeysAsStringsAsync(this DataCore.Adapter.Services.IKeyValueStore! store) -> System.Collections.Generic.IAsyncEnumerable<string!>!
Expand Down
12 changes: 12 additions & 0 deletions src/DataCore.Adapter.Abstractions/Services/IKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ public interface IKeyValueStore {
ValueTask<T?> ReadAsync<T>(KVKey key);


/// <summary>
/// Tests if a key exists in the store.
/// </summary>
/// <param name="key">
/// The key to test.
/// </param>
/// <returns>
/// <see langword="true"/> if the key exists; otherwise, <see langword="false"/>.
/// </returns>
ValueTask<bool> ExistsAsync(KVKey key);


/// <summary>
/// Deletes a value from the store.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public sealed class InMemoryKeyValueStore : KeyValueStore, IDisposable {
}


/// <inheritdoc/>
protected override ValueTask<bool> ExistsAsync(KVKey key) {
var keyAsString = System.Text.Encoding.UTF8.GetString(key);
return new ValueTask<bool>(_values.ContainsKey(keyAsString));
}


/// <inheritdoc/>
protected override ValueTask<bool> DeleteAsync(KVKey key) {
var keyAsString = System.Text.Encoding.UTF8.GetString(key);
Expand Down
21 changes: 21 additions & 0 deletions src/DataCore.Adapter.Abstractions/Services/KeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ public abstract class KeyValueStore : IKeyValueStore {
}


/// <inheritdoc/>
async ValueTask<bool> IKeyValueStore.ExistsAsync(KVKey key) {
if (key.Length == 0) {
throw new ArgumentException(AbstractionsResources.Error_KeyValueStore_InvalidKey, nameof(key));
}
return await ExistsAsync(key).ConfigureAwait(false);
}


/// <inheritdoc/>
async ValueTask<bool> IKeyValueStore.DeleteAsync(KVKey key) {
if (key.Length == 0) {
Expand Down Expand Up @@ -104,6 +113,18 @@ public abstract class KeyValueStore : IKeyValueStore {
protected abstract ValueTask<T?> ReadAsync<T>(KVKey key);


/// <summary>
/// Tests if a key exists in the store.
/// </summary>
/// <param name="key">
/// The key to test.
/// </param>
/// <returns>
/// <see langword="true"/> if the key exists; otherwise, <see langword="false"/>.
/// </returns>
protected abstract ValueTask<bool> ExistsAsync(KVKey key);


/// <summary>
/// Deletes a value from the store.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,8 @@ public static class KeyValueStoreExtensions {
throw new ArgumentException(AbstractionsResources.Error_KeyValueStore_InvalidKey, nameof(prefix));
}

if (store is ScopedKeyValueStore scoped) {
// This store is already an instance of ScopedKeyValueStore. Instead of wrapping
// the scoped store and recursively applying key prefixes in every operation, we
// will wrap the inner store and concatenate the prefix for this store with the
// prefix passed to this method.
return new ScopedKeyValueStore(KeyValueStore.AddPrefix(scoped.Prefix, prefix), scoped.Inner);
if (store is IRawKeyValueStore raw) {
return new ScopedRawKeyValueStore(prefix, raw);
}

return new ScopedKeyValueStore(prefix, store);
Expand Down Expand Up @@ -318,6 +314,9 @@ public static class KeyValueStoreExtensions {
/// <param name="keyPrefix">
/// The filter to apply to keys read from the source store.
/// </param>
/// <param name="overwrite">
/// Specifies if existing keys in the destination store should be overwritten.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token for the operation.
/// </param>
Expand All @@ -330,7 +329,7 @@ public static class KeyValueStoreExtensions {
/// <exception cref="ArgumentNullException">
/// <paramref name="destination"/> is <see langword="null"/>.
/// </exception>
public static async Task<int> BulkCopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) {
public static async Task<int> BulkCopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, KVKey? keyPrefix = null, bool overwrite = false, CancellationToken cancellationToken = default) {
if (source == null) {
throw new ArgumentNullException(nameof(source));
}
Expand All @@ -348,6 +347,13 @@ public static class KeyValueStoreExtensions {
continue;
}

if (!overwrite) {
var exists = await destination.ExistsAsync(key).ConfigureAwait(false);
if (exists) {
continue;
}
}

await destination.WriteRawAsync(key, value).ConfigureAwait(false);
count++;
}
Expand All @@ -368,6 +374,9 @@ public static class KeyValueStoreExtensions {
/// <param name="keyPrefix">
/// The filter to apply to keys read from the source store.
/// </param>
/// <param name="overwrite">
/// Specifies if existing keys in the destination store should be overwritten.
/// </param>
/// <param name="cancellationToken">
/// The cancellation token for the operation.
/// </param>
Expand All @@ -380,8 +389,8 @@ public static class KeyValueStoreExtensions {
/// <exception cref="ArgumentNullException">
/// <paramref name="destination"/> is <see langword="null"/>.
/// </exception>
public static async Task<int> BulkCopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, KVKey? keyPrefix = null, CancellationToken cancellationToken = default) {
return await source.BulkCopyToAsync(destination, keyPrefix, cancellationToken).ConfigureAwait(false);
public static async Task<int> BulkCopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, KVKey? keyPrefix = null, bool overwrite = false, CancellationToken cancellationToken = default) {
return await source.BulkCopyToAsync(destination, keyPrefix, overwrite, cancellationToken).ConfigureAwait(false);
}


Expand Down Expand Up @@ -412,6 +421,10 @@ public static class KeyValueStoreExtensions {
/// <exception cref="ArgumentNullException">
/// <paramref name="keys"/> is <see langword="null"/>.
/// </exception>
/// <remarks>
/// Unlike <see cref="BulkCopyToAsync"/>, <see cref="CopyToAsync"/> will always
/// overwrite keys in the destination store.
/// </remarks>
public static async Task<int> CopyToAsync(this IRawKeyValueStore source, IRawKeyValueStore destination, IEnumerable<KVKey> keys, CancellationToken cancellationToken = default) {
if (source == null) {
throw new ArgumentNullException(nameof(source));
Expand Down Expand Up @@ -469,6 +482,10 @@ public static class KeyValueStoreExtensions {
/// <exception cref="ArgumentNullException">
/// <paramref name="keys"/> is <see langword="null"/>.
/// </exception>
/// <remarks>
/// Unlike <see cref="BulkCopyFromAsync"/>, <see cref="CopyFromAsync"/> will always
/// overwrite keys in the destination store.
/// </remarks>
public static async Task<int> CopyFromAsync(this IRawKeyValueStore destination, IRawKeyValueStore source, IEnumerable<KVKey> keys, CancellationToken cancellationToken = default) {
return await source.CopyToAsync(destination, keys, cancellationToken).ConfigureAwait(false);
}
Expand Down
28 changes: 20 additions & 8 deletions src/DataCore.Adapter.Abstractions/Services/ScopedKeyValueStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ namespace DataCore.Adapter.Services {
/// <see cref="IKeyValueStore"/> that wraps an existing <see cref="IKeyValueStore"/> and
/// automatically modifies keys passed in or out of the store using a scoped prefix.
/// </summary>
/// <remarks>
/// You can simplify creating instances of this class by using the
/// <see cref="KeyValueStoreExtensions.CreateScopedStore(IKeyValueStore, KVKey)"/>
/// extension method.
/// </remarks>
public class ScopedKeyValueStore : IKeyValueStore {
internal class ScopedKeyValueStore : IKeyValueStore {

/// <summary>
/// The prefix to apply to keys read from or written to the inner store.
Expand Down Expand Up @@ -42,8 +37,18 @@ public class ScopedKeyValueStore : IKeyValueStore {
if (prefix.Value.Length == 0) {
throw new ArgumentException(AbstractionsResources.Error_KeyValueStore_InvalidKey, nameof(prefix));
}
Prefix = prefix;
Inner = inner ?? throw new ArgumentNullException(nameof(inner));
if (inner == null) {
throw new ArgumentNullException(nameof(inner));
}

if (inner is ScopedKeyValueStore scoped) {
Prefix = KeyValueStore.AddPrefix(scoped.Prefix, prefix);
Inner = scoped.Inner;
}
else {
Prefix = prefix;
Inner = inner;
}
}


Expand All @@ -61,6 +66,13 @@ public class ScopedKeyValueStore : IKeyValueStore {
}


/// <inheritdoc/>
public ValueTask<bool> ExistsAsync(KVKey key) {
var k = KeyValueStore.AddPrefix(Prefix, key);
return Inner.ExistsAsync(k);
}


/// <inheritdoc/>
public ValueTask<bool> DeleteAsync(KVKey key) {
var k = KeyValueStore.AddPrefix(Prefix, key);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.Threading.Tasks;

namespace DataCore.Adapter.Services {

/// <summary>
/// <see cref="IRawKeyValueStore"/> that wraps an existing <see cref="IRawKeyValueStore"/> and
/// automatically modifies keys passed in or out of the store using a scoped prefix.
/// </summary>
internal class ScopedRawKeyValueStore : ScopedKeyValueStore, IRawKeyValueStore {

/// <summary>
/// The inner <see cref="IRawKeyValueStore"/>.
/// </summary>
internal new IRawKeyValueStore Inner => (IRawKeyValueStore) base.Inner;


/// <summary>
/// Creates a new <see cref="ScopedRawKeyValueStore"/> object.
/// </summary>
/// <param name="prefix">
/// The key prefix for the store.
/// </param>
/// <param name="inner">
/// The inner <see cref="IRawKeyValueStore"/> to wrap.
/// </param>
public ScopedRawKeyValueStore(KVKey prefix, IRawKeyValueStore inner)
: base(prefix, inner) { }


/// <inheritdoc/>
public ValueTask<byte[]?> ReadRawAsync(KVKey key) {
var k = KeyValueStore.AddPrefix(Prefix, key);
return Inner.ReadRawAsync(k);
}


/// <inheritdoc/>
public ValueTask WriteRawAsync(KVKey key, byte[] value) {
var k = KeyValueStore.AddPrefix(Prefix, key);
return Inner.WriteRawAsync(k, value);
}

}
}
Loading

0 comments on commit 44d873b

Please sign in to comment.