From fe468eef1df807eb6826c08654579cea3b080930 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 24 Apr 2024 16:38:07 -0400 Subject: [PATCH] cli: add --max-concurrency flag to `debug recover` commands Informs #122639. This commit adds a `--max-concurrency` flag to the following `debug recover` commands: - `debug recover collect-info` - `debug recover make-plan` - `debug recover apply-plan` - `debug recover verify` The flag controls the maximum concurrency when fanning out RPCs to nodes in the cluster while servicing the command. It defaults to `2 x num_cpus`, which is a decent proxy for how much fanout the process can tolerate without overloading itself. Some plumbing is performed, but the flags are currently unused. Release note (cli change): Added `--max-concurrency` flag to `debug recover` commands to control the maximum concurrency when fanning out RPCs to nodes in the cluster. The flag defaults to `2 x num_cpus`. --- docs/generated/http/full.md | 6 ++ pkg/cli/debug.go | 10 ++ pkg/cli/debug_recover_loss_of_quorum.go | 40 +++++-- pkg/cli/debug_recover_loss_of_quorum_test.go | 2 + pkg/kv/kvserver/loqrecovery/collect.go | 15 ++- .../loqrecovery/collect_raft_log_test.go | 2 +- .../loqrecovery/server_integration_test.go | 100 +++++++++++++----- pkg/server/serverpb/admin.proto | 15 ++- 8 files changed, 152 insertions(+), 38 deletions(-) diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 0bdf6235d48e..2fcae31458ca 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -7689,6 +7689,10 @@ Support status: [reserved](#support-status) +| Field | Type | Label | Description | Support status | +| ----- | ---- | ----- | ----------- | -------------- | +| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryCollectReplicaInfoRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) | + @@ -7799,6 +7803,7 @@ Support status: [reserved](#support-status) | all_nodes | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | If all nodes is true, then receiver should act as a coordinator and perform a fan-out to stage plan on all nodes of the cluster. | [reserved](#support-status) | | force_plan | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | ForcePlan tells receiver to ignore any plan already staged on the node if it is present and replace it with new plan (including empty one). | [reserved](#support-status) | | force_local_internal_version | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | ForceLocalInternalVersion tells server to update internal component of plan version to the one of active cluster version. This option needs to be set if target cluster is stuck in recovery where only part of nodes were successfully migrated. | [reserved](#support-status) | +| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryStagePlanRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. 
| [reserved](#support-status) | @@ -7888,6 +7893,7 @@ Support status: [reserved](#support-status) | plan_id | [bytes](#cockroach.server.serverpb.RecoveryVerifyRequest-bytes) | | PlanID is ID of the plan to verify. | [reserved](#support-status) | | decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | repeated | DecommissionedNodeIDs is a set of nodes that should be marked as decommissioned in the cluster when loss of quorum recovery successfully applies. | [reserved](#support-status) | | max_reported_ranges | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxReportedRanges is the maximum number of failed ranges to report. If more unhealthy ranges are found, error will be returned alongside range to indicate that ranges were cut short. | [reserved](#support-status) | +| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) | diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 1c0195ddafbe..ac9e2f2324bc 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -1515,6 +1515,8 @@ func init() { f = debugRecoverCollectInfoCmd.Flags() f.VarP(&debugRecoverCollectInfoOpts.Stores, cliflags.RecoverStore.Name, cliflags.RecoverStore.Shorthand, cliflags.RecoverStore.Usage()) + f.IntVarP(&debugRecoverCollectInfoOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency, + "maximum concurrency when fanning out RPCs to nodes in the cluster") f = debugRecoverPlanCmd.Flags() f.StringVarP(&debugRecoverPlanOpts.outputFileName, "plan", "o", "", @@ -1528,6 +1530,8 @@ func init() { f.BoolVar(&debugRecoverPlanOpts.force, "force", false, "force creation of plan even when problems were encountered; applying this plan may "+ "result in additional problems and should be done only with care and as a last resort") + f.IntVarP(&debugRecoverPlanOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency, + "maximum concurrency when fanning out RPCs to nodes in the cluster") f.UintVar(&formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Name, formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage()) @@ -1539,6 +1543,12 @@ func init() { formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage()) f.BoolVar(&debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Name, debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Usage()) + f.IntVarP(&debugRecoverExecuteOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency, + "maximum concurrency when fanning out RPCs to nodes in the cluster") + + f = debugRecoverVerifyCmd.Flags() + f.IntVarP(&debugRecoverVerifyOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency, + "maximum concurrency when fanning out RPCs to nodes in the cluster") f = debugMergeLogsCmd.Flags() f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from", diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go index fc2e4a137d2d..25110e51b2dd 100644 --- a/pkg/cli/debug_recover_loss_of_quorum.go +++ b/pkg/cli/debug_recover_loss_of_quorum.go @@ -17,6 +17,7 @@ import ( "io" "os" "path" + "runtime" "sort" "strings" @@ -297,7 +298,8 @@ See debug recover command help for more 
details on how to use this command. } var debugRecoverCollectInfoOpts struct { - Stores base.StoreSpecList + Stores base.StoreSpecList + maxConcurrency int } func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error { @@ -316,7 +318,7 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error { return errors.Wrapf(err, "failed to get admin connection to cluster") } defer finish() - replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c) + replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverCollectInfoOpts.maxConcurrency) if err != nil { return errors.WithHint(errors.Wrap(err, "failed to retrieve replica info from cluster"), @@ -402,6 +404,7 @@ var debugRecoverPlanOpts struct { deadNodeIDs []int confirmAction confirmActionFlag force bool + maxConcurrency int } var planSpecificFlags = map[string]struct{}{ @@ -432,7 +435,7 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error { return errors.Wrapf(err, "failed to get admin connection to cluster") } defer finish() - replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c) + replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverPlanOpts.maxConcurrency) if err != nil { return errors.Wrapf(err, "failed to retrieve replica info from cluster") } @@ -643,6 +646,7 @@ var debugRecoverExecuteOpts struct { Stores base.StoreSpecList confirmAction confirmActionFlag ignoreInternalVersion bool + maxConcurrency int } // runDebugExecuteRecoverPlan is using the following pattern when performing command @@ -670,7 +674,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error { if len(debugRecoverExecuteOpts.Stores.Specs) == 0 { return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates, - debugRecoverExecuteOpts.ignoreInternalVersion) + debugRecoverExecuteOpts.ignoreInternalVersion, debugRecoverExecuteOpts.maxConcurrency) } return applyRecoveryToLocalStore(ctx, nodeUpdates, debugRecoverExecuteOpts.ignoreInternalVersion) } @@ -681,6 +685,7 @@ func stageRecoveryOntoCluster( planFile string, plan loqrecoverypb.ReplicaUpdatePlan, ignoreInternalVersion bool, + maxConcurrency int, ) error { c, finish, err := getAdminClient(ctx, serverCfg) if err != nil { @@ -693,7 +698,9 @@ func stageRecoveryOntoCluster( nodeID roachpb.NodeID planID string } - vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ + MaxConcurrency: int32(maxConcurrency), + }) if err != nil { return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster") } @@ -754,7 +761,11 @@ func stageRecoveryOntoCluster( // We don't want to combine removing old plan and adding new one since it // could produce cluster with multiple plans at the same time which could // make situation worse. 
- res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true}) + res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + AllNodes: true, + ForcePlan: true, + MaxConcurrency: int32(maxConcurrency), + }) if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil { return err } @@ -763,6 +774,7 @@ func stageRecoveryOntoCluster( Plan: &plan, AllNodes: true, ForceLocalInternalVersion: ignoreInternalVersion, + MaxConcurrency: int32(maxConcurrency), }) if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster", sr, err); err != nil { @@ -919,6 +931,10 @@ See debug recover command help for more details on how to use this command. RunE: runDebugVerify, } +var debugRecoverVerifyOpts struct { + maxConcurrency int +} + func runDebugVerify(cmd *cobra.Command, args []string) error { // We must have cancellable context here to obtain grpc client connection. ctx, cancel := context.WithCancel(cmd.Context()) @@ -952,6 +968,7 @@ func runDebugVerify(cmd *cobra.Command, args []string) error { req := serverpb.RecoveryVerifyRequest{ DecommissionedNodeIDs: updatePlan.DecommissionedNodeIDs, MaxReportedRanges: 20, + MaxConcurrency: int32(debugRecoverVerifyOpts.maxConcurrency), } // Maybe switch to non-nullable? if !updatePlan.PlanID.Equal(uuid.UUID{}) { @@ -1155,14 +1172,23 @@ func getCLIClusterFlags(fromCfg bool, cmd *cobra.Command, filter func(flag strin return buf.String() } +// debugRecoverDefaultMaxConcurrency is the default concurrency that will be +// used when fanning out RPCs to nodes in the cluster while servicing a debug +// recover command. +var debugRecoverDefaultMaxConcurrency = 2 * runtime.GOMAXPROCS(0) + // setDebugRecoverContextDefaults resets values of command line flags to // their default values to ensure tests don't interfere with each other. func setDebugRecoverContextDefaults() { debugRecoverCollectInfoOpts.Stores.Specs = nil + debugRecoverCollectInfoOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency debugRecoverPlanOpts.outputFileName = "" debugRecoverPlanOpts.confirmAction = prompt debugRecoverPlanOpts.deadStoreIDs = nil - debugRecoverPlanOpts.deadStoreIDs = nil + debugRecoverPlanOpts.deadNodeIDs = nil + debugRecoverPlanOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency debugRecoverExecuteOpts.Stores.Specs = nil debugRecoverExecuteOpts.confirmAction = prompt + debugRecoverExecuteOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency + debugRecoverVerifyOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency } diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 9b2c023db323..67e6e3d41e64 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -392,6 +392,7 @@ func TestStageVersionCheck(t *testing.T) { AllNodes: true, ForcePlan: false, ForceLocalInternalVersion: false, + MaxConcurrency: -1, // no limit }) require.ErrorContains(t, err, "doesn't match cluster active version") // Enable "stuck upgrade bypass" to stage plan on the cluster. @@ -400,6 +401,7 @@ func TestStageVersionCheck(t *testing.T) { AllNodes: true, ForcePlan: false, ForceLocalInternalVersion: true, + MaxConcurrency: -1, // no limit }) require.NoError(t, err, "force local must fix incorrect version") // Check that stored plan has version matching cluster version. 
diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go index 4056dc406139..7508c246e0ec 100644 --- a/pkg/kv/kvserver/loqrecovery/collect.go +++ b/pkg/kv/kvserver/loqrecovery/collect.go @@ -35,10 +35,21 @@ type CollectionStats struct { Descriptors int } +// CollectRemoteReplicaInfo retrieves information about: +// 1. range descriptors contained in cluster meta ranges if meta ranges +// are readable; +// 2. replica information from all live nodes that have connection to +// the target node. +// +// maxConcurrency is the maximum parallelism that will be used when fanning out +// RPCs to nodes in the cluster. A value of 0 disables concurrency. A negative +// value configures no limit for concurrency. func CollectRemoteReplicaInfo( - ctx context.Context, c serverpb.AdminClient, + ctx context.Context, c serverpb.AdminClient, maxConcurrency int, ) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) { - cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{}) + cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{ + MaxConcurrency: int32(maxConcurrency), + }) if err != nil { return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err } diff --git a/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go b/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go index 886bebdbb1a4..ef39ac658748 100644 --- a/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go +++ b/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go @@ -255,7 +255,7 @@ func TestCollectLeaseholderStatus(t *testing.T) { // Note: we need to retry because replica collection is not atomic and // leaseholder could move around so we could see none or more than one. testutils.SucceedsSoon(t, func() error { - replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) require.NoError(t, err, "failed to collect replica info") foundLeaseholders := 0 diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index c30ac9fc6432..a4abf6b5a2f2 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -87,7 +87,7 @@ func TestReplicaCollection(t *testing.T) { var replicas loqrecoverypb.ClusterReplicaInfo var stats loqrecovery.CollectionStats - replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) require.NoError(t, err, "failed to retrieve replica info") // Check counters on retrieved replica info. @@ -159,7 +159,7 @@ func TestStreamRestart(t *testing.T) { var replicas loqrecoverypb.ClusterReplicaInfo var stats loqrecovery.CollectionStats - replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) require.NoError(t, err, "failed to retrieve replica info") // Check counters on retrieved replica info. 
@@ -209,7 +209,7 @@ func TestGetPlanStagingState(t *testing.T) { adm := tc.GetAdminClient(t, 0) - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) for _, s := range resp.Statuses { require.Nil(t, s.PendingPlanID, "no pending plan") @@ -222,7 +222,7 @@ func TestGetPlanStagingState(t *testing.T) { } // First we test that plans are successfully picked up by status call. - resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) statuses := aggregateStatusByNode(resp) require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id on node 1") @@ -233,7 +233,7 @@ func TestGetPlanStagingState(t *testing.T) { tc.StopServer(1) testutils.SucceedsSoon(t, func() error { - resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) if err != nil { return err } @@ -269,7 +269,7 @@ func TestStageRecoveryPlans(t *testing.T) { adm := tc.GetAdminClient(t, 0) - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) for _, s := range resp.Statuses { require.Nil(t, s.PendingPlanID, "no pending plan") @@ -284,12 +284,16 @@ func TestStageRecoveryPlans(t *testing.T) { createRecoveryForRange(t, tc, sk, 3), } plan.StaleLeaseholderNodeIDs = []roachpb.NodeID{1} - res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.NoError(t, err, "failed to stage plan") require.Empty(t, res.Errors, "unexpected errors in stage response") // First we test that plans are successfully picked up by status call. 
- resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) statuses := aggregateStatusByNode(resp) require.Equal(t, &plan.PlanID, statuses[1].PendingPlanID, "incorrect plan id on node 1") @@ -316,11 +320,19 @@ func TestStageBadVersions(t *testing.T) { plan.Version = clusterversion.MinSupported.Version() plan.Version.Major -= 1 - _, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + _, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.Error(t, err, "shouldn't stage plan with old version") plan.Version.Major += 2 - _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.Error(t, err, "shouldn't stage plan with future version") } @@ -335,7 +347,7 @@ func TestStageConflictingPlans(t *testing.T) { adm := tc.GetAdminClient(t, 0) - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) for _, s := range resp.Statuses { require.Nil(t, s.PendingPlanID, "no pending plan") @@ -348,7 +360,11 @@ func TestStageConflictingPlans(t *testing.T) { plan.Updates = []loqrecoverypb.ReplicaUpdate{ createRecoveryForRange(t, tc, sk, 3), } - res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.NoError(t, err, "failed to stage plan") require.Empty(t, res.Errors, "unexpected errors in stage response") @@ -356,7 +372,11 @@ func TestStageConflictingPlans(t *testing.T) { plan2.Updates = []loqrecoverypb.ReplicaUpdate{ createRecoveryForRange(t, tc, sk, 2), } - _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan2, AllNodes: true}) + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan2, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.ErrorContains(t, err, fmt.Sprintf("plan %s is already staged on node n3", plan.PlanID.String()), "conflicting plans must not be allowed") @@ -373,7 +393,7 @@ func TestForcePlanUpdate(t *testing.T) { adm := tc.GetAdminClient(t, 0) - resV, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resV, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) for _, s := range resV.Statuses { require.Nil(t, s.PendingPlanID, "no pending plan") @@ -386,15 +406,23 @@ func TestForcePlanUpdate(t *testing.T) { plan.Updates = []loqrecoverypb.ReplicaUpdate{ createRecoveryForRange(t, tc, sk, 3), } - resS, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + resS, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.NoError(t, err, "failed to stage plan") require.Empty(t, resS.Errors, "unexpected errors in stage response") - _, err = adm.RecoveryStagePlan(ctx, 
&serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true}) + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + AllNodes: true, + ForcePlan: true, + MaxConcurrency: -1, // no limit + }) require.NoError(t, err, "force plan should reset previous plans") // Verify that plan was successfully replaced by an empty one. - resV, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resV, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) statuses := aggregateStatusByNode(resV) require.Nil(t, statuses[1].PendingPlanID, "unexpected plan id on node 1") @@ -418,8 +446,11 @@ func TestNodeDecommissioned(t *testing.T) { plan := makeTestRecoveryPlan(ctx, t, adm) plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} testutils.SucceedsSoon(t, func() error { - res, err := adm.RecoveryStagePlan(ctx, - &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) if err != nil { return err } @@ -446,8 +477,11 @@ func TestRejectDecommissionReachableNode(t *testing.T) { plan := makeTestRecoveryPlan(ctx, t, adm) plan.DecommissionedNodeIDs = []roachpb.NodeID{roachpb.NodeID(3)} - _, err := adm.RecoveryStagePlan(ctx, - &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + _, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.ErrorContains(t, err, "was planned for decommission, but is present in cluster", "staging plan decommissioning live nodes must not be allowed") } @@ -463,7 +497,7 @@ func TestStageRecoveryPlansToWrongCluster(t *testing.T) { adm := tc.GetAdminClient(t, 0) - resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + resp, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err) for _, s := range resp.Statuses { require.Nil(t, s.PendingPlanID, "no pending plan") @@ -478,7 +512,11 @@ func TestStageRecoveryPlansToWrongCluster(t *testing.T) { plan.Updates = []loqrecoverypb.ReplicaUpdate{ createRecoveryForRange(t, tc, sk, 3), } - _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + _, err = adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) require.ErrorContains(t, err, "attempting to stage plan from cluster", "failed to stage plan") } @@ -513,6 +551,7 @@ func TestRetrieveRangeStatus(t *testing.T) { r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ DecommissionedNodeIDs: []roachpb.NodeID{rs[0].NodeID, rs[1].NodeID}, MaxReportedRanges: 999, + MaxConcurrency: -1, // no limit }) require.NoError(t, err, "failed to get range status") @@ -532,6 +571,7 @@ func TestRetrieveRangeStatus(t *testing.T) { r, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ DecommissionedNodeIDs: []roachpb.NodeID{rs[0].NodeID, rs[1].NodeID}, MaxReportedRanges: 1, + MaxConcurrency: -1, // no limit }) require.NoError(t, err, "failed to get range status") require.Equal(t, r.UnavailableRanges.Error, "found more failed ranges than limit 1") @@ -570,13 +610,17 @@ func TestRetrieveApplyStatus(t *testing.T) { var replicas loqrecoverypb.ClusterReplicaInfo testutils.SucceedsSoon(t, func() error { var err error - 
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) return err }) plan, planDetails, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator) require.NoError(t, err, "failed to create a plan") testutils.SucceedsSoon(t, func() error { - res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{Plan: &plan, AllNodes: true}) + res, err := adm.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{ + Plan: &plan, + AllNodes: true, + MaxConcurrency: -1, // no limit + }) if err != nil { return err } @@ -588,6 +632,7 @@ func TestRetrieveApplyStatus(t *testing.T) { r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ DecommissionedNodeIDs: plan.DecommissionedNodeIDs, + MaxConcurrency: -1, // no limit }) require.NoError(t, err, "failed to run recovery verify") @@ -618,6 +663,7 @@ func TestRetrieveApplyStatus(t *testing.T) { r, err = adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{ PendingPlanID: &plan.PlanID, DecommissionedNodeIDs: plan.DecommissionedNodeIDs, + MaxConcurrency: -1, // no limit }) require.NoError(t, err, "failed to run recovery verify") applied := 0 @@ -646,7 +692,7 @@ func TestRejectBadVersionApplication(t *testing.T) { var replicas loqrecoverypb.ClusterReplicaInfo testutils.SucceedsSoon(t, func() error { var err error - replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) return err }) plan, _, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator) @@ -658,7 +704,7 @@ func TestRejectBadVersionApplication(t *testing.T) { require.NoError(t, pss[1].SavePlan(plan), "failed to inject plan into storage") require.NoError(t, tc.RestartServer(1), "failed to restart server") - r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{}) + r, err := adm.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{MaxConcurrency: -1 /* no limit */}) require.NoError(t, err, "failed to run recovery verify") found := false for _, s := range r.Statuses { diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto index bddcf8271a94..51661b23bd2d 100644 --- a/pkg/server/serverpb/admin.proto +++ b/pkg/server/serverpb/admin.proto @@ -888,7 +888,12 @@ message ChartCatalogResponse { repeated cockroach.ts.catalog.ChartSection catalog = 1 [(gogoproto.nullable) = false]; } -message RecoveryCollectReplicaInfoRequest {} +message RecoveryCollectReplicaInfoRequest { + // MaxConcurrency is the maximum parallelism that will be used when fanning + // out RPCs to nodes in the cluster while servicing this request. A value of 0 + // disables concurrency. A negative value configures no limit for concurrency. + int32 max_concurrency = 1; +} // RecoveryCollectReplicaRestartNodeStream is sent by collector node to client // if it experiences a transient failure collecting data from one of the nodes. @@ -933,6 +938,10 @@ message RecoveryStagePlanRequest { // if target cluster is stuck in recovery where only part of nodes were // successfully migrated. bool force_local_internal_version = 4; + // MaxConcurrency is the maximum parallelism that will be used when fanning + // out RPCs to nodes in the cluster while servicing this request. A value of 0 + // disables concurrency. A negative value configures no limit for concurrency. 
+ int32 max_concurrency = 5; } message RecoveryStagePlanResponse { @@ -961,6 +970,10 @@ message RecoveryVerifyRequest { // If more unhealthy ranges are found, error will be returned alongside range // to indicate that ranges were cut short. int32 max_reported_ranges = 3; + // MaxConcurrency is the maximum parallelism that will be used when fanning + // out RPCs to nodes in the cluster while servicing this request. A value of 0 + // disables concurrency. A negative value configures no limit for concurrency. + int32 max_concurrency = 4; } message RecoveryVerifyResponse {
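
The max_concurrency semantics repeated in the proto comments above (a value of 0 disables concurrency, a negative value means no limit, a positive value caps parallelism) and the CLI default of 2 x num_cpus from the commit message can be illustrated with a minimal, self-contained Go sketch. This is not code from the patch: visitNodes, its parameters, and the example node IDs are hypothetical names used only for illustration, and CockroachDB's actual fan-out plumbing may differ.

// Minimal sketch (not the CockroachDB implementation): a bounded fan-out that
// follows the max_concurrency semantics documented above. 0 disables
// concurrency (work is serialized), a negative value means no limit, and a
// positive value caps the number of in-flight calls.
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
)

// visitNodes applies visit to every node ID, honoring maxConcurrency.
func visitNodes(
	ctx context.Context, nodeIDs []int, maxConcurrency int, visit func(context.Context, int) error,
) error {
	limit := maxConcurrency
	switch {
	case limit == 0:
		limit = 1 // 0 disables concurrency: visit nodes one at a time.
	case limit < 0:
		limit = len(nodeIDs) // negative: no limit, one in-flight call per node.
	}

	sem := make(chan struct{}, limit) // counting semaphore bounding in-flight work
	errCh := make(chan error, len(nodeIDs))
	var wg sync.WaitGroup
	for _, id := range nodeIDs {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when done
			if err := visit(ctx, id); err != nil {
				errCh <- fmt.Errorf("node n%d: %w", id, err)
			}
		}(id)
	}
	wg.Wait()
	close(errCh)
	for err := range errCh {
		return err // surface the first failure, if any
	}
	return nil
}

func main() {
	// The CLI default described in the commit message: 2 x num_cpus.
	defaultMaxConcurrency := 2 * runtime.GOMAXPROCS(0)
	nodes := []int{1, 2, 3, 4, 5}
	err := visitNodes(context.Background(), nodes, defaultMaxConcurrency,
		func(ctx context.Context, id int) error {
			fmt.Printf("collecting replica info from n%d\n", id)
			return nil
		})
	if err != nil {
		fmt.Println("fan-out failed:", err)
	}
}

Passing -1 here mirrors the tests in the patch, which set MaxConcurrency: -1 to remove the limit, while the CLI commands default to the 2 x num_cpus value registered in debug.go.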