From cfe7074b681c3b8b6d6b9f956215f821126f9d84 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 14 Apr 2026 13:07:25 -0700 Subject: [PATCH] expose VectorSetReplayTaskCount parameter --- libs/host/Configuration/Options.cs | 5 +++++ libs/host/GarnetServer.cs | 2 +- libs/host/defaults.conf | 5 ++++- libs/server/Resp/Vector/VectorManager.cs | 14 +++++++------- libs/server/Servers/GarnetServerOptions.cs | 5 +++++ test/Garnet.test.cluster/ClusterTestContext.cs | 16 ++++++++++++---- test/Garnet.test/TestUtils.cs | 10 +++++++--- 7 files changed, 41 insertions(+), 16 deletions(-) diff --git a/libs/host/Configuration/Options.cs b/libs/host/Configuration/Options.cs index 6cae1bebed0..351bb159967 100644 --- a/libs/host/Configuration/Options.cs +++ b/libs/host/Configuration/Options.cs @@ -629,6 +629,10 @@ public IEnumerable LuaAllowedFunctions [Option("enable-vector-set-preview", Required = false, HelpText = "Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview")] public bool EnableVectorSetPreview { get; set; } + [IntRangeValidation(0, int.MaxValue, isRequired: false)] + [Option("vector-set-replay-task-count", Required = false, HelpText = "Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count)")] + public int VectorSetReplayTaskCount { get; set; } + /// /// This property contains all arguments that were not parsed by the command line argument parser /// @@ -908,6 +912,7 @@ endpoint is IPEndPoint listenEp && clusterAnnounceEndpoint[0] is IPEndPoint anno ClusterReplicationReestablishmentTimeout = ClusterReplicationReestablishmentTimeout, ClusterReplicaResumeWithData = ClusterReplicaResumeWithData, EnableVectorSetPreview = EnableVectorSetPreview, + VectorSetReplayTaskCount = VectorSetReplayTaskCount }; } diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index 2289c9d1dd3..00f2eba9afe 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -314,8 +314,8 @@ private GarnetDatabase CreateDatabase(int dbId, GarnetServerOptions serverOption var (aofDevice, aof) = CreateAOF(dbId); var vectorManager = new VectorManager( - serverOptions.EnableVectorSetPreview, dbId, + serverOptions, () => Provider.GetSession(WireFormat.ASCII, null), loggerFactory ); diff --git a/libs/host/defaults.conf b/libs/host/defaults.conf index a07ba05813b..4092c821502 100644 --- a/libs/host/defaults.conf +++ b/libs/host/defaults.conf @@ -423,5 +423,8 @@ "ClusterReplicaResumeWithData": false, /* Enable Vector Sets (preview) - this feature (and associated commands) are incomplete, unstable, and subject to change while still in preview */ - "EnableVectorSetPreview": false + "EnableVectorSetPreview": false, + + /* Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count) */ + "VectorSetReplayTaskCount": 0 } \ No newline at end of file diff --git a/libs/server/Resp/Vector/VectorManager.cs b/libs/server/Resp/Vector/VectorManager.cs index e7b8d5e9d4f..0d08de4062b 100644 --- a/libs/server/Resp/Vector/VectorManager.cs +++ b/libs/server/Resp/Vector/VectorManager.cs @@ -81,11 +81,11 @@ public sealed partial class VectorManager : IDisposable private readonly int dbId; - public VectorManager(bool enabled, int dbId, Func getCleanupSession, ILoggerFactory loggerFactory) + public VectorManager(int dbId, GarnetServerOptions serverOptions, Func getCleanupSession, ILoggerFactory loggerFactory) { this.dbId = dbId; - IsEnabled = enabled; + IsEnabled = serverOptions.EnableVectorSetPreview; // Include DB and id so we correlate to what's actually stored in the log logger = loggerFactory?.CreateLogger($"{nameof(VectorManager)}:{dbId}:{processInstanceId}"); @@ -93,16 +93,16 @@ public VectorManager(bool enabled, int dbId, Func getCleanupSe replicationBlockEvent = CountingEventSlim.Create(); replicationReplayChannel = Channel.CreateUnbounded(new() { SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false }); - // TODO: Pull this off a config or something - replicationReplayTasks = new Task[Environment.ProcessorCount]; + if (serverOptions.VectorSetReplayTaskCount < 0 || serverOptions.VectorSetReplayTaskCount > Environment.ProcessorCount) + throw new GarnetException($"VectorSetReplayTaskCount should be in range [0,{Environment.ProcessorCount}]!"); + var vectorSetReplayCount = serverOptions.VectorSetReplayTaskCount == 0 ? Environment.ProcessorCount : serverOptions.VectorSetReplayTaskCount; + replicationReplayTasks = new Task[vectorSetReplayCount]; for (var i = 0; i < replicationReplayTasks.Length; i++) { replicationReplayTasks[i] = Task.CompletedTask; } - // TODO: Probably configurable? - // For now, just number of processors - vectorSetLocks = new(Environment.ProcessorCount); + vectorSetLocks = new(vectorSetReplayCount); this.getCleanupSession = getCleanupSession; cleanupTaskChannel = Channel.CreateUnbounded(new() { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = false }); diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs index ce6a6d187b8..86ef1efc4f6 100644 --- a/libs/server/Servers/GarnetServerOptions.cs +++ b/libs/server/Servers/GarnetServerOptions.cs @@ -502,6 +502,11 @@ public class GarnetServerOptions : ServerOptions /// public bool EnableVectorSetPreview = false; + /// + /// Configure how many replay tasks are used to replay VectorSet operations at the replica (default: 0 uses the machine CPU count). + /// + public int VectorSetReplayTaskCount = 0; + /// /// Get the directory name for database checkpoints /// diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index b310e54ef46..4832d46e8a2 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -245,6 +245,7 @@ public void RegisterCustomTxn(string name, Func proc /// /// /// + /// public void CreateInstances( int shards, bool enableCluster = true, @@ -293,7 +294,8 @@ public void CreateInstances( int replicaSyncTimeout = 60, int expiredObjectCollectionFrequencySecs = 0, ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, - bool useClusterAnnounceHostname = false) + bool useClusterAnnounceHostname = false, + int vectorSetReplayTaskCount = 0) { var ipAddress = IPAddress.Loopback; TestUtils.EndPoint = new IPEndPoint(ipAddress, 7000); @@ -350,7 +352,8 @@ public void CreateInstances( replicaSyncTimeout: replicaSyncTimeout, expiredObjectCollectionFrequencySecs: expiredObjectCollectionFrequencySecs, clusterPreferredEndpointType: clusterPreferredEndpointType, - clusterAnnounceHostname: useClusterAnnounceHostname ? "localhost" : null); + clusterAnnounceHostname: useClusterAnnounceHostname ? "localhost" : null, + vectorSetReplayTaskCount: vectorSetReplayTaskCount); foreach (var node in nodes) node.Start(); @@ -364,6 +367,7 @@ public void CreateInstances( /// /// /// + /// /// /// /// @@ -383,8 +387,10 @@ public void CreateInstances( /// /// /// - /// + /// + /// /// + /// /// public GarnetServer CreateInstance( EndPoint endpoint, @@ -410,6 +416,7 @@ public GarnetServer CreateInstance( bool useTLS = false, bool useAcl = false, bool asyncReplay = false, + int vectorSetReplayTaskCount = 0, EndPoint clusterAnnounceEndpoint = null, X509CertificateCollection certificates = null, ServerCredential clusterCreds = new ServerCredential()) @@ -445,7 +452,8 @@ public GarnetServer CreateInstance( authUsername: clusterCreds.user, authPassword: clusterCreds.password, certificates: certificates, - clusterAnnounceEndpoint: clusterAnnounceEndpoint); + clusterAnnounceEndpoint: clusterAnnounceEndpoint, + vectorSetReplayTaskCount: vectorSetReplayTaskCount); return new GarnetServer(opts, loggerFactory); } diff --git a/test/Garnet.test/TestUtils.cs b/test/Garnet.test/TestUtils.cs index a430b60095f..ba39953845e 100644 --- a/test/Garnet.test/TestUtils.cs +++ b/test/Garnet.test/TestUtils.cs @@ -519,7 +519,8 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet int replicaSyncTimeout = 60, int expiredObjectCollectionFrequencySecs = 0, ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, - string clusterAnnounceHostname = null) + string clusterAnnounceHostname = null, + int vectorSetReplayTaskCount = 0) { if (UseAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -584,7 +585,8 @@ public static (GarnetServer[] Nodes, GarnetServerOptions[] Options) CreateGarnet replicaSyncTimeout: replicaSyncTimeout, expiredObjectCollectionFrequencySecs: expiredObjectCollectionFrequencySecs, clusterPreferredEndpointType: clusterPreferredEndpointType, - clusterAnnounceHostname: clusterAnnounceHostname); + clusterAnnounceHostname: clusterAnnounceHostname, + vectorSetReplayTaskCount: vectorSetReplayTaskCount); ClassicAssert.IsNotNull(opts); @@ -664,7 +666,8 @@ public static GarnetServerOptions GetGarnetServerOptions( int expiredObjectCollectionFrequencySecs = 0, ClusterPreferredEndpointType clusterPreferredEndpointType = ClusterPreferredEndpointType.Ip, string clusterAnnounceHostname = null, - bool enableVectorSetPreview = true) + bool enableVectorSetPreview = true, + int vectorSetReplayTaskCount = 0) { if (useAzureStorage) IgnoreIfNotRunningAzureTests(); @@ -787,6 +790,7 @@ public static GarnetServerOptions GetGarnetServerOptions( ClusterReplicaResumeWithData = clusterReplicaResumeWithData, ReplicaSyncTimeout = replicaSyncTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(replicaSyncTimeout), EnableVectorSetPreview = enableVectorSetPreview, + VectorSetReplayTaskCount = vectorSetReplayTaskCount, ExpiredObjectCollectionFrequencySecs = expiredObjectCollectionFrequencySecs, };