diff --git a/libs/common/IKeyInputSerializer.cs b/libs/common/IKeyInputSerializer.cs
deleted file mode 100644
index 66813b7c6b..0000000000
--- a/libs/common/IKeyInputSerializer.cs
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-namespace Garnet.common
-{
- ///
- /// Serializer interface for keys, needed for pub-sub
- ///
- /// Key
- /// Input
- public unsafe interface IKeyInputSerializer : IKeySerializer
- {
- ///
- /// Read input by reference, from given location
- ///
- /// Memory location
- /// Input
- ref Input ReadInputByRef(ref byte* src);
- }
-}
\ No newline at end of file
diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs
index 7e908cad40..173404dcc1 100644
--- a/libs/host/GarnetServer.cs
+++ b/libs/host/GarnetServer.cs
@@ -27,7 +27,6 @@ public class GarnetServer : IDisposable
private TsavoriteKV objectStore;
private IDevice aofDevice;
private TsavoriteLog appendOnlyFile;
- private SubscribeKVBroker> kvBroker;
private SubscribeBroker> broker;
private LogSettings logSettings, objLogSettings;
private INamedDeviceFactory logFactory;
@@ -239,7 +238,6 @@ private void InitializeServer()
if (!opts.DisablePubSub)
{
- kvBroker = new SubscribeKVBroker>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), true);
broker = new SubscribeBroker>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), true);
}
@@ -281,7 +279,7 @@ private void InitializeServer()
storeWrapper = new StoreWrapper(version, redisProtocolVersion, server, store, objectStore, objectStoreSizeTracker, customCommandManager, appendOnlyFile, opts, clusterFactory: clusterFactory, loggerFactory: loggerFactory);
// Create session provider for Garnet
- Provider = new GarnetProvider(storeWrapper, kvBroker, broker);
+ Provider = new GarnetProvider(storeWrapper, broker);
// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
@@ -335,7 +333,6 @@ private void InternalDispose()
Provider?.Dispose();
server.Dispose();
broker?.Dispose();
- kvBroker?.Dispose();
store.Dispose();
appendOnlyFile?.Dispose();
aofDevice?.Dispose();
diff --git a/libs/server/Providers/GarnetProvider.cs b/libs/server/Providers/GarnetProvider.cs
index 8e16f6a13c..5ec0dd08ac 100644
--- a/libs/server/Providers/GarnetProvider.cs
+++ b/libs/server/Providers/GarnetProvider.cs
@@ -24,14 +24,12 @@ public sealed class GarnetProvider : TsavoriteKVProviderBase
///
- ///
///
///
public GarnetProvider(StoreWrapper storeWrapper,
- SubscribeKVBroker> kvBroker = null,
SubscribeBroker> broker = null,
MaxSizeSettings maxSizeSettings = default)
- : base(storeWrapper.store, new(), kvBroker, broker, false, maxSizeSettings)
+ : base(storeWrapper.store, new(), broker, false, maxSizeSettings)
{
this.storeWrapper = storeWrapper;
}
diff --git a/libs/server/Providers/TsavoriteKVProviderBase.cs b/libs/server/Providers/TsavoriteKVProviderBase.cs
index d3d07675ca..461941b622 100644
--- a/libs/server/Providers/TsavoriteKVProviderBase.cs
+++ b/libs/server/Providers/TsavoriteKVProviderBase.cs
@@ -25,11 +25,6 @@ public abstract class TsavoriteKVProviderBase
protected readonly ParameterSerializer serializer;
- ///
- /// KV broker
- ///
- protected readonly SubscribeKVBroker> kvBroker;
-
///
/// Broker
///
@@ -45,11 +40,10 @@ public abstract class TsavoriteKVProviderBase
///
///
- ///
///
///
///
- public TsavoriteKVProviderBase(TsavoriteKV store, ParameterSerializer serializer, SubscribeKVBroker> kvBroker = null, SubscribeBroker> broker = null, bool recoverStore = false, MaxSizeSettings maxSizeSettings = default)
+ public TsavoriteKVProviderBase(TsavoriteKV store, ParameterSerializer serializer, SubscribeBroker> broker = null, bool recoverStore = false, MaxSizeSettings maxSizeSettings = default)
{
this.store = store;
if (recoverStore)
@@ -61,7 +55,6 @@ public TsavoriteKVProviderBase(TsavoriteKV store, ParameterSerialize
catch
{ }
}
- this.kvBroker = kvBroker;
this.broker = broker;
this.serializer = serializer;
this.maxSizeSettings = maxSizeSettings ?? new MaxSizeSettings();
diff --git a/libs/server/PubSub/SpanByteKeySerializer.cs b/libs/server/PubSub/SpanByteKeySerializer.cs
index 5cd2c7f6ae..cc5702690b 100644
--- a/libs/server/PubSub/SpanByteKeySerializer.cs
+++ b/libs/server/PubSub/SpanByteKeySerializer.cs
@@ -9,9 +9,9 @@
namespace Garnet.server
{
///
- /// Serializer for SpanByte. Used only on server-side.
+ /// Serializer for SpanByte.
///
- public sealed unsafe class SpanByteKeySerializer : IKeyInputSerializer
+ public sealed unsafe class SpanByteKeySerializer : IKeySerializer
{
///
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -22,15 +22,6 @@ public sealed unsafe class SpanByteKeySerializer : IKeyInputSerializer
- [MethodImpl(MethodImplOptions.AggressiveInlining)]
- public ref SpanByte ReadInputByRef(ref byte* src)
- {
- ref var ret = ref Unsafe.AsRef(src);
- src += ret.TotalSize;
- return ref ret;
- }
-
///
public bool Match(ref SpanByte k, bool asciiKey, ref SpanByte pattern, bool asciiPattern)
{
diff --git a/libs/server/PubSub/SubscribeKVBroker.cs b/libs/server/PubSub/SubscribeKVBroker.cs
deleted file mode 100644
index c8a6c77d50..0000000000
--- a/libs/server/PubSub/SubscribeKVBroker.cs
+++ /dev/null
@@ -1,273 +0,0 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT license.
-
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Threading;
-using System.Threading.Tasks;
-using Garnet.common;
-using Garnet.networking;
-using Tsavorite.core;
-
-namespace Garnet.server
-{
- ///
- /// Broker used for PUB-SUB to Tsavorite KV store. There is a broker per TsavoriteKV instance.
- /// A single broker can be used with multiple TsavoriteKVProviders.
- ///
- ///
- ///
- ///
- ///
- public sealed class SubscribeKVBroker : IDisposable
- where KeyInputSerializer : IKeyInputSerializer
- {
- private int sid = 0;
- private ConcurrentDictionary> subscriptions;
- private ConcurrentDictionary> prefixSubscriptions;
- private AsyncQueue publishQueue;
- readonly IKeyInputSerializer keyInputSerializer;
- readonly TsavoriteLog log;
- readonly IDevice device;
- readonly CancellationTokenSource cts = new();
- readonly ManualResetEvent done = new(true);
- bool disposed = false;
-
- ///
- /// Constructor
- ///
- /// Serializer for Prefix Match and serializing Key and Input
- /// Directory where the log will be stored
- /// Page size of log used for pub/sub
- /// start the log from scratch, do not continue
- public SubscribeKVBroker(IKeyInputSerializer keyInputSerializer, string logDir, long pageSize, bool startFresh = true)
- {
- this.keyInputSerializer = keyInputSerializer;
- device = logDir == null ? new NullDevice() : Devices.CreateLogDevice(logDir + "/pubsubkv", preallocateFile: false);
- device.Initialize((long)(1 << 30) * 64);
- log = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSize = pageSize, MemorySize = pageSize * 4, AutoRefreshSafeTailAddress = true });
- if (startFresh)
- log.TruncateUntil(log.CommittedUntilAddress);
- }
-
- ///
- /// Remove all subscriptions for a session,
- /// called during dispose of server session
- ///
- /// server session
- public void RemoveSubscription(IMessageConsumer session)
- {
- if (subscriptions != null)
- {
- foreach (var kvp in subscriptions)
- {
- foreach (var sub in kvp.Value)
- {
- if (sub.Value.Item1 == session)
- {
- kvp.Value.TryRemove(sub.Key, out _);
- break;
- }
- }
- }
- }
-
- if (prefixSubscriptions != null)
- {
- foreach (var kvp in prefixSubscriptions)
- {
- foreach (var sub in kvp.Value)
- {
- if (sub.Value.Item1 == session)
- {
- kvp.Value.TryRemove(sub.Key, out _);
- break;
- }
- }
- }
- }
- }
-
- internal async Task Start(CancellationToken cancellationToken = default)
- {
- try
- {
- var uniqueKeys = new HashSet(new ByteArrayComparer());
- var uniqueKeySubscriptions = new List<(ServerSessionBase, int, bool)>();
- long truncateUntilAddress = log.BeginAddress;
-
- while (true)
- {
- if (disposed)
- break;
-
- using var iter = log.Scan(log.BeginAddress, long.MaxValue, scanUncommitted: true);
- await iter.WaitAsync(cancellationToken).ConfigureAwait(false);
- while (iter.GetNext(out byte[] subscriptionKey, out int entryLength, out long currentAddress, out long nextAddress))
- {
- if (currentAddress >= long.MaxValue) return;
- uniqueKeys.Add(subscriptionKey);
- truncateUntilAddress = nextAddress;
- }
-
- if (truncateUntilAddress > log.BeginAddress)
- log.TruncateUntil(truncateUntilAddress);
-
- unsafe
- {
- foreach (var keyBytes in uniqueKeys)
- {
- fixed (byte* ptr = &keyBytes[0])
- {
- byte* keyPtr = ptr;
- bool foundSubscription = subscriptions.TryGetValue(keyBytes, out var subscriptionServerSessionDict);
- if (foundSubscription)
- {
- foreach (var sub in subscriptionServerSessionDict)
- {
- byte* keyBytePtr = ptr;
- var serverSession = sub.Value.Item1;
- byte* nullBytePtr = null;
-
- fixed (byte* inputPtr = &sub.Value.Item2[0])
- {
- byte* inputBytePtr = inputPtr;
- serverSession.Publish(ref keyBytePtr, keyBytes.Length, ref nullBytePtr, 0, ref inputBytePtr, sub.Key);
- }
- }
- }
-
- foreach (var kvp in prefixSubscriptions)
- {
- var subscribedPrefixBytes = kvp.Key;
- var prefixSubscriptionServerSessionDict = kvp.Value;
- fixed (byte* subscribedPrefixPtr = &subscribedPrefixBytes[0])
- {
- byte* subPrefixPtr = subscribedPrefixPtr;
- byte* reqKeyPtr = ptr;
-
- bool match = keyInputSerializer.Match(ref keyInputSerializer.ReadKeyByRef(ref reqKeyPtr), false,
- ref keyInputSerializer.ReadKeyByRef(ref subPrefixPtr), false);
- if (match)
- {
- foreach (var sub in prefixSubscriptionServerSessionDict)
- {
- byte* keyBytePtr = ptr;
- var serverSession = sub.Value.Item1;
- byte* nullBytrPtr = null;
-
- fixed (byte* inputPtr = &sub.Value.Item2[0])
- {
- byte* inputBytePtr = inputPtr;
- serverSession.PrefixPublish(subPrefixPtr, kvp.Key.Length, ref keyBytePtr, keyBytes.Length, ref nullBytrPtr, 0, ref inputBytePtr, sub.Key);
- }
- }
- }
- }
- }
- }
- uniqueKeySubscriptions.Clear();
- }
- uniqueKeys.Clear();
- }
- }
- }
- finally
- {
- done.Set();
- }
- }
-
- ///
- /// Subscribe to a particular Key
- ///
- /// Key to subscribe to
- /// Input from subscriber
- /// Server session
- ///
- public unsafe int Subscribe(ref byte* key, ref byte* input, ServerSessionBase session)
- {
- var start = key;
- var inputStart = input;
- keyInputSerializer.ReadKeyByRef(ref key);
- keyInputSerializer.ReadInputByRef(ref input);
- var id = Interlocked.Increment(ref sid);
- if (Interlocked.CompareExchange(ref publishQueue, new AsyncQueue(), null) == null)
- {
- done.Reset();
- subscriptions = new ConcurrentDictionary>(new ByteArrayComparer());
- prefixSubscriptions = new ConcurrentDictionary>(new ByteArrayComparer());
- Task.Run(() => Start(cts.Token));
- }
- else
- {
- while (prefixSubscriptions == null) Thread.Yield();
- }
- var subscriptionKey = new Span(start, (int)(key - start)).ToArray();
- var subscriptionInput = new Span(inputStart, (int)(input - inputStart)).ToArray();
- subscriptions.TryAdd(subscriptionKey, new ConcurrentDictionary());
- if (subscriptions.TryGetValue(subscriptionKey, out var val))
- val.TryAdd(id, (session, subscriptionInput));
- return id;
- }
-
- ///
- /// Subscribe to a particular prefix
- ///
- /// prefix to subscribe to
- /// Input from subscriber
- /// Server session
- ///
- public unsafe int PSubscribe(ref byte* prefix, ref byte* input, ServerSessionBase session)
- {
- var start = prefix;
- var inputStart = input;
- keyInputSerializer.ReadKeyByRef(ref prefix);
- keyInputSerializer.ReadInputByRef(ref input);
- var id = Interlocked.Increment(ref sid);
- if (Interlocked.CompareExchange(ref publishQueue, new AsyncQueue(), null) == null)
- {
- done.Reset();
- subscriptions = new ConcurrentDictionary>(new ByteArrayComparer());
- prefixSubscriptions = new ConcurrentDictionary>(new ByteArrayComparer());
- Task.Run(() => Start(cts.Token));
- }
- else
- {
- while (prefixSubscriptions == null) Thread.Yield();
- }
- var subscriptionPrefix = new Span(start, (int)(prefix - start)).ToArray();
- var subscriptionInput = new Span(inputStart, (int)(input - inputStart)).ToArray();
- prefixSubscriptions.TryAdd(subscriptionPrefix, new ConcurrentDictionary());
- if (prefixSubscriptions.TryGetValue(subscriptionPrefix, out var val))
- val.TryAdd(id, (session, subscriptionInput));
- return id;
- }
-
- ///
- /// Publish the update made to key to all the subscribers
- ///
- /// key that has been updated
- public unsafe void Publish(byte* key)
- {
- if (subscriptions == null && prefixSubscriptions == null) return;
-
- var start = key;
- ref Key k = ref keyInputSerializer.ReadKeyByRef(ref key);
- log.Enqueue(new Span(start, (int)(key - start)));
- }
-
- ///
- public void Dispose()
- {
- disposed = true;
- cts.Cancel();
- done.WaitOne();
- subscriptions?.Clear();
- prefixSubscriptions?.Clear();
- log.Dispose();
- device.Dispose();
- }
- }
-}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
index 0666c0d8b6..5732dd1389 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
@@ -21,6 +21,8 @@ namespace Tsavorite.core
/// Indicates whether this struct has been initialized
public bool IsNull => clientSession is null;
+ const int KeyLockMaxRetryAttempts = 1000;
+
internal LockableContext(ClientSession clientSession)
{
this.clientSession = clientSession;
@@ -55,7 +57,7 @@ internal LockableContext(ClientSession(TLockableKey[] keys, int start, int count) where TLockableKey : ILockableKey => clientSession.SortKeyHashes(keys, start, count);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- internal static void DoInternalLock(TsavoriteSession tsavoriteSession, ClientSession clientSession,
+ internal static bool DoInternalLock(TsavoriteSession tsavoriteSession, ClientSession clientSession,
TLockableKey[] keys, int start, int count)
where TsavoriteSession : ITsavoriteSession
where TLockableKey : ILockableKey
@@ -65,6 +67,7 @@ internal LockableContext(ClientSession(status, tsavoriteSession);
+ retryCount++;
+ if (retryCount >= KeyLockMaxRetryAttempts)
+ {
+ return false;
+ }
goto Retry;
}
}
// We reached the end of the list, possibly after a duplicate keyhash; all locks were successful.
- return;
+ return true;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -224,15 +232,18 @@ public void Lock(TLockableKey[] keys, int start, int count)
{
clientSession.CheckIsAcquiredLockable();
Debug.Assert(!clientSession.store.epoch.ThisInstanceProtected(), "Trying to protect an already-protected epoch for LockableUnsafeContext.Lock()");
-
- clientSession.UnsafeResumeThread();
- try
+ bool lockAquired = false;
+ while (!lockAquired)
{
- DoInternalLock(TsavoriteSession, clientSession, keys, start, count);
- }
- finally
- {
- clientSession.UnsafeSuspendThread();
+ clientSession.UnsafeResumeThread();
+ try
+ {
+ lockAquired = DoInternalLock(TsavoriteSession, clientSession, keys, start, count);
+ }
+ finally
+ {
+ clientSession.UnsafeSuspendThread();
+ }
}
}
diff --git a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs
index 901e2e3ae6..45a6d2e644 100644
--- a/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs
+++ b/libs/storage/Tsavorite/cs/src/core/ClientSession/LockableUnsafeContext.cs
@@ -74,8 +74,16 @@ public void Lock(TLockableKey[] keys, int start, int count)
{
clientSession.CheckIsAcquiredLockable();
Debug.Assert(clientSession.store.epoch.ThisInstanceProtected(), "Epoch protection required for LockableUnsafeContext.Lock()");
-
- LockableContext.DoInternalLock(TsavoriteSession, clientSession, keys, start, count);
+ while (true)
+ {
+ if (LockableContext.DoInternalLock(TsavoriteSession, clientSession, keys, start, count))
+ {
+ break;
+ }
+ // Suspend and resume epoch protection to give others a fair chance to progress
+ clientSession.store.epoch.Suspend();
+ clientSession.store.epoch.Resume();
+ }
}
///