From f3a15502423f781a3e7c97163929fab1ae514496 Mon Sep 17 00:00:00 2001 From: Benjamin Yolken Date: Fri, 4 Sep 2020 16:12:52 -0700 Subject: [PATCH 1/5] Support full printouts of member ids in get lags --- cmd/topicctl/subcmd/get.go | 2 +- pkg/cli/cli.go | 9 +++++++-- pkg/cli/repl.go | 2 +- pkg/groups/format.go | 10 +++++++--- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index 1d07e8ad..9bd158ea 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -153,7 +153,7 @@ func getRun(cmd *cobra.Command, args []string) error { return fmt.Errorf("Must provide topic and groupID as additional positional arguments") } - return cliRunner.GetMemberLags(ctx, args[1], args[2]) + return cliRunner.GetMemberLags(ctx, args[1], args[2], getConfig.full) case "members": if len(args) != 2 { return fmt.Errorf("Must provide group ID as second positional argument") diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 6cd272f6..e3b0e6d9 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -352,7 +352,12 @@ func (c *CLIRunner) GetGroupMembers(ctx context.Context, groupID string, full bo // GetMemberLags fetches and prints a summary of the consumer group lag for each partition // in a single topic. -func (c *CLIRunner) GetMemberLags(ctx context.Context, topic string, groupID string) error { +func (c *CLIRunner) GetMemberLags( + ctx context.Context, + topic string, + groupID string, + full bool, +) error { c.startSpinner() // Check that topic exists before getting offsets; otherwise, the topic get @@ -370,7 +375,7 @@ func (c *CLIRunner) GetMemberLags(ctx context.Context, topic string, groupID str return err } - c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags)) + c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags, full)) return nil } diff --git a/pkg/cli/repl.go b/pkg/cli/repl.go index ce4f4ac8..1c4a5a3f 100644 --- a/pkg/cli/repl.go +++ b/pkg/cli/repl.go @@ -262,7 +262,7 @@ func (r *Repl) executor(in string) { log.Errorf("Error: %+v", err) return } - if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3]); err != nil { + if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3], false); err != nil { log.Errorf("Error: %+v", err) return } diff --git a/pkg/groups/format.go b/pkg/groups/format.go index 80571ad3..dfaf3a1c 100644 --- a/pkg/groups/format.go +++ b/pkg/groups/format.go @@ -182,7 +182,7 @@ func FormatMemberPartitionCounts(members []MemberInfo) string { } // FormatMemberLags generates a pretty table from the results of GetMemberLags. -func FormatMemberLags(memberLags []MemberPartitionLag) string { +func FormatMemberLags(memberLags []MemberPartitionLag, full bool) string { buf := &bytes.Buffer{} table := tablewriter.NewWriter(buf) @@ -198,7 +198,7 @@ func FormatMemberLags(memberLags []MemberPartitionLag) string { "Time Lag", }, ) - table.SetAutoWrapText(false) + table.SetAutoWrapText(true) table.SetColumnAlignment( []int{ tablewriter.ALIGN_LEFT, @@ -224,7 +224,11 @@ func FormatMemberLags(memberLags []MemberPartitionLag) string { var memberID string if memberLag.MemberID != "" { - memberID, _ = util.TruncateStringMiddle(memberLag.MemberID, 30, 5) + if full { + memberID = memberLag.MemberID + } else { + memberID, _ = util.TruncateStringMiddle(memberLag.MemberID, 30, 5) + } } var memberIDPrinter func(f string, a ...interface{}) string From 542aa4e4060f78d5461bc8dde5f9ed333597fa30 Mon Sep 17 00:00:00 2001 From: Benjamin Yolken Date: Fri, 4 Sep 2020 17:37:43 -0700 Subject: [PATCH 2/5] Support gradual retention drops --- cmd/topicctl/subcmd/apply.go | 26 ++- .../local-cluster/topics/topic-default.yaml | 2 +- pkg/apply/apply.go | 34 +++- pkg/apply/apply_test.go | 20 +- pkg/check/check_test.go | 10 +- pkg/config/settings.go | 82 ++++++++ pkg/config/settings_test.go | 189 ++++++++++++++++++ 7 files changed, 334 insertions(+), 29 deletions(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 4a0c2f00..ba970e1b 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -32,8 +32,9 @@ type applyCmdConfig struct { partitionBatchSizeOverride int pathPrefix string rebalance bool + retentionDropStepDuration time.Duration skipConfirm bool - sleepLoopTime time.Duration + sleepLoopDuration time.Duration } var applyConfig applyCmdConfig @@ -69,17 +70,23 @@ func init() { 0, "Partition batch size override", ) + applyCmd.Flags().StringVar( + &applyConfig.pathPrefix, + "path-prefix", + os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"), + "Prefix for topic config paths", + ) applyCmd.Flags().BoolVar( &applyConfig.rebalance, "rebalance", false, "Explicitly rebalance broker partition assignments", ) - applyCmd.Flags().StringVar( - &applyConfig.pathPrefix, - "path-prefix", - os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"), - "Prefix for topic config paths", + applyCmd.Flags().DurationVar( + &applyConfig.retentionDropStepDuration, + "retention-drop-step-duration", + 10*time.Hour, + "Amount of time to use for retention drop steps; set to 0 to remove limit", ) applyCmd.Flags().BoolVar( &applyConfig.skipConfirm, @@ -88,8 +95,8 @@ func init() { "Skip confirmation prompts during apply process", ) applyCmd.Flags().DurationVar( - &applyConfig.sleepLoopTime, - "sleep-loop-time", + &applyConfig.sleepLoopDuration, + "sleep-loop-duration", 10*time.Second, "Amount of time to wait between partition checks", ) @@ -189,8 +196,9 @@ func applyTopic( DryRun: applyConfig.dryRun, PartitionBatchSizeOverride: applyConfig.partitionBatchSizeOverride, Rebalance: applyConfig.rebalance, + RetentionDropStepDuration: applyConfig.retentionDropStepDuration, SkipConfirm: applyConfig.skipConfirm, - SleepLoopTime: applyConfig.sleepLoopTime, + SleepLoopDuration: applyConfig.sleepLoopDuration, TopicConfig: topicConfig, } diff --git a/examples/local-cluster/topics/topic-default.yaml b/examples/local-cluster/topics/topic-default.yaml index 97b5b223..963a29a8 100644 --- a/examples/local-cluster/topics/topic-default.yaml +++ b/examples/local-cluster/topics/topic-default.yaml @@ -9,7 +9,7 @@ meta: spec: partitions: 3 replicationFactor: 2 - retentionMinutes: 100 + retentionMinutes: 50 placement: strategy: any settings: diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 1bd0dd14..76ddf3a5 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -6,6 +6,7 @@ import ( "fmt" "path/filepath" "reflect" + "strings" "time" "github.com/hashicorp/go-multierror" @@ -29,8 +30,9 @@ type TopicApplierConfig struct { DryRun bool PartitionBatchSizeOverride int Rebalance bool + RetentionDropStepDuration time.Duration SkipConfirm bool - SleepLoopTime time.Duration + SleepLoopDuration time.Duration TopicConfig config.TopicConfig } @@ -168,7 +170,7 @@ func (t *TopicApplier) applyNewTopic(ctx context.Context) error { } // Just do a short sleep to ensure that zk is updated before we check - if err := interruptableSleep(ctx, t.config.SleepLoopTime/5); err != nil { + if err := interruptableSleep(ctx, t.config.SleepLoopDuration/5); err != nil { return err } @@ -354,6 +356,14 @@ func (t *TopicApplier) updateSettings( return err } + reduced, err := topicSettings.ReduceRetentionDrop( + topicInfo.Config, + t.config.RetentionDropStepDuration, + ) + if err != nil { + return err + } + if len(diffKeys) > 0 { diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys) if err != nil { @@ -366,6 +376,18 @@ func (t *TopicApplier) updateSettings( diffsTable, ) + if reduced { + log.Infof( + strings.Join( + []string{ + "Note: Retention drop has been reduced to minimize cluster disruption.", + "Re-run apply afterwards to keep dropping retention to configured value or run with --retention-drop-step-duration=0 to not do gradual step-down.", + }, + " ", + ), + ) + } + if t.config.DryRun { log.Infof("Skipping update because dryRun is set to true") return nil @@ -887,7 +909,7 @@ func (t *TopicApplier) updatePartitionsIteration( return err } - checkTimer := time.NewTicker(t.config.SleepLoopTime) + checkTimer := time.NewTicker(t.config.SleepLoopDuration) defer checkTimer.Stop() log.Info("Sleeping then entering check loop") @@ -945,7 +967,7 @@ outerLoop: len(assignmentsToUpdate), admin.FormatTopicPartitions(notReady, t.brokers), ) - log.Infof("Sleeping for %s", t.config.SleepLoopTime.String()) + log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String()) case <-ctx.Done(): return ctx.Err() } @@ -1181,7 +1203,7 @@ func (t *TopicApplier) updateLeadersIteration( return err } - checkTimer := time.NewTicker(t.config.SleepLoopTime) + checkTimer := time.NewTicker(t.config.SleepLoopDuration) defer checkTimer.Stop() log.Info("Sleeping then entering check loop") @@ -1212,7 +1234,7 @@ outerLoop: admin.FormatTopicPartitions(wrongLeaders, t.brokers), ) - log.Infof("Sleeping for %s", t.config.SleepLoopTime.String()) + log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String()) case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index d9b51b2b..538cb83b 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -45,6 +45,8 @@ func TestApplyBasicUpdates(t *testing.T) { } applier := testApplier(ctx, t, topicConfig) + applier.config.RetentionDropStepDuration = 50 * time.Minute + assert.Equal(t, 3, applier.maxBatchSize) assert.Equal(t, int64(2000000), applier.throttleBytes) @@ -62,13 +64,15 @@ func TestApplyBasicUpdates(t *testing.T) { assert.Equal(t, "compact", topicInfo.Config["cleanup.policy"]) // Update retention and settings - applier.topicConfig.Spec.RetentionMinutes = 501 + applier.topicConfig.Spec.RetentionMinutes = 400 applier.topicConfig.Spec.Settings["cleanup.policy"] = "delete" err = applier.Apply(ctx) require.Nil(t, err) topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true) require.Nil(t, err) - assert.Equal(t, "30060000", topicInfo.Config[admin.RetentionKey]) + + // Dropped to only 450 because of retention reduction + assert.Equal(t, "27000000", topicInfo.Config[admin.RetentionKey]) assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"]) // Updating replication factor not allowed @@ -868,7 +872,7 @@ func TestApplyOverrides(t *testing.T) { TopicConfig: topicConfig, DryRun: false, SkipConfirm: true, - SleepLoopTime: 500 * time.Millisecond, + SleepLoopDuration: 500 * time.Millisecond, PartitionBatchSizeOverride: 8, }, ) @@ -906,11 +910,11 @@ func testApplier( ctx, adminClient, TopicApplierConfig{ - ClusterConfig: clusterConfig, - TopicConfig: topicConfig, - DryRun: false, - SkipConfirm: true, - SleepLoopTime: 500 * time.Millisecond, + ClusterConfig: clusterConfig, + TopicConfig: topicConfig, + DryRun: false, + SkipConfirm: true, + SleepLoopDuration: 500 * time.Millisecond, }, ) require.Nil(t, err) diff --git a/pkg/check/check_test.go b/pkg/check/check_test.go index 98d827ab..89f15661 100644 --- a/pkg/check/check_test.go +++ b/pkg/check/check_test.go @@ -58,11 +58,11 @@ func TestCheck(t *testing.T) { ctx, adminClient, apply.TopicApplierConfig{ - ClusterConfig: clusterConfig, - TopicConfig: topicConfig, - DryRun: false, - SkipConfirm: true, - SleepLoopTime: 500 * time.Millisecond, + ClusterConfig: clusterConfig, + TopicConfig: topicConfig, + DryRun: false, + SkipConfirm: true, + SleepLoopDuration: 500 * time.Millisecond, }, ) require.Nil(t, err) diff --git a/pkg/config/settings.go b/pkg/config/settings.go index 9df13cf2..1f7adbb5 100644 --- a/pkg/config/settings.go +++ b/pkg/config/settings.go @@ -5,9 +5,12 @@ import ( "reflect" "strconv" "strings" + "time" "github.com/hashicorp/go-multierror" "github.com/segmentio/kafka-go" + "github.com/segmentio/topicctl/pkg/admin" + log "github.com/sirupsen/logrus" ) type configValidator func(v string) bool @@ -370,6 +373,60 @@ func (t TopicSettings) ConfigMapDiffs( return diffKeys, missingKeys, nil } +func (t TopicSettings) ReduceRetentionDrop( + configMap map[string]string, + retentionDropStepDuration time.Duration, +) (bool, error) { + if retentionDropStepDuration <= 0 { + return false, nil + } + + currRetentionMsStr, ok := configMap[admin.RetentionKey] + if !ok || currRetentionMsStr == "" { + // No retention currently set + return false, nil + } + currRetentionMs, err := strconv.ParseInt(currRetentionMsStr, 10, 64) + if err != nil { + // Parse error + return false, err + } + + var setRetentionMsIface interface{} + + for key, value := range t { + if key == admin.RetentionKey { + setRetentionMsIface = value + break + } + } + + if setRetentionMsIface == nil { + // Retention not configured in topic settings + return false, nil + } + setRetentionMs, err := interfaceToInt64(setRetentionMsIface) + if err != nil { + // Parse error + return false, err + } + + maxDropMs := retentionDropStepDuration.Milliseconds() + + if currRetentionMs-setRetentionMs > maxDropMs { + // Reduce drop + log.Debugf( + "Updating retention from %d to %d ms", + setRetentionMs, + currRetentionMs-maxDropMs, + ) + t[admin.RetentionKey] = currRetentionMs - maxDropMs + return true, nil + } + + return false, nil +} + // Copy returns a shallow copy of this settings instance. func (t TopicSettings) Copy() TopicSettings { copy := TopicSettings{} @@ -436,6 +493,31 @@ func interfaceToString(v interface{}) (string, error) { return "", fmt.Errorf("Invalid setting value: %+v (%s)", v, reflect.TypeOf(v)) } +func interfaceToInt64(v interface{}) (int64, error) { + if v == nil { + return 0, nil + } + + switch t := v.(type) { + case float32: + return int64(t), nil + case float64: + return int64(t), nil + case int: + return int64(t), nil + case int64: + return t, nil + case string: + return strconv.ParseInt(t, 10, 64) + default: + return 0, fmt.Errorf( + "Could not convert value %+v (type %+v) to int64", + v, + reflect.TypeOf(v), + ) + } +} + func inValues(v string, values ...string) bool { valuesMap := map[string]struct{}{} for _, value := range values { diff --git a/pkg/config/settings_test.go b/pkg/config/settings_test.go index c9b30216..b054d3b7 100644 --- a/pkg/config/settings_test.go +++ b/pkg/config/settings_test.go @@ -1,7 +1,9 @@ package config import ( + "fmt" "testing" + "time" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" @@ -212,3 +214,190 @@ func TestConfigMapDiffs(t *testing.T) { missingKeys, ) } + +func TestReduceRetentionDrop(t *testing.T) { + type testCase struct { + description string + settings TopicSettings + configMap map[string]string + dropDuration time.Duration + errExpected bool + expectedReduce bool + expectedNewRetentionMs string + } + + testCases := []testCase{ + { + description: "no change", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "5000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "retention.ms": "5000", + "other.key": "other.value", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "5000", + }, + { + description: "increase", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "500000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "retention.ms": "5000", + "other.key": "other.value", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "500000", + }, + { + description: "small decrease", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "5000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "retention.ms": "5010", + "other.key": "other.value", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "5000", + }, + { + description: "medium decrease", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "5000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "retention.ms": "5100", + "other.key": "other.value", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "5000", + }, + { + description: "big decrease", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "5000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "retention.ms": "6000", + "other.key": "other.value", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: true, + expectedNewRetentionMs: "5900", + }, + { + description: "no existing retention", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "5000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "other.key": "other.value", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "5000", + }, + { + description: "no new retention", + settings: TopicSettings{ + "other.key": "other.value", + "another.key": "another.value", + }, + configMap: map[string]string{ + "other.key": "other.value", + "retention.ms": "5000", + }, + dropDuration: 100 * time.Millisecond, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "", + }, + { + description: "0 step size", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": 3000, + "another.key": "another.value", + }, + configMap: map[string]string{ + "other.key": "other.value", + "retention.ms": "5000", + }, + dropDuration: 0, + errExpected: false, + expectedReduce: false, + expectedNewRetentionMs: "3000", + }, + { + description: "bad formatting settings", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "xxxx", + "another.key": "another.value", + }, + configMap: map[string]string{ + "other.key": "other.value", + "retention.ms": "5000", + }, + dropDuration: 100 * time.Millisecond, + errExpected: true, + }, + { + description: "bad formatting config", + settings: TopicSettings{ + "other.key": "other.value", + "retention.ms": "5000", + "another.key": "another.value", + }, + configMap: map[string]string{ + "other.key": "other.value", + "retention.ms": "xxxx", + }, + dropDuration: 100 * time.Millisecond, + errExpected: true, + }, + } + + for _, testCase := range testCases { + reduce, err := testCase.settings.ReduceRetentionDrop( + testCase.configMap, + testCase.dropDuration, + ) + if testCase.errExpected { + assert.Error(t, err, testCase.description) + } else { + assert.NoError(t, err, testCase.description) + assert.Equal(t, testCase.expectedReduce, reduce, testCase.description) + assert.Equal( + t, + testCase.expectedNewRetentionMs, + fmt.Sprintf("%v", testCase.settings["retention.ms"]), + ) + } + } +} From 461d729b193c63a9f25ecff057f4ada166cae049 Mon Sep 17 00:00:00 2001 From: Benjamin Yolken Date: Fri, 4 Sep 2020 17:39:17 -0700 Subject: [PATCH 3/5] Revert change to topic-default --- examples/local-cluster/topics/topic-default.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/local-cluster/topics/topic-default.yaml b/examples/local-cluster/topics/topic-default.yaml index 963a29a8..97b5b223 100644 --- a/examples/local-cluster/topics/topic-default.yaml +++ b/examples/local-cluster/topics/topic-default.yaml @@ -9,7 +9,7 @@ meta: spec: partitions: 3 replicationFactor: 2 - retentionMinutes: 50 + retentionMinutes: 100 placement: strategy: any settings: From f615dd343dd63b06773263066703d73d0171a3aa Mon Sep 17 00:00:00 2001 From: Benjamin Yolken Date: Fri, 4 Sep 2020 18:03:43 -0700 Subject: [PATCH 4/5] Allow setting reduction reduction drop duration via environment var --- cmd/topicctl/subcmd/apply.go | 54 ++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index ba970e1b..832f5a22 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -18,23 +18,26 @@ import ( ) var applyCmd = &cobra.Command{ - Use: "apply [topic configs]", - Short: "apply one or more topic configs", - Args: cobra.MinimumNArgs(1), - RunE: applyRun, + Use: "apply [topic configs]", + Short: "apply one or more topic configs", + Args: cobra.MinimumNArgs(1), + PreRunE: applyPreRun, + RunE: applyRun, } type applyCmdConfig struct { - brokersToRemove []int - brokerThrottleMBsOverride int - clusterConfig string - dryRun bool - partitionBatchSizeOverride int - pathPrefix string - rebalance bool - retentionDropStepDuration time.Duration - skipConfirm bool - sleepLoopDuration time.Duration + brokersToRemove []int + brokerThrottleMBsOverride int + clusterConfig string + dryRun bool + partitionBatchSizeOverride int + pathPrefix string + rebalance bool + retentionDropStepDurationStr string + skipConfirm bool + sleepLoopDuration time.Duration + + retentionDropStepDuration time.Duration } var applyConfig applyCmdConfig @@ -82,11 +85,11 @@ func init() { false, "Explicitly rebalance broker partition assignments", ) - applyCmd.Flags().DurationVar( - &applyConfig.retentionDropStepDuration, + applyCmd.Flags().StringVar( + &applyConfig.retentionDropStepDurationStr, "retention-drop-step-duration", - 10*time.Hour, - "Amount of time to use for retention drop steps; set to 0 to remove limit", + os.Getenv("TOPICCTL_APPLY_RETENTION_DROP_STEP_DURATION"), + "Amount of time to use for retention drop steps", ) applyCmd.Flags().BoolVar( &applyConfig.skipConfirm, @@ -104,6 +107,21 @@ func init() { RootCmd.AddCommand(applyCmd) } +func applyPreRun(cmd *cobra.Command, args []string) error { + if applyConfig.retentionDropStepDurationStr != "" { + var err error + applyConfig.retentionDropStepDuration, err = time.ParseDuration( + applyConfig.retentionDropStepDurationStr, + ) + + if err != nil { + return err + } + } + + return nil +} + func applyRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From b31ee9572e427c2535d56e1ceb1eddda36cab8fe Mon Sep 17 00:00:00 2001 From: Benjamin Yolken Date: Tue, 8 Sep 2020 12:24:53 -0700 Subject: [PATCH 5/5] Allow setting default step drop duration in cluster config --- cmd/topicctl/subcmd/apply.go | 2 +- pkg/apply/apply.go | 13 ++++++++++++- pkg/config/cluster.go | 22 ++++++++++++++++++++++ pkg/config/cluster_test.go | 25 ++++++++++++++++++++++--- 4 files changed, 57 insertions(+), 5 deletions(-) diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 832f5a22..0d572a81 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -88,7 +88,7 @@ func init() { applyCmd.Flags().StringVar( &applyConfig.retentionDropStepDurationStr, "retention-drop-step-duration", - os.Getenv("TOPICCTL_APPLY_RETENTION_DROP_STEP_DURATION"), + "", "Amount of time to use for retention drop steps", ) applyCmd.Flags().BoolVar( diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 76ddf3a5..88906a94 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -356,9 +356,20 @@ func (t *TopicApplier) updateSettings( return err } + var retentionDropStepDuration time.Duration + if t.config.RetentionDropStepDuration != 0 { + retentionDropStepDuration = t.config.RetentionDropStepDuration + } else { + var err error + retentionDropStepDuration, err = t.config.ClusterConfig.GetDefaultRetentionDropStepDuration() + if err != nil { + return err + } + } + reduced, err := topicSettings.ReduceRetentionDrop( topicInfo.Config, - t.config.RetentionDropStepDuration, + retentionDropStepDuration, ) if err != nil { return err diff --git a/pkg/config/cluster.go b/pkg/config/cluster.go index 64348d81..a80be41d 100644 --- a/pkg/config/cluster.go +++ b/pkg/config/cluster.go @@ -3,6 +3,8 @@ package config import ( "context" "errors" + "fmt" + "time" "github.com/aws/aws-sdk-go/aws/session" "github.com/hashicorp/go-multierror" @@ -68,6 +70,10 @@ type ClusterSpec struct { // DefaultThrottleMB is the default broker throttle used for migrations in this // cluster. If unset, then a reasonable default is used instead. DefaultThrottleMB int64 `json:"defaultThrottleMB"` + + // DefaultRetentionDropStepDuration is the default amount of time that retention drops will be + // limited by. If unset, no retention drop limiting will be applied. + DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"` } // Validate evaluates whether the cluster config is valid. @@ -98,9 +104,25 @@ func (c ClusterConfig) Validate() error { multierror.Append(err, errors.New("MajorVersion must be v0.10 or v2")) } + _, parseErr := c.GetDefaultRetentionDropStepDuration() + if parseErr != nil { + err = multierror.Append( + err, + fmt.Errorf("Error parsing retention drop step retention: %+v", parseErr), + ) + } + return err } +func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error) { + if c.Spec.DefaultRetentionDropStepDurationStr == "" { + return 0, nil + } + + return time.ParseDuration(c.Spec.DefaultRetentionDropStepDurationStr) +} + // NewAdminClient returns a new admin client using the parameters in the current cluster config. func (c ClusterConfig) NewAdminClient( ctx context.Context, diff --git a/pkg/config/cluster_test.go b/pkg/config/cluster_test.go index 34dbb374..9e0b3a40 100644 --- a/pkg/config/cluster_test.go +++ b/pkg/config/cluster_test.go @@ -24,9 +24,10 @@ func TestClusterValidate(t *testing.T) { Description: "test-description", }, Spec: ClusterSpec{ - BootstrapAddrs: []string{"broker-addr"}, - ZKAddrs: []string{"zk-addr"}, - VersionMajor: "v2", + BootstrapAddrs: []string{"broker-addr"}, + ZKAddrs: []string{"zk-addr"}, + VersionMajor: "v2", + DefaultRetentionDropStepDurationStr: "5m", }, }, expError: false, @@ -78,6 +79,24 @@ func TestClusterValidate(t *testing.T) { }, expError: true, }, + { + description: "bad retention drop format", + clusterConfig: ClusterConfig{ + Meta: ClusterMeta{ + Name: "test-cluster", + Region: "test-region", + Environment: "test-environment", + Description: "test-description", + }, + Spec: ClusterSpec{ + BootstrapAddrs: []string{"broker-addr"}, + ZKAddrs: []string{"zk-addr"}, + VersionMajor: "v2", + DefaultRetentionDropStepDurationStr: "10xxx", + }, + }, + expError: true, + }, } for _, testCase := range testCases {