Skip to content

Commit

Permalink
rpk/config: support multiple Kafka listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
David Castillo committed Feb 2, 2021
1 parent f439e36 commit efcfd33
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 119 deletions.
14 changes: 11 additions & 3 deletions src/go/rpk/pkg/cli/cmd/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,30 @@ func DeduceBrokers(
return []string{"127.0.0.1:9092"}
}

if len(conf.Redpanda.KafkaApi) == 0 {
log.Trace(
"The config file contains no kafka listeners." +
" Empty redpanda.kafka_api.",
)
return []string{}
}

// Add the seed servers' Kafka addrs.
if len(conf.Redpanda.SeedServers) > 0 {
for _, b := range conf.Redpanda.SeedServers {
addr := fmt.Sprintf(
"%s:%d",
b.Host.Address,
conf.Redpanda.KafkaApi.Port,
conf.Redpanda.KafkaApi[0].Port,
)
bs = append(bs, addr)
}
}
// Add the current node's Kafka addr.
selfAddr := fmt.Sprintf(
"%s:%d",
conf.Redpanda.KafkaApi.Address,
conf.Redpanda.KafkaApi.Port,
conf.Redpanda.KafkaApi[0].Address,
conf.Redpanda.KafkaApi[0].Port,
)
bs = append(bs, selfAddr)
log.Debugf(
Expand Down
8 changes: 4 additions & 4 deletions src/go/rpk/pkg/cli/cmd/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func TestDeduceBrokers(t *testing.T) {
},
config: func() (*config.Config, error) {
conf := config.Default()
conf.Redpanda.KafkaApi = config.NamedSocketAddress{
conf.Redpanda.KafkaApi = []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.25.88",
Port: 1235,
},
}
}}
return conf, nil
},
expected: []string{"192.168.25.88:1235"},
Expand All @@ -96,12 +96,12 @@ func TestDeduceBrokers(t *testing.T) {
name: "it should prioritize the config over the default broker addr",
config: func() (*config.Config, error) {
conf := config.Default()
conf.Redpanda.KafkaApi = config.NamedSocketAddress{
conf.Redpanda.KafkaApi = []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.25.87",
Port: 1234,
},
}
}}
return conf, nil
},
expected: []string{"192.168.25.87:1234"},
Expand Down
6 changes: 3 additions & 3 deletions src/go/rpk/pkg/cli/cmd/container/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func GetState(c Client, nodeID uint) (*NodeState, error) {
return nil, err
}
hostKafkaPort, err := getHostPort(
config.Default().Redpanda.KafkaApi.Port,
config.DefaultKafkaPort,
containerJSON,
)
if err != nil {
Expand Down Expand Up @@ -212,7 +212,7 @@ func CreateNode(
}
kPort, err := nat.NewPort(
"tcp",
strconv.Itoa(config.Default().Redpanda.KafkaApi.Port),
strconv.Itoa(config.DefaultKafkaPort),
)
if err != nil {
return nil, err
Expand All @@ -234,7 +234,7 @@ func CreateNode(
"--node-id",
fmt.Sprintf("%d", nodeID),
"--kafka-addr",
fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.KafkaApi.Port),
fmt.Sprintf("%s:%d", ip, config.DefaultKafkaPort),
"--rpc-addr",
fmt.Sprintf("%s:%d", ip, config.Default().Redpanda.RPCServer.Port),
"--advertise-kafka-addr",
Expand Down
9 changes: 7 additions & 2 deletions src/go/rpk/pkg/cli/cmd/debug/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,15 @@ func getKafkaInfo(
send bool,
) error {
kInfo := kafkaInfo{}
if len(conf.Redpanda.KafkaApi) == 0 {
out <- [][]string{}
kafkaInfoCh <- kInfo
return nil
}
addr := fmt.Sprintf(
"%s:%d",
conf.Redpanda.KafkaApi.Address,
conf.Redpanda.KafkaApi.Port,
conf.Redpanda.KafkaApi[0].Address,
conf.Redpanda.KafkaApi[0].Port,
)
client, err := kafka.InitClientWithConf(&conf, addr)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions src/go/rpk/pkg/cli/cmd/generate/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func executePrometheusConfig(
return []byte(""), err
}
hosts, err := discoverHosts(
conf.Redpanda.KafkaApi.Address,
conf.Redpanda.KafkaApi.Port,
conf.Redpanda.KafkaApi[0].Address,
conf.Redpanda.KafkaApi[0].Port,
)
if err != nil {
return []byte(""), err
Expand Down
8 changes: 7 additions & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ func bootstrap(mgr config.Manager) *cobra.Command {
}
conf.Redpanda.Id = id
conf.Redpanda.RPCServer.Address = ownIp.String()
conf.Redpanda.KafkaApi.Address = ownIp.String()
conf.Redpanda.KafkaApi = []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: ownIp.String(),
Port: config.DefaultKafkaPort,
},
}}

conf.Redpanda.AdminApi.Address = ownIp.String()
conf.Redpanda.SeedServers = []config.SeedServer{}
seeds := []config.SeedServer{}
Expand Down
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cli/cmd/redpanda/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func TestBootstrap(t *testing.T) {
conf, err := mgr.Read(configPath)
require.NoError(t, err)
require.Equal(t, conf.Redpanda.RPCServer.Address, tt.self)
require.Equal(t, conf.Redpanda.KafkaApi.Address, tt.self)
require.Equal(t, conf.Redpanda.KafkaApi[0].Address, tt.self)
require.Equal(t, conf.Redpanda.AdminApi.Address, tt.self)
if len(tt.ips) == 1 {
require.Equal(
Expand Down
25 changes: 14 additions & 11 deletions src/go/rpk/pkg/cli/cmd/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewStartCommand(
configFile string
nodeID uint
seeds []string
kafkaAddr string
kafkaAddr []string
rpcAddr string
advertisedKafka []string
advertisedRPC string
Expand Down Expand Up @@ -123,20 +123,23 @@ func NewStartCommand(
conf.Redpanda.SeedServers = seedServers
}

kafkaAddr = stringOr(
kafkaAddr = stringSliceOr(
kafkaAddr,
os.Getenv("REDPANDA_KAFKA_ADDRESS"),
strings.Split(
os.Getenv("REDPANDA_KAFKA_ADDRESS"),
",",
),
)
kafkaApi, err := parseNamedAddress(
kafkaApi, err := parseNamedAddresses(
kafkaAddr,
config.Default().Redpanda.KafkaApi.Port,
config.DefaultKafkaPort,
)
if err != nil {
sendEnv(fs, mgr, env, conf, err)
return err
}
if kafkaApi != nil {
conf.Redpanda.KafkaApi = *kafkaApi
if kafkaApi != nil && len(kafkaApi) > 0 {
conf.Redpanda.KafkaApi = kafkaApi
}

rpcAddr = stringOr(
Expand Down Expand Up @@ -164,7 +167,7 @@ func NewStartCommand(
)
advKafkaApi, err := parseNamedAddresses(
advertisedKafka,
config.Default().Redpanda.KafkaApi.Port,
config.DefaultKafkaPort,
)
if err != nil {
sendEnv(fs, mgr, env, conf, err)
Expand Down Expand Up @@ -254,11 +257,11 @@ func NewStartCommand(
"A comma-separated list of seed node addresses"+
" (<host>[:<port>]) to connect to",
)
command.Flags().StringVar(
command.Flags().StringSliceVar(
&kafkaAddr,
"kafka-addr",
"",
"The Kafka address to bind to (<host>:<port>)",
[]string{},
"The list of Kafka listener addresses to bind to (<host>:<port>)",
)
command.Flags().StringVar(
&rpcAddr,
Expand Down
53 changes: 41 additions & 12 deletions src/go/rpk/pkg/cli/cmd/redpanda/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,12 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.NamedSocketAddress{
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 33145,
},
}
}}
// Check that the generated config is as expected.
require.Exactly(
st,
Expand All @@ -474,12 +474,12 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.NamedSocketAddress{
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
},
}
}}
// Check that the generated config is as expected.
require.Exactly(
st,
Expand All @@ -497,13 +497,42 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.NamedSocketAddress{
expectedAddr := []config.NamedSocketAddress{{
Name: "nondefaultname",
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
},
}
}}
// Check that the generated config is as expected.
require.Exactly(
st,
expectedAddr,
conf.Redpanda.KafkaApi,
)
},
}, {
name: "it should parse the --kafka-addr and persist it (list)",
args: []string{
"--install-dir", "/var/lib/redpanda",
"--kafka-addr", "nondefaultname://192.168.34.32,host:9092",
},
postCheck: func(fs afero.Fs, _ *rp.RedpandaArgs, st *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := []config.NamedSocketAddress{{
Name: "nondefaultname",
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
},
}, {
SocketAddress: config.SocketAddress{
Address: "host",
Port: 9092,
},
}}
// Check that the generated config is as expected.
require.Exactly(
st,
Expand Down Expand Up @@ -534,12 +563,12 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.NamedSocketAddress{
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "host",
Port: 3123,
},
}
}}
// Check that the generated config is as expected.
require.Exactly(
st,
Expand All @@ -555,24 +584,24 @@ func TestStartCommand(t *testing.T) {
before: func(fs afero.Fs) error {
mgr := config.NewManager(fs)
conf := config.Default()
conf.Redpanda.KafkaApi = config.NamedSocketAddress{
conf.Redpanda.KafkaApi = []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
},
}
}}
return mgr.Write(conf)
},
postCheck: func(fs afero.Fs, _ *rp.RedpandaArgs, st *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.NamedSocketAddress{
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
},
}
}}
// Check that the generated config is as expected.
require.Exactly(
st,
Expand Down
Loading

0 comments on commit efcfd33

Please sign in to comment.