Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/microsoft/garnet into pubsu…
Browse files Browse the repository at this point in the history
…b_fix
  • Loading branch information
vbandikatla committed Apr 25, 2024
2 parents 7f0de3b + 02649c9 commit f21deaa
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 58 deletions.
20 changes: 0 additions & 20 deletions libs/common/IKeyInputSerializer.cs

This file was deleted.

5 changes: 1 addition & 4 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public class GarnetServer : IDisposable
private TsavoriteKV<byte[], IGarnetObject> objectStore;
private IDevice aofDevice;
private TsavoriteLog appendOnlyFile;
private SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker;
private SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker;
private LogSettings logSettings, objLogSettings;
private INamedDeviceFactory logFactory;
Expand Down Expand Up @@ -239,7 +238,6 @@ private void InitializeServer()

if (!opts.DisablePubSub)
{
kvBroker = new SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), true);
broker = new SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>>(new SpanByteKeySerializer(), null, opts.PubSubPageSizeBytes(), true);
}

Expand Down Expand Up @@ -281,7 +279,7 @@ private void InitializeServer()
storeWrapper = new StoreWrapper(version, redisProtocolVersion, server, store, objectStore, objectStoreSizeTracker, customCommandManager, appendOnlyFile, opts, clusterFactory: clusterFactory, loggerFactory: loggerFactory);

// Create session provider for Garnet
Provider = new GarnetProvider(storeWrapper, kvBroker, broker);
Provider = new GarnetProvider(storeWrapper, broker);

// Create user facing API endpoints
Metrics = new MetricsApi(Provider);
Expand Down Expand Up @@ -335,7 +333,6 @@ private void InternalDispose()
Provider?.Dispose();
server.Dispose();
broker?.Dispose();
kvBroker?.Dispose();
store.Dispose();
appendOnlyFile?.Dispose();
aofDevice?.Dispose();
Expand Down
4 changes: 1 addition & 3 deletions libs/server/Providers/GarnetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ public sealed class GarnetProvider : TsavoriteKVProviderBase<SpanByte, SpanByte,
/// Create SpanByte TsavoriteKV backend for Garnet
/// </summary>
/// <param name="storeWrapper"></param>
/// <param name="kvBroker"></param>
/// <param name="broker"></param>
/// <param name="maxSizeSettings"></param>
public GarnetProvider(StoreWrapper storeWrapper,
SubscribeKVBroker<SpanByte, SpanByte, SpanByte, IKeyInputSerializer<SpanByte, SpanByte>> kvBroker = null,
SubscribeBroker<SpanByte, SpanByte, IKeySerializer<SpanByte>> broker = null,
MaxSizeSettings maxSizeSettings = default)
: base(storeWrapper.store, new(), kvBroker, broker, false, maxSizeSettings)
: base(storeWrapper.store, new(), broker, false, maxSizeSettings)
{
this.storeWrapper = storeWrapper;
}
Expand Down
9 changes: 1 addition & 8 deletions libs/server/Providers/TsavoriteKVProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ public abstract class TsavoriteKVProviderBase<Key, Value, Input, Output, Functio
/// </summary>
protected readonly ParameterSerializer serializer;

/// <summary>
/// KV broker
/// </summary>
protected readonly SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>> kvBroker;

/// <summary>
/// Broker
/// </summary>
Expand All @@ -45,11 +40,10 @@ public abstract class TsavoriteKVProviderBase<Key, Value, Input, Output, Functio
/// </summary>
/// <param name="store"></param>
/// <param name="serializer"></param>
/// <param name="kvBroker"></param>
/// <param name="broker"></param>
/// <param name="recoverStore"></param>
/// <param name="maxSizeSettings"></param>
public TsavoriteKVProviderBase(TsavoriteKV<Key, Value> store, ParameterSerializer serializer, SubscribeKVBroker<Key, Value, Input, IKeyInputSerializer<Key, Input>> kvBroker = null, SubscribeBroker<Key, Value, IKeySerializer<Key>> broker = null, bool recoverStore = false, MaxSizeSettings maxSizeSettings = default)
public TsavoriteKVProviderBase(TsavoriteKV<Key, Value> store, ParameterSerializer serializer, SubscribeBroker<Key, Value, IKeySerializer<Key>> broker = null, bool recoverStore = false, MaxSizeSettings maxSizeSettings = default)
{
this.store = store;
if (recoverStore)
Expand All @@ -61,7 +55,6 @@ public TsavoriteKVProviderBase(TsavoriteKV<Key, Value> store, ParameterSerialize
catch
{ }
}
this.kvBroker = kvBroker;
this.broker = broker;
this.serializer = serializer;
this.maxSizeSettings = maxSizeSettings ?? new MaxSizeSettings();
Expand Down
13 changes: 2 additions & 11 deletions libs/server/PubSub/SpanByteKeySerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
namespace Garnet.server
{
/// <summary>
/// Serializer for SpanByte. Used only on server-side.
/// Serializer for SpanByte.
/// </summary>
public sealed unsafe class SpanByteKeySerializer : IKeyInputSerializer<SpanByte, SpanByte>
public sealed unsafe class SpanByteKeySerializer : IKeySerializer<SpanByte>
{
/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -22,15 +22,6 @@ public sealed unsafe class SpanByteKeySerializer : IKeyInputSerializer<SpanByte,
return ref ret;
}

/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ref SpanByte ReadInputByRef(ref byte* src)
{
ref var ret = ref Unsafe.AsRef<SpanByte>(src);
src += ret.TotalSize;
return ref ret;
}

/// <inheritdoc />
public bool Match(ref SpanByte k, bool asciiKey, ref SpanByte pattern, bool asciiPattern)
{
Expand Down
31 changes: 21 additions & 10 deletions libs/storage/Tsavorite/cs/src/core/ClientSession/LockableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace Tsavorite.core
/// <summary>Indicates whether this struct has been initialized</summary>
public bool IsNull => clientSession is null;

const int KeyLockMaxRetryAttempts = 1000;

internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Functions> clientSession)
{
this.clientSession = clientSession;
Expand Down Expand Up @@ -55,7 +57,7 @@ internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Funct
public void SortKeyHashes<TLockableKey>(TLockableKey[] keys, int start, int count) where TLockableKey : ILockableKey => clientSession.SortKeyHashes(keys, start, count);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void DoInternalLock<TsavoriteSession, TLockableKey>(TsavoriteSession tsavoriteSession, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession,
internal static bool DoInternalLock<TsavoriteSession, TLockableKey>(TsavoriteSession tsavoriteSession, ClientSession<Key, Value, Input, Output, Context, Functions> clientSession,
TLockableKey[] keys, int start, int count)
where TsavoriteSession : ITsavoriteSession<Key, Value, Input, Output, Context>
where TLockableKey : ILockableKey
Expand All @@ -65,6 +67,7 @@ internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Funct
// This is the same as DoInternalTryLock but without timeout; it will keep trying until it acquires all locks.
var end = start + count - 1;

int retryCount = 0;
Retry:
long prevBucketIndex = -1;

Expand All @@ -84,12 +87,17 @@ internal LockableContext(ClientSession<Key, Value, Input, Output, Context, Funct

// We've released our locks so this refresh will let other threads advance and release their locks, and we will retry with a full timeout.
clientSession.store.HandleImmediateNonPendingRetryStatus<Input, Output, Context, TsavoriteSession>(status, tsavoriteSession);
retryCount++;
if (retryCount >= KeyLockMaxRetryAttempts)
{
return false;
}
goto Retry;
}
}

// We reached the end of the list, possibly after a duplicate keyhash; all locks were successful.
return;
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -224,15 +232,18 @@ public void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
{
clientSession.CheckIsAcquiredLockable();
Debug.Assert(!clientSession.store.epoch.ThisInstanceProtected(), "Trying to protect an already-protected epoch for LockableUnsafeContext.Lock()");

clientSession.UnsafeResumeThread();
try
bool lockAquired = false;
while (!lockAquired)
{
DoInternalLock(TsavoriteSession, clientSession, keys, start, count);
}
finally
{
clientSession.UnsafeSuspendThread();
clientSession.UnsafeResumeThread();
try
{
lockAquired = DoInternalLock(TsavoriteSession, clientSession, keys, start, count);
}
finally
{
clientSession.UnsafeSuspendThread();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,16 @@ public void Lock<TLockableKey>(TLockableKey[] keys, int start, int count)
{
clientSession.CheckIsAcquiredLockable();
Debug.Assert(clientSession.store.epoch.ThisInstanceProtected(), "Epoch protection required for LockableUnsafeContext.Lock()");

LockableContext<Key, Value, Input, Output, Context, Functions>.DoInternalLock(TsavoriteSession, clientSession, keys, start, count);
while (true)
{
if (LockableContext<Key, Value, Input, Output, Context, Functions>.DoInternalLock(TsavoriteSession, clientSession, keys, start, count))
{
break;
}
// Suspend and resume epoch protection to give others a fair chance to progress
clientSession.store.epoch.Suspend();
clientSession.store.epoch.Resume();
}
}

/// <inheritdoc/>
Expand Down

0 comments on commit f21deaa

Please sign in to comment.