66 changes: 46 additions & 20 deletions cmd/topicctl/subcmd/apply.go
@@ -18,22 +18,26 @@ import (
)

var applyCmd = &cobra.Command{
Use: "apply [topic configs]",
Short: "apply one or more topic configs",
Args: cobra.MinimumNArgs(1),
RunE: applyRun,
Use: "apply [topic configs]",
Short: "apply one or more topic configs",
Args: cobra.MinimumNArgs(1),
PreRunE: applyPreRun,
RunE: applyRun,
}

type applyCmdConfig struct {
brokersToRemove []int
brokerThrottleMBsOverride int
clusterConfig string
dryRun bool
partitionBatchSizeOverride int
pathPrefix string
rebalance bool
skipConfirm bool
sleepLoopTime time.Duration
brokersToRemove []int
brokerThrottleMBsOverride int
clusterConfig string
dryRun bool
partitionBatchSizeOverride int
pathPrefix string
rebalance bool
retentionDropStepDurationStr string
skipConfirm bool
sleepLoopDuration time.Duration

retentionDropStepDuration time.Duration
}

var applyConfig applyCmdConfig
@@ -69,17 +73,23 @@ func init() {
0,
"Partition batch size override",
)
applyCmd.Flags().StringVar(
&applyConfig.pathPrefix,
"path-prefix",
os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"),
"Prefix for topic config paths",
)
applyCmd.Flags().BoolVar(
&applyConfig.rebalance,
"rebalance",
false,
"Explicitly rebalance broker partition assignments",
)
applyCmd.Flags().StringVar(
&applyConfig.pathPrefix,
"path-prefix",
os.Getenv("TOPICCTL_APPLY_PATH_PREFIX"),
"Prefix for topic config paths",
&applyConfig.retentionDropStepDurationStr,
"retention-drop-step-duration",
"",
"Amount of time to use for retention drop steps",
)
applyCmd.Flags().BoolVar(
&applyConfig.skipConfirm,
@@ -88,15 +98,30 @@ func init() {
"Skip confirmation prompts during apply process",
)
applyCmd.Flags().DurationVar(
&applyConfig.sleepLoopTime,
"sleep-loop-time",
&applyConfig.sleepLoopDuration,
"sleep-loop-duration",
10*time.Second,
"Amount of time to wait between partition checks",
)

RootCmd.AddCommand(applyCmd)
}

func applyPreRun(cmd *cobra.Command, args []string) error {
if applyConfig.retentionDropStepDurationStr != "" {
var err error
applyConfig.retentionDropStepDuration, err = time.ParseDuration(
applyConfig.retentionDropStepDurationStr,
)

if err != nil {
return err
}
}

return nil
}

func applyRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -189,8 +214,9 @@ func applyTopic(
DryRun: applyConfig.dryRun,
PartitionBatchSizeOverride: applyConfig.partitionBatchSizeOverride,
Rebalance: applyConfig.rebalance,
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
SkipConfirm: applyConfig.skipConfirm,
SleepLoopTime: applyConfig.sleepLoopTime,
SleepLoopDuration: applyConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}

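Note on the new flag: --retention-drop-step-duration is stored as a plain string and only converted in applyPreRun via time.ParseDuration, so any Go duration literal is accepted and an empty value leaves the step unset. A minimal standalone sketch of the accepted formats (the sample values are illustrative, not taken from this PR):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Anything time.ParseDuration accepts is a valid flag value.
	for _, s := range []string{"30m", "1h", "1h30m"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s parses to %s\n", s, d)
	}
}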
2 changes: 1 addition & 1 deletion cmd/topicctl/subcmd/get.go
@@ -153,7 +153,7 @@ func getRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("Must provide topic and groupID as additional positional arguments")
}

return cliRunner.GetMemberLags(ctx, args[1], args[2])
return cliRunner.GetMemberLags(ctx, args[1], args[2], getConfig.full)
case "members":
if len(args) != 2 {
return fmt.Errorf("Must provide group ID as second positional argument")
45 changes: 39 additions & 6 deletions pkg/apply/apply.go
@@ -6,6 +6,7 @@ import (
"fmt"
"path/filepath"
"reflect"
"strings"
"time"

"github.com/hashicorp/go-multierror"
Expand All @@ -29,8 +30,9 @@ type TopicApplierConfig struct {
DryRun bool
PartitionBatchSizeOverride int
Rebalance bool
RetentionDropStepDuration time.Duration
SkipConfirm bool
SleepLoopTime time.Duration
SleepLoopDuration time.Duration
TopicConfig config.TopicConfig
}

@@ -168,7 +170,7 @@ func (t *TopicApplier) applyNewTopic(ctx context.Context) error {
}

// Just do a short sleep to ensure that zk is updated before we check
if err := interruptableSleep(ctx, t.config.SleepLoopTime/5); err != nil {
if err := interruptableSleep(ctx, t.config.SleepLoopDuration/5); err != nil {
return err
}

Expand Down Expand Up @@ -354,6 +356,25 @@ func (t *TopicApplier) updateSettings(
return err
}

var retentionDropStepDuration time.Duration
if t.config.RetentionDropStepDuration != 0 {
retentionDropStepDuration = t.config.RetentionDropStepDuration
} else {
var err error
retentionDropStepDuration, err = t.config.ClusterConfig.GetDefaultRetentionDropStepDuration()
if err != nil {
return err
}
}

reduced, err := topicSettings.ReduceRetentionDrop(
topicInfo.Config,
retentionDropStepDuration,
)
if err != nil {
return err
}

if len(diffKeys) > 0 {
diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys)
if err != nil {
@@ -366,6 +387,18 @@
diffsTable,
)

if reduced {
log.Infof(
strings.Join(
[]string{
"Note: Retention drop has been reduced to minimize cluster disruption.",
"Re-run apply afterwards to keep dropping retention to configured value or run with --retention-drop-step-duration=0 to not do gradual step-down.",
},
" ",
),
)
}

if t.config.DryRun {
log.Infof("Skipping update because dryRun is set to true")
return nil
@@ -887,7 +920,7 @@ func (t *TopicApplier) updatePartitionsIteration(
return err
}

checkTimer := time.NewTicker(t.config.SleepLoopTime)
checkTimer := time.NewTicker(t.config.SleepLoopDuration)
defer checkTimer.Stop()

log.Info("Sleeping then entering check loop")
@@ -945,7 +978,7 @@ outerLoop:
len(assignmentsToUpdate),
admin.FormatTopicPartitions(notReady, t.brokers),
)
log.Infof("Sleeping for %s", t.config.SleepLoopTime.String())
log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String())
case <-ctx.Done():
return ctx.Err()
}
@@ -1181,7 +1214,7 @@ func (t *TopicApplier) updateLeadersIteration(
return err
}

checkTimer := time.NewTicker(t.config.SleepLoopTime)
checkTimer := time.NewTicker(t.config.SleepLoopDuration)
defer checkTimer.Stop()

log.Info("Sleeping then entering check loop")
@@ -1212,7 +1245,7 @@ outerLoop:
admin.FormatTopicPartitions(wrongLeaders, t.brokers),
)

log.Infof("Sleeping for %s", t.config.SleepLoopTime.String())
log.Infof("Sleeping for %s", t.config.SleepLoopDuration.String())
case <-ctx.Done():
return ctx.Err()
}
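The gradual step-down itself is computed by ReduceRetentionDrop, which is not shown in this diff. A hedged sketch of the clamping rule it appears to apply, assuming retention values are compared in milliseconds (the function below and its signature are hypothetical, for illustration only):

package example

import "time"

// clampRetentionDrop illustrates limiting a retention decrease to one step per
// apply run; increases and the zero-step case pass through unchanged.
func clampRetentionDrop(currentMs, targetMs int64, step time.Duration) int64 {
	stepMs := step.Milliseconds()
	if stepMs <= 0 || targetMs >= currentMs {
		// No step configured, or retention is not being dropped.
		return targetMs
	}
	if currentMs-targetMs > stepMs {
		// Drop is larger than one step; move down by a single step only.
		return currentMs - stepMs
	}
	return targetMs
}

Re-running apply then walks retention down one step at a time until the configured value is reached, which is what the log message added in updateSettings tells the operator to do.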
20 changes: 12 additions & 8 deletions pkg/apply/apply_test.go
@@ -45,6 +45,8 @@ func TestApplyBasicUpdates(t *testing.T) {
}

applier := testApplier(ctx, t, topicConfig)
applier.config.RetentionDropStepDuration = 50 * time.Minute

assert.Equal(t, 3, applier.maxBatchSize)
assert.Equal(t, int64(2000000), applier.throttleBytes)

@@ -62,13 +64,15 @@
assert.Equal(t, "compact", topicInfo.Config["cleanup.policy"])

// Update retention and settings
applier.topicConfig.Spec.RetentionMinutes = 501
applier.topicConfig.Spec.RetentionMinutes = 400
applier.topicConfig.Spec.Settings["cleanup.policy"] = "delete"
err = applier.Apply(ctx)
require.Nil(t, err)
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
require.Nil(t, err)
assert.Equal(t, "30060000", topicInfo.Config[admin.RetentionKey])

// Dropped to only 450 because the retention drop step limits the decrease to 50 minutes per apply run
assert.Equal(t, "27000000", topicInfo.Config[admin.RetentionKey])
assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"])

// Updating replication factor not allowed
@@ -868,7 +872,7 @@ func TestApplyOverrides(t *testing.T) {
TopicConfig: topicConfig,
DryRun: false,
SkipConfirm: true,
SleepLoopTime: 500 * time.Millisecond,
SleepLoopDuration: 500 * time.Millisecond,
PartitionBatchSizeOverride: 8,
},
)
@@ -906,11 +910,11 @@ func testApplier(
ctx,
adminClient,
TopicApplierConfig{
ClusterConfig: clusterConfig,
TopicConfig: topicConfig,
DryRun: false,
SkipConfirm: true,
SleepLoopTime: 500 * time.Millisecond,
ClusterConfig: clusterConfig,
TopicConfig: topicConfig,
DryRun: false,
SkipConfirm: true,
SleepLoopDuration: 500 * time.Millisecond,
},
)
require.Nil(t, err)
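For the changed assertion above: the test sets RetentionDropStepDuration to 50 minutes, so lowering retention toward 400 minutes from the topic's starting value (apparently 500 minutes, configured earlier in the test outside this diff) only moves it down one step to 450 minutes on this run, and 450 minutes × 60,000 ms per minute = 27,000,000 ms, matching the "27000000" value checked against admin.RetentionKey.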
10 changes: 5 additions & 5 deletions pkg/check/check_test.go
@@ -58,11 +58,11 @@ func TestCheck(t *testing.T) {
ctx,
adminClient,
apply.TopicApplierConfig{
ClusterConfig: clusterConfig,
TopicConfig: topicConfig,
DryRun: false,
SkipConfirm: true,
SleepLoopTime: 500 * time.Millisecond,
ClusterConfig: clusterConfig,
TopicConfig: topicConfig,
DryRun: false,
SkipConfirm: true,
SleepLoopDuration: 500 * time.Millisecond,
},
)
require.Nil(t, err)
9 changes: 7 additions & 2 deletions pkg/cli/cli.go
@@ -352,7 +352,12 @@ func (c *CLIRunner) GetGroupMembers(ctx context.Context, groupID string, full bo

// GetMemberLags fetches and prints a summary of the consumer group lag for each partition
// in a single topic.
func (c *CLIRunner) GetMemberLags(ctx context.Context, topic string, groupID string) error {
func (c *CLIRunner) GetMemberLags(
ctx context.Context,
topic string,
groupID string,
full bool,
) error {
c.startSpinner()

// Check that topic exists before getting offsets; otherwise, the topic get
@@ -370,7 +375,7 @@ func (c *CLIRunner) GetMemberLags(ctx context.Context, topic string, groupID str
return err
}

c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags))
c.printer("Group member lags:\n%s", groups.FormatMemberLags(memberLags, full))
return nil
}

2 changes: 1 addition & 1 deletion pkg/cli/repl.go
@@ -262,7 +262,7 @@ func (r *Repl) executor(in string) {
log.Errorf("Error: %+v", err)
return
}
if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3]); err != nil {
if err := r.cliRunner.GetMemberLags(ctx, words[2], words[3], false); err != nil {
log.Errorf("Error: %+v", err)
return
}
22 changes: 22 additions & 0 deletions pkg/config/cluster.go
@@ -3,6 +3,8 @@ package config
import (
"context"
"errors"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/hashicorp/go-multierror"
@@ -68,6 +70,10 @@ type ClusterSpec struct {
// DefaultThrottleMB is the default broker throttle used for migrations in this
// cluster. If unset, then a reasonable default is used instead.
DefaultThrottleMB int64 `json:"defaultThrottleMB"`

// DefaultRetentionDropStepDuration is the default amount of time that retention drops will be
// limited by. If unset, no retention drop limiting will be applied.
DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"`
}

// Validate evaluates whether the cluster config is valid.
@@ -98,9 +104,25 @@ func (c ClusterConfig) Validate() error {
multierror.Append(err, errors.New("MajorVersion must be v0.10 or v2"))
}

_, parseErr := c.GetDefaultRetentionDropStepDuration()
if parseErr != nil {
err = multierror.Append(
err,
fmt.Errorf("Error parsing retention drop step retention: %+v", parseErr),
)
}

return err
}

func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error) {
if c.Spec.DefaultRetentionDropStepDurationStr == "" {
return 0, nil
}

return time.ParseDuration(c.Spec.DefaultRetentionDropStepDurationStr)
}

// NewAdminClient returns a new admin client using the parameters in the current cluster config.
func (c ClusterConfig) NewAdminClient(
ctx context.Context,
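Combined with the updateSettings change above, the resolution order for the step duration is: a nonzero --retention-drop-step-duration flag wins, otherwise the cluster-level defaultRetentionDropStepDuration is parsed, and leaving both unset yields zero, which disables the limiting. A minimal sketch of that precedence (the helper name is illustrative, not part of the PR):

package example

import "time"

// effectiveDropStep mirrors the resolution order in updateSettings: a nonzero
// flag value is used as-is; otherwise the cluster default string is parsed,
// with an empty string meaning no retention drop limiting at all.
func effectiveDropStep(flagValue time.Duration, clusterDefault string) (time.Duration, error) {
	if flagValue != 0 {
		return flagValue, nil
	}
	if clusterDefault == "" {
		return 0, nil
	}
	return time.ParseDuration(clusterDefault)
}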