diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index 0bdf6235d48e..2fcae31458ca 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -7689,6 +7689,10 @@ Support status: [reserved](#support-status)
 
 
 
+| Field | Type | Label | Description | Support status |
+| ----- | ---- | ----- | ----------- | -------------- |
+| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryCollectReplicaInfoRequest-int32) |  | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value imposes no limit on concurrency. | [reserved](#support-status) |
+
 
 
 
@@ -7799,6 +7803,7 @@ Support status: [reserved](#support-status)
 | all_nodes | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | If all nodes is true, then receiver should act as a coordinator and perform a fan-out to stage plan on all nodes of the cluster. | [reserved](#support-status) |
 | force_plan | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | ForcePlan tells receiver to ignore any plan already staged on the node if it is present and replace it with new plan (including empty one). | [reserved](#support-status) |
 | force_local_internal_version | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) |  | ForceLocalInternalVersion tells server to update internal component of plan version to the one of active cluster version. This option needs to be set if target cluster is stuck in recovery where only part of nodes were successfully migrated. | [reserved](#support-status) |
+| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryStagePlanRequest-int32) |  | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value imposes no limit on concurrency. | [reserved](#support-status) |
 
 
 
@@ -7888,6 +7893,7 @@ Support status: [reserved](#support-status)
 | plan_id | [bytes](#cockroach.server.serverpb.RecoveryVerifyRequest-bytes) |  | PlanID is ID of the plan to verify. | [reserved](#support-status) |
 | decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | repeated | DecommissionedNodeIDs is a set of nodes that should be marked as decommissioned in the cluster when loss of quorum recovery successfully applies. | [reserved](#support-status) |
 | max_reported_ranges | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) |  | MaxReportedRanges is the maximum number of failed ranges to report. If more unhealthy ranges are found, error will be returned alongside range to indicate that ranges were cut short. | [reserved](#support-status) |
+| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) |  | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value imposes no limit on concurrency. | [reserved](#support-status) |
 
 
 
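As an aside on the field just documented: `max_concurrency` carries three distinct meanings (0 disables concurrency, a negative value removes the limit, a positive value caps it). The sketch below is illustrative only and not part of this patch; it assumes an errgroup-based fan-out, and `visitNode` is a hypothetical stand-in for the per-node RPC.

```go
package loqfanout

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// visitNode is a hypothetical stand-in for the per-node RPC issued while
// fanning out across the cluster.
func visitNode(ctx context.Context, nodeID int32) error { return nil }

// fanOut contacts every node while honoring the max_concurrency contract from
// the generated docs above: 0 disables concurrency, a negative value leaves
// the fan-out unbounded, and a positive value caps the number of RPCs in
// flight.
func fanOut(ctx context.Context, nodeIDs []int32, maxConcurrency int32) error {
	limit := int(maxConcurrency)
	if limit == 0 {
		limit = 1 // 0 disables concurrency: visit one node at a time.
	}
	g, gCtx := errgroup.WithContext(ctx)
	// errgroup treats a negative limit as "no limit", which lines up with the
	// semantics of a negative max_concurrency.
	g.SetLimit(limit)
	for _, id := range nodeIDs {
		id := id // capture the loop variable for the closure
		g.Go(func() error { return visitNode(gCtx, id) })
	}
	return g.Wait()
}
```

CockroachDB's actual fan-out code may use a different concurrency primitive; the point is only how the three sentinel values map onto a limit.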
diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go
index 1c0195ddafbe..ac9e2f2324bc 100644
--- a/pkg/cli/debug.go
+++ b/pkg/cli/debug.go
@@ -1515,6 +1515,8 @@ func init() {
 
 	f = debugRecoverCollectInfoCmd.Flags()
 	f.VarP(&debugRecoverCollectInfoOpts.Stores, cliflags.RecoverStore.Name, cliflags.RecoverStore.Shorthand, cliflags.RecoverStore.Usage())
+	f.IntVarP(&debugRecoverCollectInfoOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
+		"maximum concurrency when fanning out RPCs to nodes in the cluster")
 
 	f = debugRecoverPlanCmd.Flags()
 	f.StringVarP(&debugRecoverPlanOpts.outputFileName, "plan", "o", "",
@@ -1528,6 +1530,8 @@ func init() {
 	f.BoolVar(&debugRecoverPlanOpts.force, "force", false,
 		"force creation of plan even when problems were encountered; applying this plan may "+
 			"result in additional problems and should be done only with care and as a last resort")
+	f.IntVarP(&debugRecoverPlanOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
+		"maximum concurrency when fanning out RPCs to nodes in the cluster")
 	f.UintVar(&formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Name,
 		formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage())
 
@@ -1539,6 +1543,12 @@ func init() {
 		formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage())
 	f.BoolVar(&debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Name,
 		debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Usage())
+	f.IntVarP(&debugRecoverExecuteOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
+		"maximum concurrency when fanning out RPCs to nodes in the cluster")
+
+	f = debugRecoverVerifyCmd.Flags()
+	f.IntVarP(&debugRecoverVerifyOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
+		"maximum concurrency when fanning out RPCs to nodes in the cluster")
 
 	f = debugMergeLogsCmd.Flags()
 	f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from",
diff --git a/pkg/cli/debug_recover_loss_of_quorum.go b/pkg/cli/debug_recover_loss_of_quorum.go
index fc2e4a137d2d..25110e51b2dd 100644
--- a/pkg/cli/debug_recover_loss_of_quorum.go
+++ b/pkg/cli/debug_recover_loss_of_quorum.go
@@ -17,6 +17,7 @@ import (
 	"io"
 	"os"
 	"path"
+	"runtime"
 	"sort"
 	"strings"
 
@@ -297,7 +298,8 @@ See debug recover command help for more details on how to use this command.
 }
 
 var debugRecoverCollectInfoOpts struct {
-	Stores base.StoreSpecList
+	Stores         base.StoreSpecList
+	maxConcurrency int
 }
 
 func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
@@ -316,7 +318,7 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
 			return errors.Wrapf(err, "failed to get admin connection to cluster")
 		}
 		defer finish()
-		replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
+		replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverCollectInfoOpts.maxConcurrency)
 		if err != nil {
 			return errors.WithHint(errors.Wrap(err,
 				"failed to retrieve replica info from cluster"),
@@ -402,6 +404,7 @@ var debugRecoverPlanOpts struct {
 	deadNodeIDs    []int
 	confirmAction  confirmActionFlag
 	force          bool
+	maxConcurrency int
 }
 
 var planSpecificFlags = map[string]struct{}{
@@ -432,7 +435,7 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
 			return errors.Wrapf(err, "failed to get admin connection to cluster")
 		}
 		defer finish()
-		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
+		replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverPlanOpts.maxConcurrency)
 		if err != nil {
 			return errors.Wrapf(err, "failed to retrieve replica info from cluster")
 		}
@@ -643,6 +646,7 @@ var debugRecoverExecuteOpts struct {
 	Stores                base.StoreSpecList
 	confirmAction         confirmActionFlag
 	ignoreInternalVersion bool
+	maxConcurrency        int
 }
 
 // runDebugExecuteRecoverPlan is using the following pattern when performing command
@@ -670,7 +674,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
 
 	if len(debugRecoverExecuteOpts.Stores.Specs) == 0 {
 		return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates,
-			debugRecoverExecuteOpts.ignoreInternalVersion)
+			debugRecoverExecuteOpts.ignoreInternalVersion, debugRecoverExecuteOpts.maxConcurrency)
 	}
 	return applyRecoveryToLocalStore(ctx, nodeUpdates, debugRecoverExecuteOpts.ignoreInternalVersion)
 }
@@ -681,6 +685,7 @@ func stageRecoveryOntoCluster(
 	planFile string,
 	plan loqrecoverypb.ReplicaUpdatePlan,
 	ignoreInternalVersion bool,
+	maxConcurrency int,
 ) error {
 	c, finish, err := getAdminClient(ctx, serverCfg)
 	if err != nil {
@@ -693,7 +698,9 @@ func stageRecoveryOntoCluster(
 		nodeID roachpb.NodeID
 		planID string
 	}
-	vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
+	vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{
+		MaxConcurrency: int32(maxConcurrency),
+	})
 	if err != nil {
 		return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster")
 	}
@@ -754,7 +761,11 @@ func stageRecoveryOntoCluster(
 	// We don't want to combine removing old plan and adding new one since it
 	// could produce cluster with multiple plans at the same time which could
 	// make situation worse.
-	res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true})
+	res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
+		AllNodes:       true,
+		ForcePlan:      true,
+		MaxConcurrency: int32(maxConcurrency),
+	})
 	if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil {
 		return err
 	}
@@ -763,6 +774,7 @@ func stageRecoveryOntoCluster(
 		Plan:                      &plan,
 		AllNodes:                  true,
 		ForceLocalInternalVersion: ignoreInternalVersion,
+		MaxConcurrency:            int32(maxConcurrency),
 	})
 	if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster",
 		sr, err); err != nil {
@@ -919,6 +931,10 @@ See debug recover command help for more details on how to use this command.
 	RunE: runDebugVerify,
 }
 
+var debugRecoverVerifyOpts struct {
+	maxConcurrency int
+}
+
 func runDebugVerify(cmd *cobra.Command, args []string) error {
 	// We must have cancellable context here to obtain grpc client connection.
 	ctx, cancel := context.WithCancel(cmd.Context())
@@ -952,6 +968,7 @@ func runDebugVerify(cmd *cobra.Command, args []string) error {
 	req := serverpb.RecoveryVerifyRequest{
 		DecommissionedNodeIDs: updatePlan.DecommissionedNodeIDs,
 		MaxReportedRanges:     20,
+		MaxConcurrency:        int32(debugRecoverVerifyOpts.maxConcurrency),
 	}
 	// Maybe switch to non-nullable?
 	if !updatePlan.PlanID.Equal(uuid.UUID{}) {
@@ -1155,14 +1172,23 @@ func getCLIClusterFlags(fromCfg bool, cmd *cobra.Command, filter func(flag strin
 	return buf.String()
 }
 
+// debugRecoverDefaultMaxConcurrency is the default concurrency that will be
+// used when fanning out RPCs to nodes in the cluster while servicing a debug
+// recover command.
+var debugRecoverDefaultMaxConcurrency = 2 * runtime.GOMAXPROCS(0)
+
 // setDebugRecoverContextDefaults resets values of command line flags to
 // their default values to ensure tests don't interfere with each other.
 func setDebugRecoverContextDefaults() {
 	debugRecoverCollectInfoOpts.Stores.Specs = nil
+	debugRecoverCollectInfoOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
 	debugRecoverPlanOpts.outputFileName = ""
 	debugRecoverPlanOpts.confirmAction = prompt
 	debugRecoverPlanOpts.deadStoreIDs = nil
-	debugRecoverPlanOpts.deadStoreIDs = nil
+	debugRecoverPlanOpts.deadNodeIDs = nil
+	debugRecoverPlanOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
 	debugRecoverExecuteOpts.Stores.Specs = nil
 	debugRecoverExecuteOpts.confirmAction = prompt
+	debugRecoverExecuteOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
+	debugRecoverVerifyOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
 }
diff --git a/pkg/kv/kvserver/loqrecovery/collect.go b/pkg/kv/kvserver/loqrecovery/collect.go
index 4056dc406139..e8ec284e9b03 100644
--- a/pkg/kv/kvserver/loqrecovery/collect.go
+++ b/pkg/kv/kvserver/loqrecovery/collect.go
@@ -36,9 +36,11 @@ type CollectionStats struct {
 }
 
 func CollectRemoteReplicaInfo(
-	ctx context.Context, c serverpb.AdminClient,
+	ctx context.Context, c serverpb.AdminClient, maxConcurrency int,
 ) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
-	cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{})
+	cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{
+		MaxConcurrency: int32(maxConcurrency),
+	})
 	if err != nil {
 		return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
 	}
diff --git a/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go b/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
index 886bebdbb1a4..ef39ac658748 100644
--- a/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
+++ b/pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
@@ -255,7 +255,7 @@ func TestCollectLeaseholderStatus(t *testing.T) {
 	// Note: we need to retry because replica collection is not atomic and
 	// leaseholder could move around so we could see none or more than one.
 	testutils.SucceedsSoon(t, func() error {
-		replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+		replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
 		require.NoError(t, err, "failed to collect replica info")
 
 		foundLeaseholders := 0
diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go
index c30ac9fc6432..568fbb59c23e 100644
--- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go
+++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go
@@ -87,7 +87,7 @@ func TestReplicaCollection(t *testing.T) {
 
 	var replicas loqrecoverypb.ClusterReplicaInfo
 	var stats loqrecovery.CollectionStats
-	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
 	require.NoError(t, err, "failed to retrieve replica info")
 
 	// Check counters on retrieved replica info.
@@ -159,7 +159,7 @@ func TestStreamRestart(t *testing.T) {
 
 	var replicas loqrecoverypb.ClusterReplicaInfo
 	var stats loqrecovery.CollectionStats
-	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
+	replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
 	require.NoError(t, err, "failed to retrieve replica info")
 
 	// Check counters on retrieved replica info.
@@ -570,7 +570,7 @@ func TestRetrieveApplyStatus(t *testing.T) { var replicas loqrecoverypb.ClusterReplicaInfo testutils.SucceedsSoon(t, func() error { var err error - replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) return err }) plan, planDetails, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator) @@ -646,7 +646,7 @@ func TestRejectBadVersionApplication(t *testing.T) { var replicas loqrecoverypb.ClusterReplicaInfo testutils.SucceedsSoon(t, func() error { var err error - replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm) + replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */) return err }) plan, _, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator) diff --git a/pkg/server/serverpb/admin.proto b/pkg/server/serverpb/admin.proto index bddcf8271a94..51661b23bd2d 100644 --- a/pkg/server/serverpb/admin.proto +++ b/pkg/server/serverpb/admin.proto @@ -888,7 +888,12 @@ message ChartCatalogResponse { repeated cockroach.ts.catalog.ChartSection catalog = 1 [(gogoproto.nullable) = false]; } -message RecoveryCollectReplicaInfoRequest {} +message RecoveryCollectReplicaInfoRequest { + // MaxConcurrency is the maximum parallelism that will be used when fanning + // out RPCs to nodes in the cluster while servicing this request. A value of 0 + // disables concurrency. A negative value configures no limit for concurrency. + int32 max_concurrency = 1; +} // RecoveryCollectReplicaRestartNodeStream is sent by collector node to client // if it experiences a transient failure collecting data from one of the nodes. @@ -933,6 +938,10 @@ message RecoveryStagePlanRequest { // if target cluster is stuck in recovery where only part of nodes were // successfully migrated. bool force_local_internal_version = 4; + // MaxConcurrency is the maximum parallelism that will be used when fanning + // out RPCs to nodes in the cluster while servicing this request. A value of 0 + // disables concurrency. A negative value configures no limit for concurrency. + int32 max_concurrency = 5; } message RecoveryStagePlanResponse { @@ -961,6 +970,10 @@ message RecoveryVerifyRequest { // If more unhealthy ranges are found, error will be returned alongside range // to indicate that ranges were cut short. int32 max_reported_ranges = 3; + // MaxConcurrency is the maximum parallelism that will be used when fanning + // out RPCs to nodes in the cluster while servicing this request. A value of 0 + // disables concurrency. A negative value configures no limit for concurrency. + int32 max_concurrency = 4; } message RecoveryVerifyResponse {