/
delete_group.go
91 lines (79 loc) · 2.27 KB
/
delete_group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package commands
import (
"fmt"
"regexp"
"sort"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/xitonix/trubka/internal"
"github.com/xitonix/trubka/kafka"
)
// deleteGroup carries the shared parameters and the parsed flag values of the
// "delete" consumer group sub-command.
type deleteGroup struct {
// globalParams and kafkaParams are handed to initKafkaManager when the command runs.
globalParams *GlobalParameters
kafkaParams *kafkaParameters
// group is bound to the --group (-G) flag: the consumer group to remove.
group string
// interactive is bound to the --interactive (-i) flag; when set, the group is
// picked from a list and the group field is not used.
interactive bool
// groupFilter is bound to the --group-filter (-g) flag and is passed on when
// listing consumer groups in interactive mode. It stays nil if the flag is omitted.
groupFilter *regexp.Regexp
// silent is bound to the --silent (-s) flag and skips the confirmation prompt.
silent bool
}
// addDeleteGroupSubCommand registers the "delete" sub-command under parent and
// binds its flags to a freshly created deleteGroup handler.
//
// global and kafkaParams are stored on the handler untouched; they are only
// consumed once the command is actually executed.
func addDeleteGroupSubCommand(parent *kingpin.CmdClause, global *GlobalParameters, kafkaParams *kafkaParameters) {
	handler := new(deleteGroup)
	handler.globalParams = global
	handler.kafkaParams = kafkaParams

	deleteCmd := parent.Command("delete", "Deletes an empty consumer group.")
	deleteCmd.Action(handler.run)

	// Flags are registered in a fixed order so the generated help output stays stable.
	groupFlag := deleteCmd.Flag("group", "The consumer group name to remove.")
	groupFlag.Short('G').StringVar(&handler.group)

	interactiveFlag := deleteCmd.Flag("interactive", "Runs the command in interactive mode. The --group parameter will be ignored in this mode.")
	interactiveFlag.Short('i').BoolVar(&handler.interactive)

	filterFlag := deleteCmd.Flag("group-filter", "An optional regular expression to filter the groups by (interactive mode only).")
	filterFlag.Short('g').RegexpVar(&handler.groupFilter)

	silentFlag := deleteCmd.Flag("silent", "Deletes the consumer group without user confirmation.")
	silentFlag.Short('s').BoolVar(&handler.silent)
}
// run is the kingpin action of the "delete" sub-command.
//
// Without --interactive it deletes the group named by --group. With
// --interactive it lists the consumer groups (optionally narrowed down by
// --group-filter), sorts their names and lets the user pick the one to delete.
// It returns nil without deleting anything when no group is listed or when the
// picker yields a negative index.
func (c *deleteGroup) run(_ *kingpin.ParseContext) error {
	manager, ctx, cancel, err := initKafkaManager(c.globalParams, c.kafkaParams)
	if err != nil {
		return err
	}
	// Cancel the context first, then release the manager.
	defer func() {
		cancel()
		manager.Close()
	}()

	target := c.group
	if c.interactive {
		groups, listErr := manager.GetConsumerGroups(ctx, false, nil, c.groupFilter, nil)
		if listErr != nil {
			return listErr
		}
		if len(groups) == 0 {
			fmt.Println(getNotFoundMessage("consumer group", "group", c.groupFilter))
			return nil
		}

		// Present the candidates in a deterministic, alphabetical order.
		candidates := groups.Names()
		sort.Strings(candidates)

		selected := pickAnIndex("Choose a consumer group ID to delete", "group", candidates)
		if selected < 0 {
			return nil
		}
		target = candidates[selected]
	}

	return c.delete(manager, target)
}
// delete removes the given consumer group through manager.
//
// An empty group name is rejected with an error. Unless the --silent flag is
// set, the user is asked for confirmation first; declining the prompt is not an
// error and leaves the group untouched. Any error reported by
// manager.DeleteConsumerGroup is returned as is.
func (c *deleteGroup) delete(manager *kafka.Manager, group string) error {
	if internal.IsEmpty(group) {
		return errors.New("Consumer group cannot be empty.")
	}

	// The prompt is only shown when silent mode is off.
	confirmed := c.silent || askForConfirmation(fmt.Sprintf("Are you sure you want to delete %s", group))
	if !confirmed {
		return nil
	}

	if err := manager.DeleteConsumerGroup(group); err != nil {
		return err
	}

	fmt.Printf("%s consumer group has been deleted successfully.\n", group)
	return nil
}