From 625602683636bd9c29a86ff07ee0143f8d8c50f0 Mon Sep 17 00:00:00 2001 From: Vasileios Zois <96085550+vazois@users.noreply.github.com> Date: Thu, 16 Apr 2026 12:55:42 -0700 Subject: [PATCH] expose VectorSetReplayTaskCount parameter (#1703) --- libs/host/Configuration/Options.cs | 5 +++++ libs/host/GarnetServer.cs | 2 +- libs/host/defaults.conf | 5 ++++- libs/server/Resp/Vector/VectorManager.cs | 14 +++++++------- libs/server/Servers/GarnetServerOptions.cs | 5 +++++ test/Garnet.test.cluster/ClusterTestContext.cs | 18 ++++++++++++++---- test/Garnet.test/TestUtils.cs | 16 ++++++++++++---- 7 files changed, 48 insertions(+), 17 deletions(-) diff --git a/libs/host/Configuration/Options.cs b/libs/host/Configuration/Options.cs index 4c8de57c12a..2a9ee004632 100644 --- a/libs/host/Configuration/Options.cs +++ b/libs/host/Configuration/Options.cs @@ -672,6 +672,10 @@ public IEnumerable LuaAllowedFunctions [Option("enable-vector-set-preview", Required = false, HelpText = "Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview")] public bool EnableVectorSetPreview { get; set; } + [IntRangeValidation(0, int.MaxValue, isRequired: false)] + [Option("vector-set-replay-task-count", Required = false, HelpText = "Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count)")] + public int VectorSetReplayTaskCount { get; set; } + /// /// This property contains all arguments that were not parsed by the command line argument parser /// @@ -965,6 +969,7 @@ endpoint is IPEndPoint listenEp && clusterAnnounceEndpoint[0] is IPEndPoint anno ClusterReplicationReestablishmentTimeout = ClusterReplicationReestablishmentTimeout, ClusterReplicaResumeWithData = ClusterReplicaResumeWithData, EnableVectorSetPreview = EnableVectorSetPreview, + VectorSetReplayTaskCount = VectorSetReplayTaskCount }; } diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index e35e97fae42..977d28a3d17 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -324,8 +324,8 @@ private GarnetDatabase CreateDatabase(int dbId, GarnetServerOptions serverOption var (aofDevice, aof) = CreateAOF(dbId); var vectorManager = new VectorManager( - serverOptions.EnableVectorSetPreview, dbId, + serverOptions, () => Provider.GetSession(WireFormat.ASCII, null), loggerFactory ); diff --git a/libs/host/defaults.conf b/libs/host/defaults.conf index 64d99da5a37..e6eae41292b 100644 --- a/libs/host/defaults.conf +++ b/libs/host/defaults.conf @@ -453,5 +453,8 @@ "ClusterReplicaResumeWithData": false, /* Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview */ - "EnableVectorSetPreview": false + "EnableVectorSetPreview": false, + + /* Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count) */ + "VectorSetReplayTaskCount": 0 } \ No newline at end of file diff --git a/libs/server/Resp/Vector/VectorManager.cs b/libs/server/Resp/Vector/VectorManager.cs index 28952996a8e..e67be9210fd 100644 --- a/libs/server/Resp/Vector/VectorManager.cs +++ b/libs/server/Resp/Vector/VectorManager.cs @@ -75,11 +75,11 @@ public sealed partial class VectorManager : IDisposable private readonly int dbId; - public VectorManager(bool enabled, int dbId, Func getCleanupSession, ILoggerFactory loggerFactory) + public VectorManager(int dbId, GarnetServerOptions serverOptions, Func getCleanupSession, ILoggerFactory loggerFactory) { this.dbId = dbId; - IsEnabled = enabled; + IsEnabled = serverOptions.EnableVectorSetPreview; // Include DB and id so we correlate to what's actually stored in the log logger = loggerFactory?.CreateLogger($"{nameof(VectorManager)}:{dbId}:{processInstanceId}"); @@ -87,16 +87,16 @@ public VectorManager(bool enabled, int dbId, Func getCleanupSe replicationBlockEvent = CountingEventSlim.Create(); replicationReplayChannel = Channel.CreateUnbounded(new() { SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false }); - // TODO: Pull this off a config or something - replicationReplayTasks = new Task[Environment.ProcessorCount]; + if (serverOptions.VectorSetReplayTaskCount < 0 || serverOptions.VectorSetReplayTaskCount > Environment.ProcessorCount) + throw new GarnetException($"VectorSetReplayTaskCount should be in range [0,{Environment.ProcessorCount}]!"); + var vectorSetReplayCount = serverOptions.VectorSetReplayTaskCount == 0 ? Environment.ProcessorCount : serverOptions.VectorSetReplayTaskCount; + replicationReplayTasks = new Task[vectorSetReplayCount]; for (var i = 0; i < replicationReplayTasks.Length; i++) { replicationReplayTasks[i] = Task.CompletedTask; } - // TODO: Probably configurable? - // For now, just number of processors - vectorSetLocks = new(Environment.ProcessorCount); + vectorSetLocks = new(vectorSetReplayCount); this.getCleanupSession = getCleanupSession; cleanupTaskChannel = Channel.CreateUnbounded(new() { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false }); diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs index f3669cf070b..d4c24e5dca6 100644 --- a/libs/server/Servers/GarnetServerOptions.cs +++ b/libs/server/Servers/GarnetServerOptions.cs @@ -539,6 +539,11 @@ public class GarnetServerOptions : ServerOptions /// public bool EnableVectorSetPreview = false; + /// + /// Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count). + /// + public int VectorSetReplayTaskCount = 0; + /// /// Get the directory name for database checkpoints /// diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index 449799d5444..b74f935cb75 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -245,6 +245,7 @@ public void RegisterCustomTxn(string name, Func proc /// /// /// + /// public void CreateInstances( int shards, bool enableCluster = true, @@ -293,7 +294,9 @@ public void CreateInstances( int replicaSyncTimeout = 60, int expiredObjectCollectionFrequencySecs = 0, ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, - bool useClusterAnnounceHostname = false) + bool useClusterAnnounceHostname = false, + int vectorSetReplayTaskCount = 0, + int threadPoolMinIOCompletionThreads = 0) { var ipAddress = IPAddress.Loopback; TestUtils.EndPoint = new IPEndPoint(ipAddress, 7000); @@ -350,7 +353,9 @@ public void CreateInstances( replicaSyncTimeout: replicaSyncTimeout, expiredObjectCollectionFrequencySecs: expiredObjectCollectionFrequencySecs, clusterPreferredEndpointType: clusterPreferredEndpointType, - clusterAnnounceHostname: useClusterAnnounceHostname ? "localhost" : null); + clusterAnnounceHostname: useClusterAnnounceHostname ? "localhost" : null, + vectorSetReplayTaskCount: vectorSetReplayTaskCount, + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); foreach (var node in nodes) node.Start(); @@ -364,6 +369,7 @@ public void CreateInstances( /// /// /// + /// /// /// /// @@ -383,8 +389,10 @@ public void CreateInstances( /// /// /// - /// + /// + /// /// + /// /// public GarnetServer CreateInstance( EndPoint endpoint, @@ -410,6 +418,7 @@ public GarnetServer CreateInstance( bool useTLS = false, bool useAcl = false, bool asyncReplay = false, + int vectorSetReplayTaskCount = 0, EndPoint clusterAnnounceEndpoint = null, X509CertificateCollection certificates = null, ServerCredential clusterCreds = new ServerCredential()) @@ -445,7 +454,8 @@ public GarnetServer CreateInstance( authUsername: clusterCreds.user, authPassword: clusterCreds.password, certificates: certificates, - clusterAnnounceEndpoint: clusterAnnounceEndpoint); + clusterAnnounceEndpoint: clusterAnnounceEndpoint, + vectorSetReplayTaskCount: vectorSetReplayTaskCount); return new GarnetServer(opts, loggerFactory); } diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index 66fa38f0954..747d1a1c623 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -525,7 +525,9 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet int replicaSyncTimeout = 60, int expiredObjectCollectionFrequencySecs = 0, ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, - string clusterAnnounceHostname = null) + string clusterAnnounceHostname = null, + int vectorSetReplayTaskCount = 0, + int threadPoolMinIOCompletionThreads = 0) { if (UseAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -590,7 +592,9 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet replicaSyncTimeout: replicaSyncTimeout, expiredObjectCollectionFrequencySecs: expiredObjectCollectionFrequencySecs, clusterPreferredEndpointType: clusterPreferredEndpointType, - clusterAnnounceHostname: clusterAnnounceHostname); + clusterAnnounceHostname: clusterAnnounceHostname, + vectorSetReplayTaskCount: vectorSetReplayTaskCount, + threadPoolMinIOCompletionThreads: threadPoolMinIOCompletionThreads); ClassicAssert.IsNotNull(opts); @@ -670,7 +674,9 @@ public static GarnetServerOptions GetGarnetServerOptions( int expiredObjectCollectionFrequencySecs = 0, ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, string clusterAnnounceHostname = null, - bool enableVectorSetPreview = true) + bool enableVectorSetPreview = true, + int vectorSetReplayTaskCount = 0, + int threadPoolMinIOCompletionThreads = 0) { if (useAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -795,7 +801,9 @@ public static GarnetServerOptions GetGarnetServerOptions( ClusterReplicaResumeWithData = clusterReplicaResumeWithData, ReplicaSyncTimeout = replicaSyncTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(replicaSyncTimeout), EnableVectorSetPreview = enableVectorSetPreview, - ExpiredObjectCollectionFrequencySecs = expiredObjectCollectionFrequencySecs + VectorSetReplayTaskCount = vectorSetReplayTaskCount, + ExpiredObjectCollectionFrequencySecs = expiredObjectCollectionFrequencySecs, + ThreadPoolMinIOCompletionThreads = threadPoolMinIOCompletionThreads, }; if (lowMemory)