From f4e1493f0bf45eeea087bcd68668175e7a2d2750 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 6 Jun 2023 19:31:42 -0700 Subject: [PATCH 01/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 323 +++++++++++++++++++++++++++++++ pkg/apply/apply.go | 34 ++++ pkg/util/progress.go | 82 ++++++++ 3 files changed, 439 insertions(+) create mode 100644 cmd/topicctl/subcmd/rebalance.go create mode 100644 pkg/util/progress.go diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go new file mode 100644 index 00000000..8f1ffe90 --- /dev/null +++ b/cmd/topicctl/subcmd/rebalance.go @@ -0,0 +1,323 @@ +package subcmd + +import ( + "context" + "fmt" + "github.com/spf13/cobra" + "os" + "os/signal" + "path/filepath" + "strconv" + "syscall" + "time" + + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/apply" + "github.com/segmentio/topicctl/pkg/cli" + "github.com/segmentio/topicctl/pkg/config" + "github.com/segmentio/topicctl/pkg/util" + log "github.com/sirupsen/logrus" +) + +var rebalanceCmd = &cobra.Command{ + Use: "rebalance", + Short: "rebalance all topic configs for a kafka cluster", + PreRunE: rebalancePreRun, + RunE: rebalanceRun, +} + +type rebalanceCmdConfig struct { + brokersToRemove []int + brokerThrottleMBsOverride int + dryRun bool + partitionBatchSizeOverride int + pathPrefix string + sleepLoopDuration time.Duration + showProgressInterval time.Duration + + shared sharedOptions +} + +var rebalanceConfig rebalanceCmdConfig + +func init() { + rebalanceCmd.Flags().IntSliceVar( + &rebalanceConfig.brokersToRemove, + "to-remove", + []int{}, + "Brokers to remove; only applies if rebalance is also set", + ) + rebalanceCmd.Flags().IntVar( + &rebalanceConfig.brokerThrottleMBsOverride, + "broker-throttle-mb", + 0, + "Broker throttle override (MB/sec)", + ) + rebalanceCmd.Flags().BoolVar( + &rebalanceConfig.dryRun, + "dry-run", + false, + "Do a dry-run", + ) + rebalanceCmd.Flags().IntVar( + 
&rebalanceConfig.partitionBatchSizeOverride, + "partition-batch-size", + 0, + "Partition batch size override", + ) + rebalanceCmd.Flags().StringVar( + &rebalanceConfig.pathPrefix, + "path-prefix", + os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"), + "Prefix for topic config paths", + ) + rebalanceCmd.Flags().DurationVar( + &rebalanceConfig.sleepLoopDuration, + "sleep-loop-duration", + 10*time.Second, + "Amount of time to wait between partition checks", + ) + rebalanceCmd.Flags().DurationVar( + &rebalanceConfig.showProgressInterval, + "show-progress-interval", + 0*time.Second, + "show progress during rebalance at intervals", + ) + + addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) + RootCmd.AddCommand(rebalanceCmd) +} + +func rebalancePreRun(cmd *cobra.Command, args []string) error { + if rebalanceConfig.shared.clusterConfig == "" || rebalanceConfig.pathPrefix == "" { + return fmt.Errorf("Must set args --cluster-config & --path-prefix (or) env variables TOPICCTL_CLUSTER_CONFIG & TOPICCTL_APPLY_PATH_PREFIX") + } + + return nil +} + +func rebalanceRun(cmd *cobra.Command, args []string) error { + ctx := context.Background() + rebalanceCtxMap, err := getRebalanceCtxMap(&rebalanceConfig) + if err != nil { + return err + } + ctx = context.WithValue(ctx, "progress", rebalanceCtxMap) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) + go func() { + // for an interrupt, cancel context and exit program to end all topic rebalances + <-sigChan + cancel() + os.Exit(1) + }() + + clusterConfigPath := rebalanceConfig.shared.clusterConfig + topicConfigDir := rebalanceConfig.pathPrefix + clusterConfig, err := config.LoadClusterFile(clusterConfigPath, rebalanceConfig.shared.expandEnv) + if err != nil { + return err + } + + adminClient, err := clusterConfig.NewAdminClient(ctx, + nil, + rebalanceConfig.dryRun, + rebalanceConfig.shared.saslUsername, + 
rebalanceConfig.shared.saslPassword, + ) + if err != nil { + log.Fatal(err) + } + defer adminClient.Close() + + // get all topic configs from path-prefix i.e topics folder + // this folder should only have topic configs + // recursive search is not performed in the path-prefix + log.Infof("Getting all topic configs from path prefix (from root) %v", topicConfigDir) + matches, err := filepath.Glob(topicConfigDir + "/**") + if err != nil { + return err + } + + // iterate through each topic config and initiate rebalance + topicConfigs := []config.TopicConfig{} + topicErrorDict := make(map[string]error) + for _, match := range matches { + topicConfigs, err = config.LoadTopicsFile(match) + if err != nil { + return err + } + for _, topicConfig := range topicConfigs { + log.Infof( + "Rebalancing topic %s in config %s with cluster config %s", + topicConfig.Meta.Name, + match, + clusterConfigPath, + ) + + topicErrorDict[topicConfig.Meta.Name] = nil + rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{ + TopicName: topicConfig.Meta.Name, + ClusterName: clusterConfig.Meta.Name, + ClusterEnvironment: clusterConfig.Meta.Environment, + ToRemove: rebalanceConfig.brokersToRemove, + RebalanceError: false, + } + if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { + topicErrorDict[topicConfig.Meta.Name] = err + rebalanceTopicProgressConfig.RebalanceError = true + log.Errorf("Ignoring topic %v for rebalance. 
Got error: %+v", topicConfig.Meta.Name, err) + } + + // show topic final progress + if rebalanceCtxMap.Enabled { + progressStr, err := util.DictToStr(rebalanceTopicProgressConfig) + if err != nil { + log.Errorf("Got error: %+v", err) + } else { + log.Infof("Progress: %s", progressStr) + } + } + } + } + + // audit at the end of all topic rebalances + successTopics := 0 + errorTopics := 0 + for thisTopicName, thisTopicError := range topicErrorDict { + if thisTopicError != nil { + errorTopics += 1 + log.Errorf("topic: %s failed with error: %v", thisTopicName, thisTopicError) + } else { + successTopics += 1 + } + } + log.Infof("Overall rebalance - success topics: %d, error topics: %d", successTopics, errorTopics) + + // show overall rebalance progress + if rebalanceCtxMap.Enabled { + progressStr, err := util.DictToStr(util.RebalanceProgressConfig{ + SuccessTopics: successTopics, + ErrorTopics: errorTopics, + ClusterName: clusterConfig.Meta.Name, + ClusterEnvironment: clusterConfig.Meta.Environment, + ToRemove: rebalanceConfig.brokersToRemove, + }) + if err != nil { + log.Errorf("Got error: %+v", err) + } else { + log.Infof("Progress: %s", progressStr) + } + } + + return nil +} + +// Check whether a topic is a candidate for action rebalance +// - consistency of topic with cluster config +// - settings(partitions, retention time) of topic config with settings for topic in the cluster +func rebalanceTopicCheck( + topicConfig config.TopicConfig, + clusterConfig config.ClusterConfig, + topicInfo admin.TopicInfo, +) error { + // topic config should be same as the cluster config + if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil { + return err + } + + log.Infof("Check topic partitions...") + if len(topicInfo.Partitions) != topicConfig.Spec.Partitions { + return fmt.Errorf("Topic partitions in kafka does not match with topic config file") + } + + log.Infof("Check topic retention.ms...") + if topicInfo.Config["retention.ms"] != 
strconv.Itoa(topicConfig.Spec.RetentionMinutes*60000) { + return fmt.Errorf("Topic retention in kafka does not match with topic config file") + } + + return nil +} + +// Perform rebalance on a topic. returns error if unsuccessful +// topic will not be rebalanced if +// - topic config is inconsistent with cluster config (name, region, environment etc...) +// - partitions of a topic in kafka cluster does not match with topic partition setting in topic config +// - retention.ms of a topic in kafka cluster does not match with topic retentionMinutes setting in topic config +// +// to ensure there are no disruptions to kafka cluster +// +// NOTE: topic that is not present in kafka cluster will not be applied +func rebalanceApplyTopic( + ctx context.Context, + topicConfig config.TopicConfig, + clusterConfig config.ClusterConfig, + adminClient admin.Client, +) error { + topicConfig.SetDefaults() + topicInfo, err := adminClient.GetTopic(ctx, topicConfig.Meta.Name, true) + if err != nil { + if err == admin.ErrTopicDoesNotExist { + return fmt.Errorf("Topic: %s does not exist in Kafka cluster", topicConfig.Meta.Name) + } + return err + } + log.Debugf("topicInfo from kafka: %+v", topicInfo) + + if err := rebalanceTopicCheck(topicConfig, clusterConfig, topicInfo); err != nil { + return err + } + + retentionDropStepDuration, err := clusterConfig.GetDefaultRetentionDropStepDuration() + if err != nil { + return err + } + + applierConfig := apply.TopicApplierConfig{ + BrokerThrottleMBsOverride: rebalanceConfig.brokerThrottleMBsOverride, + BrokersToRemove: rebalanceConfig.brokersToRemove, + ClusterConfig: clusterConfig, + DryRun: rebalanceConfig.dryRun, + PartitionBatchSizeOverride: rebalanceConfig.partitionBatchSizeOverride, + Rebalance: true, // to enforce action: rebalance + AutoContinueRebalance: true, // to continue without prompts + RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance + SkipConfirm: true, // to enforce action: rebalance + 
SleepLoopDuration: rebalanceConfig.sleepLoopDuration, + TopicConfig: topicConfig, + } + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false) + if err := cliRunner.ApplyTopic(ctx, applierConfig); err != nil { + return err + } + + return nil +} + +// build ctx map for rebalance progress +func getRebalanceCtxMap(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxMap, error) { + rebalanceCtxMap := util.RebalanceCtxMap{ + Enabled: true, + Interval: rebalanceConfig.showProgressInterval, + } + + zeroDur, _ := time.ParseDuration("0s") + if rebalanceConfig.showProgressInterval == zeroDur { + rebalanceCtxMap.Enabled = false + log.Infof("--progress-interval is 0s. Not showing progress...") + } else if rebalanceConfig.showProgressInterval < zeroDur { + return rebalanceCtxMap, fmt.Errorf("--show-progress-interval should be > 0s") + } + + if rebalanceConfig.dryRun { + rebalanceCtxMap.Enabled = false + log.Infof("--dry-run enabled. Not showing progress...") + return rebalanceCtxMap, nil + } + + return rebalanceCtxMap, nil +} diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 2e2fd91a..534edbaa 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -876,6 +876,31 @@ func (t *TopicApplier) updatePlacementRunner( roundLabel, ) + // at the moment show-progress option is avilable only with action: rebalance + showProgress := false + var stop chan bool + rebalanceCtxMap, ok := ctx.Value("progress").(util.RebalanceCtxMap) + if !ok { + log.Infof("No context value found") + } else if rebalanceCtxMap.Enabled { + stop = make(chan bool) + showProgress = true + + go util.ShowProgress( + ctx, + util.RoundTopicProgressConfig{ + CurrRound: round, + TotalRounds: numRounds, + TopicName: t.topicName, + ClusterName: t.clusterConfig.Meta.Name, + ClusterEnvironment: t.clusterConfig.Meta.Environment, + ToRemove: t.config.BrokersToRemove, + }, + rebalanceCtxMap.Interval, + stop, + ) + } + err := t.updatePartitionsIteration( ctx, currDiffAssignments[i:end], @@ -884,6 +909,10 @@ 
func (t *TopicApplier) updatePlacementRunner( roundLabel, ) if err != nil { + // error handler. stop showing progress for this iteration + if showProgress { + stop <- true + } return err } @@ -895,6 +924,11 @@ func (t *TopicApplier) updatePlacementRunner( return errors.New("Stopping because of user response") } } + + // stop showing progress for this iteration + if showProgress { + stop <- true + } } topicInfo, err := t.adminClient.GetTopic(ctx, t.topicName, true) diff --git a/pkg/util/progress.go b/pkg/util/progress.go new file mode 100644 index 00000000..34be0591 --- /dev/null +++ b/pkg/util/progress.go @@ -0,0 +1,82 @@ +package util + +import ( + "context" + "encoding/json" + log "github.com/sirupsen/logrus" + "time" +) + +// Rebalance topic progress Config +type RebalanceTopicProgressConfig struct { + TopicName string `json:"topic"` + ClusterName string `json:"cluster"` + ClusterEnvironment string `json:"environment"` + ToRemove []int `json:"to_remove"` + RebalanceError bool `json:"rebalance_error"` +} + +// Rebalance overall progress Config +type RebalanceProgressConfig struct { + SuccessTopics int `json:"success_topics"` + ErrorTopics int `json:"error_topics"` + ClusterName string `json:"cluster"` + ClusterEnvironment string `json:"environment"` + ToRemove []int `json:"to_remove"` +} + +// Round topic progress Config +type RoundTopicProgressConfig struct { + TopicName string `json:"topic"` + ClusterName string `json:"cluster"` + ClusterEnvironment string `json:"environment"` + ToRemove []int `json:"to_remove"` + CurrRound int `json:"round"` + TotalRounds int `json:"total_rounds"` +} + +// context map for rebalance +type RebalanceCtxMap struct { + Enabled bool `json:"enabled"` + Interval time.Duration `json:"interval"` +} + +// shows progress of a config repeatedly during an interval +func ShowProgress( + ctx context.Context, + progressConfig interface{}, + interval time.Duration, + stop chan bool, +) { + progressStr, err := DictToStr(progressConfig) + if err 
!= nil { + log.Errorf("Got error: %+v", err) + } else { + // print first before ticker starts + log.Infof("Progress: %s", progressStr) + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err == nil { + log.Infof("Progress: %s", progressStr) + } + case <-stop: + return + } + } +} + +// convert any dict to json string +func DictToStr(dict interface{}) (string, error) { + jsonBytes, err := json.Marshal(dict) + if err != nil { + return "{}", err + } + + return string(jsonBytes), nil +} From a548f443e5e037f4016e7ed8dc75cd4544fcac00 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Wed, 7 Jun 2023 11:53:57 -0700 Subject: [PATCH 02/21] topicctl new action: rebalance --- pkg/apply/apply.go | 4 ++-- pkg/util/progress.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 534edbaa..a49566f6 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -876,7 +876,7 @@ func (t *TopicApplier) updatePlacementRunner( roundLabel, ) - // at the moment show-progress option is avilable only with action: rebalance + // at the moment show-progress option is available only with action: rebalance showProgress := false var stop chan bool rebalanceCtxMap, ok := ctx.Value("progress").(util.RebalanceCtxMap) @@ -888,7 +888,7 @@ func (t *TopicApplier) updatePlacementRunner( go util.ShowProgress( ctx, - util.RoundTopicProgressConfig{ + util.TopicRoundProgressConfig{ CurrRound: round, TotalRounds: numRounds, TopicName: t.topicName, diff --git a/pkg/util/progress.go b/pkg/util/progress.go index 34be0591..b5802906 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -25,8 +25,8 @@ type RebalanceProgressConfig struct { ToRemove []int `json:"to_remove"` } -// Round topic progress Config -type RoundTopicProgressConfig struct { +// Topic Round progress Config +type TopicRoundProgressConfig struct { TopicName string `json:"topic"` ClusterName string 
`json:"cluster"` ClusterEnvironment string `json:"environment"` From 1289c366835fbe85403e5c4e2500cc10173bac3d Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Wed, 7 Jun 2023 11:55:43 -0700 Subject: [PATCH 03/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 8 ++++---- pkg/apply/apply.go | 2 +- pkg/util/progress.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 8f1ffe90..b4922904 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -196,12 +196,12 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } } log.Infof("Overall rebalance - success topics: %d, error topics: %d", successTopics, errorTopics) - + // show overall rebalance progress if rebalanceCtxMap.Enabled { progressStr, err := util.DictToStr(util.RebalanceProgressConfig{ - SuccessTopics: successTopics, - ErrorTopics: errorTopics, + SuccessTopics: successTopics, + ErrorTopics: errorTopics, ClusterName: clusterConfig.Meta.Name, ClusterEnvironment: clusterConfig.Meta.Environment, ToRemove: rebalanceConfig.brokersToRemove, @@ -301,7 +301,7 @@ func rebalanceApplyTopic( // build ctx map for rebalance progress func getRebalanceCtxMap(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxMap, error) { rebalanceCtxMap := util.RebalanceCtxMap{ - Enabled: true, + Enabled: true, Interval: rebalanceConfig.showProgressInterval, } diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index a49566f6..9c99a696 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -888,7 +888,7 @@ func (t *TopicApplier) updatePlacementRunner( go util.ShowProgress( ctx, - util.TopicRoundProgressConfig{ + util.RebalanceRoundProgressConfig{ CurrRound: round, TotalRounds: numRounds, TopicName: t.topicName, diff --git a/pkg/util/progress.go b/pkg/util/progress.go index b5802906..8453066e 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -25,8 +25,8 @@ 
type RebalanceProgressConfig struct { ToRemove []int `json:"to_remove"` } -// Topic Round progress Config -type TopicRoundProgressConfig struct { +// Rebalance Topic Round progress Config +type RebalanceRoundProgressConfig struct { TopicName string `json:"topic"` ClusterName string `json:"cluster"` ClusterEnvironment string `json:"environment"` From afc0bb8fc4abb64f826d42d6f005004e5c87f2de Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Wed, 7 Jun 2023 21:56:45 -0700 Subject: [PATCH 04/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 10 +++++----- pkg/apply/apply.go | 10 +++++----- pkg/util/progress.go | 12 ++++++------ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index b4922904..ae76e989 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -90,7 +90,7 @@ func init() { func rebalancePreRun(cmd *cobra.Command, args []string) error { if rebalanceConfig.shared.clusterConfig == "" || rebalanceConfig.pathPrefix == "" { - return fmt.Errorf("Must set args --cluster-config & --path-prefix (or) env variables TOPICCTL_CLUSTER_CONFIG & TOPICCTL_APPLY_PATH_PREFIX") + return fmt.Errorf("Requires args --cluster-config & --path-prefix (or) env variables TOPICCTL_CLUSTER_CONFIG & TOPICCTL_APPLY_PATH_PREFIX") } return nil @@ -174,7 +174,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { // show topic final progress if rebalanceCtxMap.Enabled { - progressStr, err := util.DictToStr(rebalanceTopicProgressConfig) + progressStr, err := util.MapToStr(rebalanceTopicProgressConfig) if err != nil { log.Errorf("Got error: %+v", err) } else { @@ -195,11 +195,11 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { successTopics += 1 } } - log.Infof("Overall rebalance - success topics: %d, error topics: %d", successTopics, errorTopics) + log.Infof("Rebalance summary - success topics: %d, error topics: %d", 
successTopics, errorTopics) - // show overall rebalance progress + // show overall rebalance summary report if rebalanceCtxMap.Enabled { - progressStr, err := util.DictToStr(util.RebalanceProgressConfig{ + progressStr, err := util.MapToStr(util.RebalanceProgressConfig{ SuccessTopics: successTopics, ErrorTopics: errorTopics, ClusterName: clusterConfig.Meta.Name, diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 9c99a696..4229deb1 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -878,12 +878,12 @@ func (t *TopicApplier) updatePlacementRunner( // at the moment show-progress option is available only with action: rebalance showProgress := false - var stop chan bool + var stopChan chan bool rebalanceCtxMap, ok := ctx.Value("progress").(util.RebalanceCtxMap) if !ok { log.Infof("No context value found") } else if rebalanceCtxMap.Enabled { - stop = make(chan bool) + stopChan = make(chan bool) showProgress = true go util.ShowProgress( @@ -897,7 +897,7 @@ func (t *TopicApplier) updatePlacementRunner( ToRemove: t.config.BrokersToRemove, }, rebalanceCtxMap.Interval, - stop, + stopChan, ) } @@ -911,7 +911,7 @@ func (t *TopicApplier) updatePlacementRunner( if err != nil { // error handler. 
stop showing progress for this iteration if showProgress { - stop <- true + stopChan <- true } return err } @@ -927,7 +927,7 @@ func (t *TopicApplier) updatePlacementRunner( // stop showing progress for this iteration if showProgress { - stop <- true + stopChan <- true } } diff --git a/pkg/util/progress.go b/pkg/util/progress.go index 8453066e..ae963b40 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -46,9 +46,9 @@ func ShowProgress( ctx context.Context, progressConfig interface{}, interval time.Duration, - stop chan bool, + stopChan chan bool, ) { - progressStr, err := DictToStr(progressConfig) + progressStr, err := MapToStr(progressConfig) if err != nil { log.Errorf("Got error: %+v", err) } else { @@ -65,15 +65,15 @@ func ShowProgress( if err == nil { log.Infof("Progress: %s", progressStr) } - case <-stop: + case <-stopChan: return } } } -// convert any dict to json string -func DictToStr(dict interface{}) (string, error) { - jsonBytes, err := json.Marshal(dict) +// convert any map to json string +func MapToStr(inputMap interface{}) (string, error) { + jsonBytes, err := json.Marshal(inputMap) if err != nil { return "{}", err } From f24d3750e9ef7fa00a20a482b075e93e469885a4 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Thu, 8 Jun 2023 19:18:51 -0700 Subject: [PATCH 05/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 63 +++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index ae76e989..3842cde7 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -134,10 +134,9 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { defer adminClient.Close() // get all topic configs from path-prefix i.e topics folder - // this folder should only have topic configs - // recursive search is not performed in the path-prefix - log.Infof("Getting all topic configs from path prefix 
(from root) %v", topicConfigDir) - matches, err := filepath.Glob(topicConfigDir + "/**") + // recursive search is performed in the path-prefix + log.Infof("Getting all topic configs from path prefix %v", topicConfigDir) + topicFiles, err := getAllFiles(topicConfigDir) if err != nil { return err } @@ -145,16 +144,30 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { // iterate through each topic config and initiate rebalance topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) - for _, match := range matches { - topicConfigs, err = config.LoadTopicsFile(match) + for _, topicFile := range topicFiles { + // ignore any cluster.yaml files in the --path-prefix + if filepath.Base(topicFile) == "cluster.yaml" { + log.Warnf("Not a valid topic yaml file: %s", topicFile) + continue + } + + topicConfigs, err = config.LoadTopicsFile(topicFile) if err != nil { - return err + log.Errorf("Invalid topic yaml file: %s", topicFile) + continue } + for _, topicConfig := range topicConfigs { + // topic config should be consistent with the cluster config + if err := config.CheckConsistency(topicConfig, clusterConfig); err != nil { + log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath) + continue + } + log.Infof( "Rebalancing topic %s in config %s with cluster config %s", topicConfig.Meta.Name, - match, + topicFile, clusterConfigPath, ) @@ -217,18 +230,11 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } // Check whether a topic is a candidate for action rebalance -// - consistency of topic with cluster config -// - settings(partitions, retention time) of topic config with settings for topic in the cluster +// settings(partitions, retention time) of topic config with settings for topic in the cluster func rebalanceTopicCheck( topicConfig config.TopicConfig, - clusterConfig config.ClusterConfig, topicInfo admin.TopicInfo, ) error { - // topic config should be same as the cluster config - if err := 
config.CheckConsistency(topicConfig, clusterConfig); err != nil { - return err - } - log.Infof("Check topic partitions...") if len(topicInfo.Partitions) != topicConfig.Spec.Partitions { return fmt.Errorf("Topic partitions in kafka does not match with topic config file") @@ -267,7 +273,7 @@ func rebalanceApplyTopic( } log.Debugf("topicInfo from kafka: %+v", topicInfo) - if err := rebalanceTopicCheck(topicConfig, clusterConfig, topicInfo); err != nil { + if err := rebalanceTopicCheck(topicConfig, topicInfo); err != nil { return err } @@ -321,3 +327,26 @@ func getRebalanceCtxMap(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxM return rebalanceCtxMap, nil } + +// get all files for a given dir path +func getAllFiles(dir string) ([]string, error) { + var files []string + + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + files = append(files, path) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return files, err +} From 7f4d8204014ca019dc95e50915e7a16d65ff045d Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Thu, 8 Jun 2023 19:23:22 -0700 Subject: [PATCH 06/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 3842cde7..854a3681 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -145,12 +145,13 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) for _, topicFile := range topicFiles { - // ignore any cluster.yaml files in the --path-prefix + // ignore any cluster.yaml files in the --path-prefix for rebalance if filepath.Base(topicFile) == "cluster.yaml" { log.Warnf("Not a valid topic yaml file: %s", topicFile) continue } + // do not consider invaalid topic 
yaml files for rebalance topicConfigs, err = config.LoadTopicsFile(topicFile) if err != nil { log.Errorf("Invalid topic yaml file: %s", topicFile) @@ -250,7 +251,6 @@ func rebalanceTopicCheck( // Perform rebalance on a topic. returns error if unsuccessful // topic will not be rebalanced if -// - topic config is inconsistent with cluster config (name, region, environment etc...) // - partitions of a topic in kafka cluster does not match with topic partition setting in topic config // - retention.ms of a topic in kafka cluster does not match with topic retentionMinutes setting in topic config // From aa90036c2f2497b5cc0b3f61abf60f75a84e7f38 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Thu, 8 Jun 2023 19:24:09 -0700 Subject: [PATCH 07/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 854a3681..92d0b7df 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -151,7 +151,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { continue } - // do not consider invaalid topic yaml files for rebalance + // do not consider invalid topic yaml files for rebalance topicConfigs, err = config.LoadTopicsFile(topicFile) if err != nil { log.Errorf("Invalid topic yaml file: %s", topicFile) From 0662376da0a2bde889f9fcb74c1928051b0a7091 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Thu, 8 Jun 2023 20:14:41 -0700 Subject: [PATCH 08/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 92d0b7df..e4fefe92 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -133,8 +133,12 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } defer 
adminClient.Close() - // get all topic configs from path-prefix i.e topics folder - // recursive search is performed in the path-prefix + // get all topic configs from --path-prefix i.e topics folder + // recursive search is performed on the --path-prefix + // + // NOTE: a topic file is ignored for rebalance if + // - a file is not a valid topic yaml file + // - any topic config is not consistent with cluster config log.Infof("Getting all topic configs from path prefix %v", topicConfigDir) topicFiles, err := getAllFiles(topicConfigDir) if err != nil { @@ -145,12 +149,6 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) for _, topicFile := range topicFiles { - // ignore any cluster.yaml files in the --path-prefix for rebalance - if filepath.Base(topicFile) == "cluster.yaml" { - log.Warnf("Not a valid topic yaml file: %s", topicFile) - continue - } - // do not consider invalid topic yaml files for rebalance topicConfigs, err = config.LoadTopicsFile(topicFile) if err != nil { From e0e1bed141eff2de15078e5b66dc46d10baed49c Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Thu, 8 Jun 2023 20:19:38 -0700 Subject: [PATCH 09/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index e4fefe92..9e53f9d9 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -134,7 +134,8 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { defer adminClient.Close() // get all topic configs from --path-prefix i.e topics folder - // recursive search is performed on the --path-prefix + // we perform a recursive on the --path-prefix because there can be nested directories with + // more topics for the --cluster-config // // NOTE: a topic file is ignored for rebalance if // - a file is not a valid topic 
yaml file From 6f937b6540958054d57f9f373e10236a95e72c32 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Fri, 9 Jun 2023 14:03:43 -0700 Subject: [PATCH 10/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 42 +++++++++++++++++--------------- pkg/apply/apply.go | 8 +++--- pkg/util/progress.go | 12 ++++----- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 9e53f9d9..41e1168f 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -21,7 +21,7 @@ import ( var rebalanceCmd = &cobra.Command{ Use: "rebalance", - Short: "rebalance all topic configs for a kafka cluster", + Short: "rebalance all topics for a given topic prefix path", PreRunE: rebalancePreRun, RunE: rebalanceRun, } @@ -81,7 +81,7 @@ func init() { &rebalanceConfig.showProgressInterval, "show-progress-interval", 0*time.Second, - "show progress during rebalance at intervals", + "Interval of time to show progress during rebalance", ) addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared) @@ -98,11 +98,11 @@ func rebalancePreRun(cmd *cobra.Command, args []string) error { func rebalanceRun(cmd *cobra.Command, args []string) error { ctx := context.Background() - rebalanceCtxMap, err := getRebalanceCtxMap(&rebalanceConfig) + rebalanceCtxStruct, err := getRebalanceCtxStruct(&rebalanceConfig) if err != nil { return err } - ctx = context.WithValue(ctx, "progress", rebalanceCtxMap) + ctx = context.WithValue(ctx, "progress", rebalanceCtxStruct) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -182,12 +182,12 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { topicErrorDict[topicConfig.Meta.Name] = err rebalanceTopicProgressConfig.RebalanceError = true - log.Errorf("Ignoring topic %v for rebalance. 
Got error: %+v", topicConfig.Meta.Name, err) + log.Errorf("Ignoring rebalance of topic %v due to error: %+v", topicConfig.Meta.Name, err) } // show topic final progress - if rebalanceCtxMap.Enabled { - progressStr, err := util.MapToStr(rebalanceTopicProgressConfig) + if rebalanceCtxStruct.Enabled { + progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) if err != nil { log.Errorf("Got error: %+v", err) } else { @@ -211,8 +211,8 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { log.Infof("Rebalance summary - success topics: %d, error topics: %d", successTopics, errorTopics) // show overall rebalance summary report - if rebalanceCtxMap.Enabled { - progressStr, err := util.MapToStr(util.RebalanceProgressConfig{ + if rebalanceCtxStruct.Enabled { + progressStr, err := util.StructToStr(util.RebalanceProgressConfig{ SuccessTopics: successTopics, ErrorTopics: errorTopics, ClusterName: clusterConfig.Meta.Name, @@ -235,13 +235,17 @@ func rebalanceTopicCheck( topicConfig config.TopicConfig, topicInfo admin.TopicInfo, ) error { - log.Infof("Check topic partitions...") + log.Debugf("Check topic partitions...") if len(topicInfo.Partitions) != topicConfig.Spec.Partitions { return fmt.Errorf("Topic partitions in kafka does not match with topic config file") } - log.Infof("Check topic retention.ms...") - if topicInfo.Config["retention.ms"] != strconv.Itoa(topicConfig.Spec.RetentionMinutes*60000) { + log.Debugf("Check topic retention.ms...") + topicInfoRetentionMs := topicInfo.Config["retention.ms"] + if topicInfoRetentionMs == "" { + topicInfoRetentionMs = strconv.Itoa(0) + } + if topicInfoRetentionMs != strconv.Itoa(topicConfig.Spec.RetentionMinutes*60000) { return fmt.Errorf("Topic retention in kafka does not match with topic config file") } @@ -304,27 +308,27 @@ func rebalanceApplyTopic( } // build ctx map for rebalance progress -func getRebalanceCtxMap(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxMap, error) { - rebalanceCtxMap := 
util.RebalanceCtxMap{ +func getRebalanceCtxStruct(rebalanceConfig *rebalanceCmdConfig) (util.RebalanceCtxStruct, error) { + rebalanceCtxStruct := util.RebalanceCtxStruct{ Enabled: true, Interval: rebalanceConfig.showProgressInterval, } zeroDur, _ := time.ParseDuration("0s") if rebalanceConfig.showProgressInterval == zeroDur { - rebalanceCtxMap.Enabled = false + rebalanceCtxStruct.Enabled = false log.Infof("--progress-interval is 0s. Not showing progress...") } else if rebalanceConfig.showProgressInterval < zeroDur { - return rebalanceCtxMap, fmt.Errorf("--show-progress-interval should be > 0s") + return rebalanceCtxStruct, fmt.Errorf("--show-progress-interval should be > 0s") } if rebalanceConfig.dryRun { - rebalanceCtxMap.Enabled = false + rebalanceCtxStruct.Enabled = false log.Infof("--dry-run enabled. Not showing progress...") - return rebalanceCtxMap, nil + return rebalanceCtxStruct, nil } - return rebalanceCtxMap, nil + return rebalanceCtxStruct, nil } // get all files for a given dir path diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 4229deb1..6d55048a 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -879,10 +879,8 @@ func (t *TopicApplier) updatePlacementRunner( // at the moment show-progress option is available only with action: rebalance showProgress := false var stopChan chan bool - rebalanceCtxMap, ok := ctx.Value("progress").(util.RebalanceCtxMap) - if !ok { - log.Infof("No context value found") - } else if rebalanceCtxMap.Enabled { + rebalanceCtxStruct, ok := ctx.Value("progress").(util.RebalanceCtxStruct) + if ok && rebalanceCtxStruct.Enabled { stopChan = make(chan bool) showProgress = true @@ -896,7 +894,7 @@ func (t *TopicApplier) updatePlacementRunner( ClusterEnvironment: t.clusterConfig.Meta.Environment, ToRemove: t.config.BrokersToRemove, }, - rebalanceCtxMap.Interval, + rebalanceCtxStruct.Interval, stopChan, ) } diff --git a/pkg/util/progress.go b/pkg/util/progress.go index ae963b40..fc195ca7 100644 --- 
a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -35,8 +35,8 @@ type RebalanceRoundProgressConfig struct { TotalRounds int `json:"total_rounds"` } -// context map for rebalance -type RebalanceCtxMap struct { +// Rebalance context struct +type RebalanceCtxStruct struct { Enabled bool `json:"enabled"` Interval time.Duration `json:"interval"` } @@ -48,7 +48,7 @@ func ShowProgress( interval time.Duration, stopChan chan bool, ) { - progressStr, err := MapToStr(progressConfig) + progressStr, err := StructToStr(progressConfig) if err != nil { log.Errorf("Got error: %+v", err) } else { @@ -71,9 +71,9 @@ func ShowProgress( } } -// convert any map to json string -func MapToStr(inputMap interface{}) (string, error) { - jsonBytes, err := json.Marshal(inputMap) +// convert any struct to json string +func StructToStr(inputStruct interface{}) (string, error) { + jsonBytes, err := json.Marshal(inputStruct) if err != nil { return "{}", err } From c71ade7c9dce4fc34f5dcc5d96d6ca4092cbc7a6 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Fri, 9 Jun 2023 14:20:09 -0700 Subject: [PATCH 11/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 41e1168f..7ad8123d 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -134,10 +134,10 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { defer adminClient.Close() // get all topic configs from --path-prefix i.e topics folder - // we perform a recursive on the --path-prefix because there can be nested directories with + // we perform a recursive on the --path-prefix because there can be nested directories with // more topics for the --cluster-config - // - // NOTE: a topic file is ignored for rebalance if + // + // NOTE: a topic file is ignored for rebalance if // - a file is not a valid topic yaml file // 
- any topic config is not consistent with cluster config log.Infof("Getting all topic configs from path prefix %v", topicConfigDir) @@ -165,7 +165,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } log.Infof( - "Rebalancing topic %s in config %s with cluster config %s", + "Rebalancing topic %s from config file %s with cluster config %s", topicConfig.Meta.Name, topicFile, clusterConfigPath, @@ -189,9 +189,9 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { if rebalanceCtxStruct.Enabled { progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) if err != nil { - log.Errorf("Got error: %+v", err) + log.Errorf("Rebalance failed due to error: %+v", err) } else { - log.Infof("Progress: %s", progressStr) + log.Infof("Rebalance Progress: %s", progressStr) } } } @@ -237,16 +237,23 @@ func rebalanceTopicCheck( ) error { log.Debugf("Check topic partitions...") if len(topicInfo.Partitions) != topicConfig.Spec.Partitions { - return fmt.Errorf("Topic partitions in kafka does not match with topic config file") + return fmt.Errorf("Topic partitions in kafka: %d does not match with topic config: %d", + len(topicInfo.Partitions), + topicConfig.Spec.Partitions, + ) } log.Debugf("Check topic retention.ms...") topicInfoRetentionMs := topicInfo.Config["retention.ms"] + topicConfigRetentionMs := strconv.Itoa(topicConfig.Spec.RetentionMinutes * 60000) if topicInfoRetentionMs == "" { topicInfoRetentionMs = strconv.Itoa(0) } - if topicInfoRetentionMs != strconv.Itoa(topicConfig.Spec.RetentionMinutes*60000) { - return fmt.Errorf("Topic retention in kafka does not match with topic config file") + if topicInfoRetentionMs != topicConfigRetentionMs { + return fmt.Errorf("Topic retention in kafka: %s does not match with topic config: %s", + topicInfoRetentionMs, + topicConfigRetentionMs, + ) } return nil From d7b35a558508d0225b7e5d22b670dba586139614 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Fri, 9 Jun 2023 14:34:48 -0700 Subject: [PATCH 
12/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 8 ++++---- pkg/util/progress.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 7ad8123d..f3bd1763 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -189,7 +189,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { if rebalanceCtxStruct.Enabled { progressStr, err := util.StructToStr(rebalanceTopicProgressConfig) if err != nil { - log.Errorf("Rebalance failed due to error: %+v", err) + log.Errorf("progress struct to string error: %+v", err) } else { log.Infof("Rebalance Progress: %s", progressStr) } @@ -208,7 +208,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { successTopics += 1 } } - log.Infof("Rebalance summary - success topics: %d, error topics: %d", successTopics, errorTopics) + log.Infof("Rebalance complete! topics: %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) // show overall rebalance summary report if rebalanceCtxStruct.Enabled { @@ -220,9 +220,9 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { ToRemove: rebalanceConfig.brokersToRemove, }) if err != nil { - log.Errorf("Got error: %+v", err) + log.Errorf("progress struct to string error: %+v", err) } else { - log.Infof("Progress: %s", progressStr) + log.Infof("Rebalance Progress: %s", progressStr) } } diff --git a/pkg/util/progress.go b/pkg/util/progress.go index fc195ca7..7af5d2b7 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -50,10 +50,10 @@ func ShowProgress( ) { progressStr, err := StructToStr(progressConfig) if err != nil { - log.Errorf("Got error: %+v", err) + log.Errorf("progress struct to string error: %+v", err) } else { // print first before ticker starts - log.Infof("Progress: %s", progressStr) + log.Infof("Rebalance Progress: %s", progressStr) } ticker := time.NewTicker(interval) @@ 
-63,7 +63,7 @@ func ShowProgress( select { case <-ticker.C: if err == nil { - log.Infof("Progress: %s", progressStr) + log.Infof("Rebalance Progress: %s", progressStr) } case <-stopChan: return From 2a3cb3e9ac210e2147876f05da6719c16bec0404 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Fri, 9 Jun 2023 14:50:12 -0700 Subject: [PATCH 13/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index f3bd1763..23015ebc 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -208,7 +208,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { successTopics += 1 } } - log.Infof("Rebalance complete! topics: %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) + log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) // show overall rebalance summary report if rebalanceCtxStruct.Enabled { From fc00a4dced65fc03a65d14ad2f8c3878ca1dddb8 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Fri, 9 Jun 2023 14:57:02 -0700 Subject: [PATCH 14/21] topicctl new action: rebalance --- pkg/util/progress.go | 11 ----------- pkg/util/strings.go | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/util/progress.go b/pkg/util/progress.go index 7af5d2b7..ee559fc2 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -2,7 +2,6 @@ package util import ( "context" - "encoding/json" log "github.com/sirupsen/logrus" "time" ) @@ -70,13 +69,3 @@ func ShowProgress( } } } - -// convert any struct to json string -func StructToStr(inputStruct interface{}) (string, error) { - jsonBytes, err := json.Marshal(inputStruct) - if err != nil { - return "{}", err - } - - return string(jsonBytes), nil -} diff --git a/pkg/util/strings.go b/pkg/util/strings.go index 
36aca352..00a323df 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -1,6 +1,9 @@ package util -import "fmt" +import ( + "encoding/json" + "fmt" +) // TruncateStringSuffix truncates a string by replacing the trailing characters with // "..." if needed. @@ -26,3 +29,13 @@ func TruncateStringMiddle(input string, maxLen int, suffixLen int) (string, int) numOmitted := len(input) - len(prefix) - len(suffix) return fmt.Sprintf("%s...%s", prefix, suffix), numOmitted } + +// Convert any struct to json string +func StructToStr(inputStruct interface{}) (string, error) { + jsonBytes, err := json.Marshal(inputStruct) + if err != nil { + return "{}", err + } + + return string(jsonBytes), nil +} From e1b59d83a83f1eb819f69da0948f1703967ad305 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Fri, 9 Jun 2023 16:44:24 -0700 Subject: [PATCH 15/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 23015ebc..a1cf0b9f 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -203,12 +203,12 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { for thisTopicName, thisTopicError := range topicErrorDict { if thisTopicError != nil { errorTopics += 1 - log.Errorf("topic: %s failed with error: %v", thisTopicName, thisTopicError) + log.Errorf("topic: %s rebalance failed with error: %v", thisTopicName, thisTopicError) } else { + log.Infof("topic: %s rebalance is successful", thisTopicName) successTopics += 1 } } - log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) // show overall rebalance summary report if rebalanceCtxStruct.Enabled { @@ -226,6 +226,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } } + log.Infof("Rebalance complete! 
%d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics) return nil } From 842c06ddd56db0c51391a608f30ab1c1c749772f Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Mon, 12 Jun 2023 10:19:39 -0700 Subject: [PATCH 16/21] topicctl new action: rebalance --- cmd/topicctl/subcmd/rebalance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index a1cf0b9f..7da0096f 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -182,7 +182,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil { topicErrorDict[topicConfig.Meta.Name] = err rebalanceTopicProgressConfig.RebalanceError = true - log.Errorf("Ignoring rebalance of topic %v due to error: %+v", topicConfig.Meta.Name, err) + log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err) } // show topic final progress From 3889be031782c9a101808c020dbabbd779c8e74b Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 11 Jul 2023 11:32:17 -0700 Subject: [PATCH 17/21] README updated with subcomand rebalance information --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 90ee83d9..048224a2 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,16 @@ resource type in the cluster. Currently, the following operations are supported: | `get offsets [topic]` | Number of messages per partition along with start and end times | | `get topics` | All topics in the cluster | +#### rebalance + +``` +topicctl rebalance [flags] +``` + +The `rebalance` subcommand will do a full broker rebalance for all the topic configs in a given topic prefix path. + +See the [rebalancing](#rebalancing) section below for more information on the full broker rebalance. 
+ #### repl ``` From d0054516813daaf6d4c560738e952517f0d73d02 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 11 Jul 2023 11:47:20 -0700 Subject: [PATCH 18/21] Updated topicctl version --- pkg/version/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/version/version.go b/pkg/version/version.go index c080775a..97ac83b0 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "1.9.1" +const Version = "1.10.1" From 221a56da1f8c02f8ca95ad0204284f1f19b481f8 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 11 Jul 2023 11:56:05 -0700 Subject: [PATCH 19/21] README updated with subcomand rebalance information --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 048224a2..510b5f17 100644 --- a/README.md +++ b/README.md @@ -181,9 +181,9 @@ resource type in the cluster. Currently, the following operations are supported: topicctl rebalance [flags] ``` -The `rebalance` subcommand will do a full broker rebalance for all the topic configs in a given topic prefix path. +The `rebalance` subcommand will do a full cluster rebalance for all the topic configs in a given topic prefix path. -See the [rebalancing](#rebalancing) section below for more information on the full broker rebalance. +See the [rebalancing](#rebalancing) section below for more information on the full cluster rebalance. #### repl From 10d181b2e49d46057c5bb54c188450077ee7ebd0 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 11 Jul 2023 12:18:52 -0700 Subject: [PATCH 20/21] README updated with subcomand rebalance information --- README.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 510b5f17..7c5c9d6f 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,16 @@ resource type in the cluster. 
Currently, the following operations are supported: topicctl rebalance [flags] ``` -The `rebalance` subcommand will do a full cluster rebalance for all the topic configs in a given topic prefix path. +`apply` subcommand with flag `--rebalance` works with topic(s) to do a full cluster rebalance. However, there is a need to rebalance all topics for a given cluster. + +The `rebalance` subcommand will do a full cluster rebalance for all the topic configs in a given topic prefix path. `rebalance` subcommand will determine a cluster with flag `--cluster-config` and topic configs with flag `--path-prefix`. This subcommand will actually perform `apply --rebalance` or `apply --rebalance --to-remove` on all the topic configs for a cluster. + +This subcommand will not perform a full cluster rebalance on a topic if: + +1. topic config is inconsistent with cluster config (name, region, environment etc...) +2. partitions of a topic in kafka cluster does not match with topic partition setting in topic config +3. retention.ms of a topic in kafka cluster does not match with topic retentionMinutes setting in topic config +4. topic does not exist in kafka cluster See the [rebalancing](#rebalancing) section below for more information on the full cluster rebalance. From 019ef1103e4f7897dd65d06355a64f4b73ed7474 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 11 Jul 2023 13:37:59 -0700 Subject: [PATCH 21/21] README updated with subcommand rebalance information --- README.md | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 7c5c9d6f..9f75c88f 100644 --- a/README.md +++ b/README.md @@ -181,18 +181,11 @@ resource type in the cluster. Currently, the following operations are supported: topicctl rebalance [flags] ``` -`apply` subcommand with flag `--rebalance` works with topic(s) to do a full cluster rebalance. However, there is a need to rebalance all topics for a given cluster. 
+The `apply` subcommand, when used with the `--rebalance` flag, rebalances the specified topics across a cluster. -The `rebalance` subcommand will do a full cluster rebalance for all the topic configs in a given topic prefix path. `rebalance` subcommand will determine a cluster with flag `--cluster-config` and topic configs with flag `--path-prefix`. This subcommand will actually perform `apply --rebalance` or `apply --rebalance --to-remove` on all the topic configs for a cluster. +The `rebalance` subcommand, on the other hand, performs a rebalance for **all** the topics defined at a given topic prefix path. -This subcommand will not perform a full cluster rebalance on a topic if: - -1. topic config is inconsistent with cluster config (name, region, environment etc...) -2. partitions of a topic in kafka cluster does not match with topic partition setting in topic config -3. retention.ms of a topic in kafka cluster does not match with topic retentionMinutes setting in topic config -4. topic does not exist in kafka cluster - -See the [rebalancing](#rebalancing) section below for more information on the full cluster rebalance. +See the [rebalancing](#rebalancing) section below for more information on rebalancing. #### repl @@ -391,22 +384,33 @@ Note that these all try to achieve in-topic balance, and only vary in the case o Thus, the placements won't be significantly different in most cases. In the future, we may add pickers that allow for some in-topic imbalance, e.g. to correct a -cluster-wide broker inbalance. +cluster-wide broker imbalance. #### Rebalancing -If `apply` is run with the `--rebalance` flag set, then `topicctl` will do a full broker rebalance +If `apply` is run with the `--rebalance` flag, then `topicctl` will rebalance specified topics after the usual apply steps. This process will check the balance of the brokers for each index -position (i.e., first, second, third, etc.)
in each partition and make replacements if there +position (i.e., first, second, third, etc.) for each partition and make replacements if there are any brokers that are significantly over- or under-represented. -The rebalance process can optionally remove brokers from a topic too. To use this feature, set the +The rebalance process can optionally remove brokers from a topic. To use this feature, set the `--to-remove` flag. Note that this flag has no effect unless `--rebalance` is also set. Rebalancing is not done by default on all apply runs because it can be fairly disruptive and -generally shouldn't be necessary unless the topic started off in an inbalanced state or there +generally shouldn't be necessary unless the topic started off in an imbalanced state or there has been a change in the number of brokers. +To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which will perform the `apply --rebalance` +function on all qualifying topics. It will inventory all topic configs found at `--path-prefix` for a cluster +specified by `--cluster-config`. + +This subcommand will not rebalance a topic if: + +1. the topic config is inconsistent with the cluster config (name, region, environment etc...) +1. the partition count of a topic in the kafka cluster does not match the topic partition setting in the topic config +1. a topic's `retention.ms` in the kafka cluster does not match the topic's `retentionMinutes` setting in the topic config +1. a topic does not exist in the kafka cluster + ## Tool safety The `bootstrap`, `get`, `repl`, and `tail` subcommands are read-only and should never make