cli: add --max-concurrency flag to debug recover commands
Informs cockroachdb#122639.

This commit adds a `--max-concurrency` flag to the following `debug
recover` commands:
- `debug recover collect-info`
- `debug recover make-plan`
- `debug recover apply-plan`
- `debug recover verify`

The flag controls the maximum concurrency when fanning out RPCs to nodes
in the cluster while servicing the command. It defaults to `2 x num_cpus`,
which is a decent proxy for how much fanout the process can tolerate without
overloading itself.

The flag values are plumbed into the corresponding RPC request fields, but the
server does not yet act on them.

Release note (cli change): Added `--max-concurrency` flag to `debug
recover` commands to control the maximum concurrency when fanning out
RPCs to nodes in the cluster. The flag defaults to `2 x num_cpus`.
nvanbenschoten committed Apr 24, 2024
1 parent 848fce0 commit fac74db
Showing 7 changed files with 72 additions and 15 deletions.
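All of the flag plumbing in the diffs below follows the same shape: the parsed flag value is copied into the new `MaxConcurrency` field of the admin RPC request. A minimal sketch of that pattern, assuming an existing `serverpb.AdminClient` and a flag value (the package and function names are illustrative, not from the diff):

```go
// Package and function names here are illustrative only.
package recoverexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
)

// verifyWithConcurrency mirrors the plumbing added in this commit: the
// --max-concurrency flag value rides along in the request so the server can
// eventually use it to bound its RPC fan-out.
func verifyWithConcurrency(
	ctx context.Context, c serverpb.AdminClient, maxConcurrency int,
) (*serverpb.RecoveryVerifyResponse, error) {
	return c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{
		// 0 disables concurrency; a negative value means no limit.
		MaxConcurrency: int32(maxConcurrency),
	})
}
```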
6 changes: 6 additions & 0 deletions docs/generated/http/full.md
@@ -7689,6 +7689,10 @@ Support status: [reserved](#support-status)



| Field | Type | Label | Description | Support status |
| ----- | ---- | ----- | ----------- | -------------- |
| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryCollectReplicaInfoRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) |




@@ -7799,6 +7803,7 @@ Support status: [reserved](#support-status)
| all_nodes | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | If all nodes is true, then receiver should act as a coordinator and perform a fan-out to stage plan on all nodes of the cluster. | [reserved](#support-status) |
| force_plan | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | ForcePlan tells receiver to ignore any plan already staged on the node if it is present and replace it with new plan (including empty one). | [reserved](#support-status) |
| force_local_internal_version | [bool](#cockroach.server.serverpb.RecoveryStagePlanRequest-bool) | | ForceLocalInternalVersion tells server to update internal component of plan version to the one of active cluster version. This option needs to be set if target cluster is stuck in recovery where only part of nodes were successfully migrated. | [reserved](#support-status) |
| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryStagePlanRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) |



@@ -7888,6 +7893,7 @@ Support status: [reserved](#support-status)
| plan_id | [bytes](#cockroach.server.serverpb.RecoveryVerifyRequest-bytes) | | PlanID is ID of the plan to verify. | [reserved](#support-status) |
| decommissioned_node_ids | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | repeated | DecommissionedNodeIDs is a set of nodes that should be marked as decommissioned in the cluster when loss of quorum recovery successfully applies. | [reserved](#support-status) |
| max_reported_ranges | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxReportedRanges is the maximum number of failed ranges to report. If more unhealthy ranges are found, error will be returned alongside range to indicate that ranges were cut short. | [reserved](#support-status) |
| max_concurrency | [int32](#cockroach.server.serverpb.RecoveryVerifyRequest-int32) | | MaxConcurrency is the maximum parallelism that will be used when fanning out RPCs to nodes in the cluster while servicing this request. A value of 0 disables concurrency. A negative value configures no limit for concurrency. | [reserved](#support-status) |



10 changes: 10 additions & 0 deletions pkg/cli/debug.go
@@ -1515,6 +1515,8 @@ func init() {

f = debugRecoverCollectInfoCmd.Flags()
f.VarP(&debugRecoverCollectInfoOpts.Stores, cliflags.RecoverStore.Name, cliflags.RecoverStore.Shorthand, cliflags.RecoverStore.Usage())
f.IntVarP(&debugRecoverCollectInfoOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")

f = debugRecoverPlanCmd.Flags()
f.StringVarP(&debugRecoverPlanOpts.outputFileName, "plan", "o", "",
@@ -1528,6 +1530,8 @@ func init() {
f.BoolVar(&debugRecoverPlanOpts.force, "force", false,
"force creation of plan even when problems were encountered; applying this plan may "+
"result in additional problems and should be done only with care and as a last resort")
f.IntVarP(&debugRecoverPlanOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")
f.UintVar(&formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Name,
formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage())

@@ -1539,6 +1543,12 @@ func init() {
formatHelper.maxPrintedKeyLength, cliflags.PrintKeyLength.Usage())
f.BoolVar(&debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Name,
debugRecoverExecuteOpts.ignoreInternalVersion, cliflags.RecoverIgnoreInternalVersion.Usage())
f.IntVarP(&debugRecoverExecuteOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")

f = debugRecoverVerifyCmd.Flags()
f.IntVarP(&debugRecoverVerifyOpts.maxConcurrency, "max-concurrency", "c", debugRecoverDefaultMaxConcurrency,
"maximum concurrency when fanning out RPCs to nodes in the cluster")

f = debugMergeLogsCmd.Flags()
f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from",
40 changes: 33 additions & 7 deletions pkg/cli/debug_recover_loss_of_quorum.go
@@ -17,6 +17,7 @@ import (
"io"
"os"
"path"
"runtime"
"sort"
"strings"

@@ -297,7 +298,8 @@ See debug recover command help for more details on how to use this command.
}

var debugRecoverCollectInfoOpts struct {
Stores base.StoreSpecList
Stores base.StoreSpecList
maxConcurrency int
}

func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
@@ -316,7 +318,7 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
replicaInfo, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverCollectInfoOpts.maxConcurrency)
if err != nil {
return errors.WithHint(errors.Wrap(err,
"failed to retrieve replica info from cluster"),
@@ -402,6 +404,7 @@ var debugRecoverPlanOpts struct {
deadNodeIDs []int
confirmAction confirmActionFlag
force bool
maxConcurrency int
}

var planSpecificFlags = map[string]struct{}{
@@ -432,7 +435,7 @@ func runDebugPlanReplicaRemoval(cmd *cobra.Command, args []string) error {
return errors.Wrapf(err, "failed to get admin connection to cluster")
}
defer finish()
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c)
replicas, stats, err = loqrecovery.CollectRemoteReplicaInfo(ctx, c, debugRecoverPlanOpts.maxConcurrency)
if err != nil {
return errors.Wrapf(err, "failed to retrieve replica info from cluster")
}
@@ -643,6 +646,7 @@ var debugRecoverExecuteOpts struct {
Stores base.StoreSpecList
confirmAction confirmActionFlag
ignoreInternalVersion bool
maxConcurrency int
}

// runDebugExecuteRecoverPlan is using the following pattern when performing command
@@ -670,7 +674,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {

if len(debugRecoverExecuteOpts.Stores.Specs) == 0 {
return stageRecoveryOntoCluster(ctx, cmd, planFile, nodeUpdates,
debugRecoverExecuteOpts.ignoreInternalVersion)
debugRecoverExecuteOpts.ignoreInternalVersion, debugRecoverExecuteOpts.maxConcurrency)
}
return applyRecoveryToLocalStore(ctx, nodeUpdates, debugRecoverExecuteOpts.ignoreInternalVersion)
}
@@ -681,6 +685,7 @@ func stageRecoveryOntoCluster(
planFile string,
plan loqrecoverypb.ReplicaUpdatePlan,
ignoreInternalVersion bool,
maxConcurrency int,
) error {
c, finish, err := getAdminClient(ctx, serverCfg)
if err != nil {
@@ -693,7 +698,9 @@ func stageRecoveryOntoCluster(
nodeID roachpb.NodeID
planID string
}
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{})
vr, err := c.RecoveryVerify(ctx, &serverpb.RecoveryVerifyRequest{
MaxConcurrency: int32(maxConcurrency),
})
if err != nil {
return errors.Wrap(err, "failed to retrieve loss of quorum recovery status from cluster")
}
@@ -754,7 +761,11 @@ func stageRecoveryOntoCluster(
// We don't want to combine removing old plan and adding new one since it
// could produce cluster with multiple plans at the same time which could
// make situation worse.
res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{AllNodes: true, ForcePlan: true})
res, err := c.RecoveryStagePlan(ctx, &serverpb.RecoveryStagePlanRequest{
AllNodes: true,
ForcePlan: true,
MaxConcurrency: int32(maxConcurrency),
})
if err := maybeWrapStagingError("failed removing existing loss of quorum replica recovery plan from cluster", res, err); err != nil {
return err
}
@@ -763,6 +774,7 @@ func stageRecoveryOntoCluster(
Plan: &plan,
AllNodes: true,
ForceLocalInternalVersion: ignoreInternalVersion,
MaxConcurrency: int32(maxConcurrency),
})
if err := maybeWrapStagingError("failed to stage loss of quorum recovery plan on cluster",
sr, err); err != nil {
@@ -919,6 +931,10 @@ See debug recover command help for more details on how to use this command.
RunE: runDebugVerify,
}

var debugRecoverVerifyOpts struct {
maxConcurrency int
}

func runDebugVerify(cmd *cobra.Command, args []string) error {
// We must have cancellable context here to obtain grpc client connection.
ctx, cancel := context.WithCancel(cmd.Context())
Expand Down Expand Up @@ -952,6 +968,7 @@ func runDebugVerify(cmd *cobra.Command, args []string) error {
req := serverpb.RecoveryVerifyRequest{
DecommissionedNodeIDs: updatePlan.DecommissionedNodeIDs,
MaxReportedRanges: 20,
MaxConcurrency: int32(debugRecoverVerifyOpts.maxConcurrency),
}
// Maybe switch to non-nullable?
if !updatePlan.PlanID.Equal(uuid.UUID{}) {
@@ -1155,14 +1172,23 @@ func getCLIClusterFlags(fromCfg bool, cmd *cobra.Command, filter func(flag strin
return buf.String()
}

// debugRecoverDefaultMaxConcurrency is the default concurrency that will be
// used when fanning out RPCs to nodes in the cluster while servicing a debug
// recover command.
var debugRecoverDefaultMaxConcurrency = 2 * runtime.GOMAXPROCS(0)

// setDebugRecoverContextDefaults resets values of command line flags to
// their default values to ensure tests don't interfere with each other.
func setDebugRecoverContextDefaults() {
debugRecoverCollectInfoOpts.Stores.Specs = nil
debugRecoverCollectInfoOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
debugRecoverPlanOpts.outputFileName = ""
debugRecoverPlanOpts.confirmAction = prompt
debugRecoverPlanOpts.deadStoreIDs = nil
debugRecoverPlanOpts.deadStoreIDs = nil
debugRecoverPlanOpts.deadNodeIDs = nil
debugRecoverPlanOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
debugRecoverExecuteOpts.Stores.Specs = nil
debugRecoverExecuteOpts.confirmAction = prompt
debugRecoverExecuteOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
debugRecoverVerifyOpts.maxConcurrency = debugRecoverDefaultMaxConcurrency
}
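A note on the default used above: the commit message's `2 x num_cpus` is implemented as `2 * runtime.GOMAXPROCS(0)`, which reads the current GOMAXPROCS setting (equal to the number of CPUs unless it has been overridden) without modifying it. A tiny standalone illustration:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) (any argument < 1) only queries the current setting and
	// does not change it; by default it equals runtime.NumCPU().
	fmt.Printf("NumCPU=%d GOMAXPROCS=%d default --max-concurrency=%d\n",
		runtime.NumCPU(), runtime.GOMAXPROCS(0), 2*runtime.GOMAXPROCS(0))
}
```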
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/loqrecovery/collect.go
@@ -36,9 +36,11 @@ type CollectionStats struct {
}

func CollectRemoteReplicaInfo(
ctx context.Context, c serverpb.AdminClient,
ctx context.Context, c serverpb.AdminClient, maxConcurrency int,
) (loqrecoverypb.ClusterReplicaInfo, CollectionStats, error) {
cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{})
cc, err := c.RecoveryCollectReplicaInfo(ctx, &serverpb.RecoveryCollectReplicaInfoRequest{
MaxConcurrency: int32(maxConcurrency),
})
if err != nil {
return loqrecoverypb.ClusterReplicaInfo{}, CollectionStats{}, err
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/collect_raft_log_test.go
@@ -255,7 +255,7 @@ func TestCollectLeaseholderStatus(t *testing.T) {
// Note: we need to retry because replica collection is not atomic and
// leaseholder could move around so we could see none or more than one.
testutils.SucceedsSoon(t, func() error {
replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
replicas, _, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
require.NoError(t, err, "failed to collect replica info")

foundLeaseholders := 0
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/loqrecovery/server_integration_test.go
@@ -87,7 +87,7 @@ func TestReplicaCollection(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
var stats loqrecovery.CollectionStats

replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
require.NoError(t, err, "failed to retrieve replica info")

// Check counters on retrieved replica info.
@@ -159,7 +159,7 @@ func TestStreamRestart(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
var stats loqrecovery.CollectionStats

replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
replicas, stats, err := loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
require.NoError(t, err, "failed to retrieve replica info")

// Check counters on retrieved replica info.
@@ -570,7 +570,7 @@ func TestRetrieveApplyStatus(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
testutils.SucceedsSoon(t, func() error {
var err error
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
return err
})
plan, planDetails, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator)
@@ -646,7 +646,7 @@ func TestRejectBadVersionApplication(t *testing.T) {
var replicas loqrecoverypb.ClusterReplicaInfo
testutils.SucceedsSoon(t, func() error {
var err error
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm)
replicas, _, err = loqrecovery.CollectRemoteReplicaInfo(ctx, adm, -1 /* maxConcurrency */)
return err
})
plan, _, err := loqrecovery.PlanReplicas(ctx, replicas, nil, nil, uuid.DefaultGenerator)
15 changes: 14 additions & 1 deletion pkg/server/serverpb/admin.proto
@@ -888,7 +888,12 @@ message ChartCatalogResponse {
repeated cockroach.ts.catalog.ChartSection catalog = 1 [(gogoproto.nullable) = false];
}

message RecoveryCollectReplicaInfoRequest {}
message RecoveryCollectReplicaInfoRequest {
// MaxConcurrency is the maximum parallelism that will be used when fanning
// out RPCs to nodes in the cluster while servicing this request. A value of 0
// disables concurrency. A negative value configures no limit for concurrency.
int32 max_concurrency = 1;
}

// RecoveryCollectReplicaRestartNodeStream is sent by collector node to client
// if it experiences a transient failure collecting data from one of the nodes.
@@ -933,6 +938,10 @@ message RecoveryStagePlanRequest {
// if target cluster is stuck in recovery where only part of nodes were
// successfully migrated.
bool force_local_internal_version = 4;
// MaxConcurrency is the maximum parallelism that will be used when fanning
// out RPCs to nodes in the cluster while servicing this request. A value of 0
// disables concurrency. A negative value configures no limit for concurrency.
int32 max_concurrency = 5;
}

message RecoveryStagePlanResponse {
@@ -961,6 +970,10 @@ message RecoveryVerifyRequest {
// If more unhealthy ranges are found, error will be returned alongside range
// to indicate that ranges were cut short.
int32 max_reported_ranges = 3;
// MaxConcurrency is the maximum parallelism that will be used when fanning
// out RPCs to nodes in the cluster while servicing this request. A value of 0
// disables concurrency. A negative value configures no limit for concurrency.
int32 max_concurrency = 4;
}

message RecoveryVerifyResponse {
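The new `max_concurrency` fields are only plumbed in this commit; no server-side consumer is added yet. Purely as an illustration of the documented semantics (0 disables concurrency, a negative value means no limit), a fan-out could bound its parallelism roughly as follows — a hypothetical sketch, not CockroachDB code:

```go
// Hypothetical sketch only; package, types, and helper are illustrative.
package recoverexample

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// fanOut visits every node ID with at most maxConcurrency in-flight calls,
// following the MaxConcurrency semantics documented in admin.proto above.
func fanOut(
	ctx context.Context, nodeIDs []int32, maxConcurrency int32,
	visit func(context.Context, int32) error,
) error {
	g, ctx := errgroup.WithContext(ctx)
	switch {
	case maxConcurrency == 0:
		g.SetLimit(1) // 0 disables concurrency: visit nodes one at a time
	case maxConcurrency < 0:
		g.SetLimit(-1) // negative: no limit on in-flight calls
	default:
		g.SetLimit(int(maxConcurrency))
	}
	for _, id := range nodeIDs {
		id := id // capture loop variable for the closure
		g.Go(func() error { return visit(ctx, id) })
	}
	return g.Wait()
}
```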
