Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,10 @@ public IEnumerable<string> LuaAllowedFunctions
[Option("enable-vector-set-preview", Required = false, HelpText = "Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview")]
public bool EnableVectorSetPreview { get; set; }

[IntRangeValidation(0, int.MaxValue, isRequired: false)]
[Option("vector-set-replay-task-count", Required = false, HelpText = "Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count)")]
public int VectorSetReplayTaskCount { get; set; }
Comment thread
vazois marked this conversation as resolved.

Comment thread
vazois marked this conversation as resolved.
/// <summary>
/// This property contains all arguments that were not parsed by the command line argument parser
/// </summary>
Expand Down Expand Up @@ -965,6 +969,7 @@ endpoint is IPEndPoint listenEp && clusterAnnounceEndpoint[0] is IPEndPoint anno
ClusterReplicationReestablishmentTimeout = ClusterReplicationReestablishmentTimeout,
ClusterReplicaResumeWithData = ClusterReplicaResumeWithData,
EnableVectorSetPreview = EnableVectorSetPreview,
VectorSetReplayTaskCount = VectorSetReplayTaskCount
};
}

Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ private GarnetDatabase CreateDatabase(int dbId, GarnetServerOptions serverOption
var (aofDevice, aof) = CreateAOF(dbId);

var vectorManager = new VectorManager(
serverOptions.EnableVectorSetPreview,
dbId,
serverOptions,
() => Provider.GetSession(WireFormat.ASCII, null),
loggerFactory
);
Expand Down
5 changes: 4 additions & 1 deletion libs/host/defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -453,5 +453,8 @@
"ClusterReplicaResumeWithData": false,

/* Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview */
"EnableVectorSetPreview": false
"EnableVectorSetPreview": false,

/* Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count) */
"VectorSetReplayTaskCount": 0
}
14 changes: 7 additions & 7 deletions libs/server/Resp/Vector/VectorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,28 @@ public sealed partial class VectorManager : IDisposable

private readonly int dbId;

public VectorManager(bool enabled, int dbId, Func<IMessageConsumer> getCleanupSession, ILoggerFactory loggerFactory)
public VectorManager(int dbId, GarnetServerOptions serverOptions, Func<IMessageConsumer> getCleanupSession, ILoggerFactory loggerFactory)
{
this.dbId = dbId;

IsEnabled = enabled;
IsEnabled = serverOptions.EnableVectorSetPreview;

// Include DB and id so we correlate to what's actually stored in the log
logger = loggerFactory?.CreateLogger($"{nameof(VectorManager)}:{dbId}:{processInstanceId}");

replicationBlockEvent = CountingEventSlim.Create();
replicationReplayChannel = Channel.CreateUnbounded<VADDReplicationState>(new() { SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false });

// TODO: Pull this off a config or something
replicationReplayTasks = new Task[Environment.ProcessorCount];
if (serverOptions.VectorSetReplayTaskCount < 0 || serverOptions.VectorSetReplayTaskCount > Environment.ProcessorCount)
throw new GarnetException($"VectorSetReplayTaskCount should be in range [0,{Environment.ProcessorCount}]!");
var vectorSetReplayCount = serverOptions.VectorSetReplayTaskCount == 0 ? Environment.ProcessorCount : serverOptions.VectorSetReplayTaskCount;
Comment thread
vazois marked this conversation as resolved.
replicationReplayTasks = new Task[vectorSetReplayCount];
for (var i = 0; i < replicationReplayTasks.Length; i++)
{
replicationReplayTasks[i] = Task.CompletedTask;
}

// TODO: Probably configurable?
// For now, just number of processors
vectorSetLocks = new(Environment.ProcessorCount);
vectorSetLocks = new(vectorSetReplayCount);

this.getCleanupSession = getCleanupSession;
cleanupTaskChannel = Channel.CreateUnbounded<object>(new() { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false });
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Servers/GarnetServerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,11 @@ public class GarnetServerOptions : ServerOptions
/// </summary>
public bool EnableVectorSetPreview = false;

/// <summary>
/// Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count).
Comment thread
vazois marked this conversation as resolved.
/// </summary>
public int VectorSetReplayTaskCount = 0;

/// <summary>
/// Get the directory name for database checkpoints
/// </summary>
Expand Down
12 changes: 10 additions & 2 deletions test/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public void RegisterCustomTxn(string name, Func<CustomTransactionProcedure> proc
/// <param name="expiredObjectCollectionFrequencySecs"></param>
/// <param name="clusterPreferredEndpointType"></param>
/// <param name="useClusterAnnounceHostname"></param>
/// <param name="vectorSetReplayTaskCount"></param>
public void CreateInstances(
int shards,
bool enableCluster = true,
Expand Down Expand Up @@ -294,6 +295,7 @@ public void CreateInstances(
int expiredObjectCollectionFrequencySecs = 0,
ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip,
bool useClusterAnnounceHostname = false,
int vectorSetReplayTaskCount = 0,
int threadPoolMinIOCompletionThreads = 0)
{
var ipAddress = IPAddress.Loopback;
Expand Down Expand Up @@ -352,6 +354,7 @@ public void CreateInstances(
expiredObjectCollectionFrequencySecs: expiredObjectCollectionFrequencySecs,
clusterPreferredEndpointType: clusterPreferredEndpointType,
clusterAnnounceHostname: useClusterAnnounceHostname ? "localhost" : null,
vectorSetReplayTaskCount: vectorSetReplayTaskCount,
threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads);

foreach (var node in nodes)
Expand All @@ -366,6 +369,7 @@ public void CreateInstances(
/// <param name="endpoint"></param>
/// <param name="enableCluster"></param>
/// <param name="cleanClusterConfig"></param>
/// <param name="disableEpochCollision"></param>
/// <param name="tryRecover"></param>
/// <param name="disableObjects"></param>
/// <param name="lowMemory"></param>
Expand All @@ -385,8 +389,10 @@ public void CreateInstances(
/// <param name="useTLS"></param>
/// <param name="useAcl"></param>
/// <param name="asyncReplay"></param>
/// <param name="clusterCreds"></param>
/// <param name="vectorSetReplayTaskCount"></param>
/// <param name="clusterAnnounceEndpoint"></param>
/// <param name="certificates"></param>
/// <param name="clusterCreds"></param>
/// <returns></returns>
public GarnetServer CreateInstance(
EndPoint endpoint,
Expand All @@ -412,6 +418,7 @@ public GarnetServer CreateInstance(
bool useTLS = false,
bool useAcl = false,
bool asyncReplay = false,
int vectorSetReplayTaskCount = 0,
EndPoint clusterAnnounceEndpoint = null,
X509CertificateCollection certificates = null,
ServerCredential clusterCreds = new ServerCredential())
Expand Down Expand Up @@ -447,7 +454,8 @@ public GarnetServer CreateInstance(
authUsername: clusterCreds.user,
authPassword: clusterCreds.password,
certificates: certificates,
clusterAnnounceEndpoint: clusterAnnounceEndpoint);
clusterAnnounceEndpoint: clusterAnnounceEndpoint,
vectorSetReplayTaskCount: vectorSetReplayTaskCount);

return new GarnetServer(opts, loggerFactory);
}
Expand Down
6 changes: 5 additions & 1 deletion test/Garnet.test/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet
int expiredObjectCollectionFrequencySecs = 0,
ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip,
string clusterAnnounceHostname = null,
int vectorSetReplayTaskCount = 0,
int threadPoolMinIOCompletionThreads = 0)
{
if (UseAzureStorage)
Expand Down Expand Up @@ -592,6 +593,7 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet
expiredObjectCollectionFrequencySecs: expiredObjectCollectionFrequencySecs,
clusterPreferredEndpointType: clusterPreferredEndpointType,
clusterAnnounceHostname: clusterAnnounceHostname,
vectorSetReplayTaskCount: vectorSetReplayTaskCount,
threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads);

ClassicAssert.IsNotNull(opts);
Expand Down Expand Up @@ -673,6 +675,7 @@ public static GarnetServerOptions GetGarnetServerOptions(
ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip,
string clusterAnnounceHostname = null,
bool enableVectorSetPreview = true,
int vectorSetReplayTaskCount = 0,
int threadPoolMinIOCompletionThreads = 0)
{
if (useAzureStorage)
Expand Down Expand Up @@ -798,8 +801,9 @@ public static GarnetServerOptions GetGarnetServerOptions(
ClusterReplicaResumeWithData = clusterReplicaResumeWithData,
ReplicaSyncTimeout = replicaSyncTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(replicaSyncTimeout),
EnableVectorSetPreview = enableVectorSetPreview,
VectorSetReplayTaskCount = vectorSetReplayTaskCount,
ExpiredObjectCollectionFrequencySecs = expiredObjectCollectionFrequencySecs,
ThreadPoolMinIOCompletionThreads = threadPoolMinIOCompletionThreads
ThreadPoolMinIOCompletionThreads = threadPoolMinIOCompletionThreads,
};

if (lowMemory)
Expand Down
Loading