Skip to content

Commit

Permalink
cli: add --max-concurrency flag to debug recover commands
Browse files Browse the repository at this point in the history
Informs cockroachdb#122639.

This commit adds a `--max-concurrency` flag to the following `debug
recover` commands:
- `debug recover collect-info`
- `debug recover make-plan`
- `debug recover apply-plan`
- `debug recover verify`

The flag controls the maximum concurrency when fanning out RPCs to nodes
in the cluster while servicing the command. It defaults to `2 x num_cpus`,
which is a decent proxy for how much fanout the process can tolerate without
overloading itself.

Some plumbing is performed, but the flags are currently unused.

Release note (cli change): Added `--max-concurrency` flag to `debug
recover` commands to control the maximum concurrency when fanning out
RPCs to nodes in the cluster. The flag defaults to `2 x num_cpus`.
  • Loading branch information
nvanbenschoten committed Apr 25, 2024
1 parent 848fce0 commit fe468ee
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 38 deletions.
6 changes: 6 additions & 0 deletions docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -7689,6 +7689,10 @@ Support status: [reserved](#support-status)



| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryCollectReplicaInfoRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) |




Expand Down Expand Up @@ -7799,6 +7803,7 @@ Support status: [reserved](#support-status)
| all_nodes | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | If all nodes is true, then receiver should act as a coordinator and perform a fan-out to stage plan on all nodes of the cluster. | [reserved](#support-status) |
| force_plan | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | ForcePlan tells receiver to ignore any plan already staged on the node if it is present and replace it with new plan (including empty one). | [reserved](#support-status) |
| force_local_internal_version | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | ForceLocalInternalVersion tells server to update internal component of plan version to the one of active cluster version. This option needs to be set if target cluster is stuck in recovery where only part of nodes were successfully migrated. | [reserved](#support-status) |
| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryStagePlanRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) |



Expand Down Expand Up @@ -7888,6 +7893,7 @@ Support status: [reserved](#support-status)
| plan_id | [bytes](#cockroach.server.serverpb.RecoveryVerifyRequest-bytes) | | PlanID is ID of the plan to verify. | [reserved](#support-status) |
| decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | repeated | DecommissionedNodeIDs is a set of nodes that should be marked as decommissioned in the cluster when loss of quorum recovery successfully applies. | [reserved](#support-status) |
| max_reported_ranges | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxReportedRanges is the maximum number of failed ranges to report. If more unhealthy ranges are found, error will be returned alongside range to indicate that ranges were cut short. | [reserved](#support-status) |
| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) |



Expand Down
10 changes: 10 additions & 0 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,8 @@ func init() {

f = debugRecoverCollectInfoCmd.Flags()
f.VarP(&debugRecoverCollectInfoOpts.Stores, cliflags.RecoverStore.Name, cliflags.RecoverStore.Shorthand, cliflags.RecoverStore.Usage())
f.IntVarP(&debugRecoverCollectInfoOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")

f = debugRecoverPlanCmd.Flags()
f.StringVarP(&debugRecoverPlanOpts.outputFileName, "plan", "o", "",
Expand All @@ -1528,6 +1530,8 @@ func init() {
f.BoolVar(&debugRecoverPlanOpts.force, "force", false,
"force creation of plan even when problems were encountered; applying this plan may "+
"result in additional problems and should be done only with care and as a last resort")
f.IntVarP(&debugRecoverPlanOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")
f.UintVar(&formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Name,
formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage())

Expand All @@ -1539,6 +1543,12 @@ func init() {
formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage())
f.BoolVar(&debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Name,
debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Usage())
f.IntVarP(&debugRecoverExecuteOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")

f = debugRecoverVerifyCmd.Flags()
f.IntVarP(&debugRecoverVerifyOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")

f = debugMergeLogsCmd.Flags()
f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from",
Expand Down
40 changes: 33 additions & 7 deletions pkg/cli/debug_recover_loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io"
"os"
"path"
"runtime"
"sort"
"strings"

Expand Down Expand Up @@ -297,7 +298,8 @@ See debug recover command help for more details on how to use this command.
}

var debugRecoverCollectInfoOpts struct {
Stores base.StoreSpecList
Stores base.StoreSpecList
maxConcurrency int
}

func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
Expand All @@ -316,7 +318,7 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverCollectInfoOpts.maxConcurrency)
if err != nil {
return errors.WithHint(errors.Wrap(err,
"failed to retrieve replica info from cluster"),
Expand Down Expand Up @@ -402,6 +404,7 @@ var debugRecoverPlanOpts struct {
deadNodeIDs []int
confirmAction confirmActionFlag
force bool
maxConcurrency int
}

var planSpecificFlags = map[string]struct{}{
Expand Down Expand Up @@ -432,7 +435,7 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverPlanOpts.maxConcurrency)
if err != nil {
return errors.Wrapf(err, "failed to retrieve replica info from cluster")
}
Expand Down Expand Up @@ -643,6 +646,7 @@ var debugRecoverExecuteOpts struct {
Stores base.StoreSpecList
confirmAction confirmActionFlag
ignoreInternalVersion bool
maxConcurrency int
}

// runDebugExecuteRecoverPlan is using the following pattern when performing command
Expand Down Expand Up @@ -670,7 +674,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {

if len(debugRecoverExecuteOpts.Stores.Specs) == 0 {
return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates,
debugRecoverExecuteOpts.ignoreInternalVersion)
debugRecoverExecuteOpts.ignoreInternalVersion, debugRecoverExecuteOpts.maxConcurrency)
}
return applyRecoveryToLocalStore(ctx, nodeUpdates, debugRecoverExecuteOpts.ignoreInternalVersion)
}
Expand All @@ -681,6 +685,7 @@ func stageRecoveryOntoCluster(
planFile string,
plan loqrecoverypb.ReplicaUpdatePlan,
ignoreInternalVersion bool,
maxConcurrency int,
) error {
c, finish, err := getAdminClient(ctx, serverCfg)
if err != nil {
Expand All @@ -693,7 +698,9 @@ func stageRecoveryOntoCluster(
nodeID roachpb.NodeID
planID string
}
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{
MaxConcurrency: int32(maxConcurrency),
})
if err != nil {
return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster")
}
Expand Down Expand Up @@ -754,7 +761,11 @@ func stageRecoveryOntoCluster(
// We don't want to combine removing old plan and adding new one since it
// could produce cluster with multiple plans at the same time which could
// make situation worse.
res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true})
res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
AllNodes: true,
ForcePlan: true,
MaxConcurrency: int32(maxConcurrency),
})
if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil {
return err
}
Expand All @@ -763,6 +774,7 @@ func stageRecoveryOntoCluster(
Plan: &plan,
AllNodes: true,
ForceLocalInternalVersion: ignoreInternalVersion,
MaxConcurrency: int32(maxConcurrency),
})
if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster",
sr, err); err != nil {
Expand Down Expand Up @@ -919,6 +931,10 @@ See debug recover command help for more details on how to use this command.
RunE: runDebugVerify,
}

var debugRecoverVerifyOpts struct {
maxConcurrency int
}

func runDebugVerify(cmd *cobra.Command, args []string) error {
// We must have cancellable context here to obtain grpc client connection.
ctx, cancel := context.WithCancel(cmd.Context())
Expand Down Expand Up @@ -952,6 +968,7 @@ func runDebugVerify(cmd *cobra.Command, args []string) error {
req := serverpb.RecoveryVerifyRequest{
DecommissionedNodeIDs: updatePlan.DecommissionedNodeIDs,
MaxReportedRanges: 20,
MaxConcurrency: int32(debugRecoverVerifyOpts.maxConcurrency),
}
// Maybe switch to non-nullable?
if !updatePlan.PlanID.Equal(uuid.UUID{}) {
Expand Down Expand Up @@ -1155,14 +1172,23 @@ func getCLIClusterFlags(fromCfg bool, cmd *cobra.Command, filter func(flag strin
return buf.String()
}

// debugRecoverDefaultMaxConcurrency is the default concurrency that will be
// used when fanning out RPCs to nodes in the cluster while servicing a debug
// recover command.
var debugRecoverDefaultMaxConcurrency = 2 * runtime.GOMAXPROCS(0)

// setDebugRecoverContextDefaults resets values of command line flags to
// their default values to ensure tests don't interfere with each other.
func setDebugRecoverContextDefaults() {
debugRecoverCollectInfoOpts.Stores.Specs = nil
debugRecoverCollectInfoOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
debugRecoverPlanOpts.outputFileName = ""
debugRecoverPlanOpts.confirmAction = prompt
debugRecoverPlanOpts.deadStoreIDs = nil
debugRecoverPlanOpts.deadStoreIDs = nil
debugRecoverPlanOpts.deadNodeIDs = nil
debugRecoverPlanOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
debugRecoverExecuteOpts.Stores.Specs = nil
debugRecoverExecuteOpts.confirmAction = prompt
debugRecoverExecuteOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
debugRecoverVerifyOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
}
2 changes: 2 additions & 0 deletions pkg/cli/debug_recover_loss_of_quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func TestStageVersionCheck(t *testing.T) {
AllNodes: true,
ForcePlan: false,
ForceLocalInternalVersion: false,
MaxConcurrency: -1, // no limit
})
require.ErrorContains(t, err, "doesn't match cluster active version")
// Enable "stuck upgrade bypass" to stage plan on the cluster.
Expand All @@ -400,6 +401,7 @@ func TestStageVersionCheck(t *testing.T) {
AllNodes: true,
ForcePlan: false,
ForceLocalInternalVersion: true,
MaxConcurrency: -1, // no limit
})
require.NoError(t, err, "force local must fix incorrect version")
// Check that stored plan has version matching cluster version.
Expand Down
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/loqrecovery/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,21 @@ type CollectionStats struct {
Descriptors int
}

// CollectRemoteReplicaInfo retrieves information about:
// 1. range descriptors contained in cluster meta ranges if meta ranges
// are readable;
// 2. replica information from all live nodes that have connection to
// the target node.
//
// maxConcurrency is the maximum parallelism that will be used when fanning out
// RPCs to nodes in the cluster. A value of 0 disables concurrency. A negative
// value configures no limit for concurrency.
func CollectRemoteReplicaInfo(
ctx context.Context, c serverpb.AdminClient,
ctx context.Context, c serverpb.AdminClient, maxConcurrency int,
) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{})
cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{
MaxConcurrency: int32(maxConcurrency),
})
if err != nil {
return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestCollectLeaseholderStatus(t *testing.T) {
// Note: we need to retry because replica collection is not atomic and
// leaseholder could move around so we could see none or more than one.
testutils.SucceedsSoon(t, func() error {
replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
require.NoError(t, err, "failed to collect replica info")

foundLeaseholders := 0
Expand Down
Loading

0 comments on commit fe468ee

Please sign in to comment.