Skip to content

Commit

Permalink
rpk/config: Support named listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
David Castillo committed Feb 2, 2021
1 parent 9bc274d commit 17a6803
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 149 deletions.
16 changes: 10 additions & 6 deletions src/go/rpk/pkg/cli/cmd/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ func TestDeduceBrokers(t *testing.T) {
},
config: func() (*config.Config, error) {
conf := config.Default()
conf.Redpanda.KafkaApi = config.SocketAddress{
Address: "192.168.25.88",
Port: 1235,
conf.Redpanda.KafkaApi = config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "192.168.25.88",
Port: 1235,
},
}
return conf, nil
},
Expand All @@ -94,9 +96,11 @@ func TestDeduceBrokers(t *testing.T) {
name: "it should prioritize the config over the default broker addr",
config: func() (*config.Config, error) {
conf := config.Default()
conf.Redpanda.KafkaApi = config.SocketAddress{
Address: "192.168.25.87",
Port: 1234,
conf.Redpanda.KafkaApi = config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "192.168.25.87",
Port: 1234,
},
}
return conf, nil
},
Expand Down
36 changes: 6 additions & 30 deletions src/go/rpk/pkg/cli/cmd/redpanda/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func NewStartCommand(
kafkaAddr,
os.Getenv("REDPANDA_KAFKA_ADDRESS"),
)
kafkaApi, err := parseAddress(
kafkaApi, err := parseNamedAddress(
kafkaAddr,
config.Default().Redpanda.KafkaApi.Port,
)
Expand Down Expand Up @@ -162,7 +162,7 @@ func NewStartCommand(
",",
),
)
advKafkaApi, err := parseAddresses(
advKafkaApi, err := parseNamedAddresses(
advertisedKafka,
config.Default().Redpanda.KafkaApi.Port,
)
Expand Down Expand Up @@ -703,39 +703,15 @@ func parseSeeds(seeds []string) ([]config.SeedServer, error) {
return seedServers, nil
}

func parseAddresses(
addrs []string, defaultPort int,
) ([]config.SocketAddress, error) {
as := make([]config.SocketAddress, 0, len(addrs))
for _, addr := range addrs {
a, err := parseAddress(addr, defaultPort)
if err != nil {
return nil, err
}
if a != nil {
as = append(as, *a)
}
}
return as, nil
}

func parseAddress(addr string, defaultPort int) (*config.SocketAddress, error) {
if addr == "" {
return nil, nil
}

_, hostname, port, err := parseURL(addr)
named, err := parseNamedAddress(addr, defaultPort)
if err != nil {
return nil, err
}
if port == 0 {
port = defaultPort
if named == nil {
return nil, nil
}

return &config.SocketAddress{
Address: hostname,
Port: port,
}, nil
return &named.SocketAddress, nil
}

func parseNamedAddresses(
Expand Down
104 changes: 74 additions & 30 deletions src/go/rpk/pkg/cli/cmd/redpanda/start_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,11 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.SocketAddress{
Address: "192.168.34.32",
Port: 33145,
expectedAddr := config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 33145,
},
}
// Check that the generated config is as expected.
require.Exactly(
Expand All @@ -472,9 +474,35 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
expectedAddr := config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
},
}
// Check that the generated config is as expected.
require.Exactly(
st,
expectedAddr,
conf.Redpanda.KafkaApi,
)
},
}, {
name: "it should parse the --kafka-addr and persist it (named)",
args: []string{
"--install-dir", "/var/lib/redpanda",
"--kafka-addr", "nondefaultname://192.168.34.32",
},
postCheck: func(fs afero.Fs, _ *rp.RedpandaArgs, st *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.NamedSocketAddress{
Name: "nondefaultname",
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
},
}
// Check that the generated config is as expected.
require.Exactly(
Expand Down Expand Up @@ -506,9 +534,11 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.SocketAddress{
Address: "host",
Port: 3123,
expectedAddr := config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "host",
Port: 3123,
},
}
// Check that the generated config is as expected.
require.Exactly(
Expand All @@ -525,19 +555,23 @@ func TestStartCommand(t *testing.T) {
before: func(fs afero.Fs) error {
mgr := config.NewManager(fs)
conf := config.Default()
conf.Redpanda.KafkaApi = config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
conf.Redpanda.KafkaApi = config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
},
}
return mgr.Write(conf)
},
postCheck: func(fs afero.Fs, _ *rp.RedpandaArgs, st *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
expectedAddr := config.NamedSocketAddress{
SocketAddress: config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
},
}
// Check that the generated config is as expected.
require.Exactly(
Expand All @@ -556,9 +590,11 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := []config.SocketAddress{{
Address: "192.168.34.32",
Port: 33145,
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 33145,
},
}}
// Check that the generated config is as expected.
require.Exactly(
Expand All @@ -577,9 +613,11 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := []config.SocketAddress{{
Address: "192.168.34.32",
Port: 9092,
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.34.32",
Port: 9092,
},
}}
// Check that the generated config is as expected.
require.Exactly(
Expand Down Expand Up @@ -611,9 +649,11 @@ func TestStartCommand(t *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := []config.SocketAddress{{
Address: "host",
Port: 3123,
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "host",
Port: 3123,
},
}}
// Check that the generated config is as expected.
require.Exactly(
Expand All @@ -630,19 +670,23 @@ func TestStartCommand(t *testing.T) {
before: func(fs afero.Fs) error {
mgr := config.NewManager(fs)
conf := config.Default()
conf.Redpanda.AdvertisedKafkaApi = []config.SocketAddress{{
Address: "192.168.33.33",
Port: 9892,
conf.Redpanda.AdvertisedKafkaApi = []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
},
}}
return mgr.Write(conf)
},
postCheck: func(fs afero.Fs, _ *rp.RedpandaArgs, st *testing.T) {
mgr := config.NewManager(fs)
conf, err := mgr.Read(config.Default().ConfigFile)
require.NoError(st, err)
expectedAddr := []config.SocketAddress{{
Address: "192.168.33.33",
Port: 9892,
expectedAddr := []config.NamedSocketAddress{{
SocketAddress: config.SocketAddress{
Address: "192.168.33.33",
Port: 9892,
},
}}
// Check that the generated config is as expected.
require.Exactly(
Expand Down
4 changes: 4 additions & 0 deletions src/go/rpk/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ func checkSocketAddress(s SocketAddress, configPath string) []error {
return errs
}

func checkNamedSocketAddress(s NamedSocketAddress, configPath string) []error {
return checkSocketAddress(s.SocketAddress, configPath)
}

func checkRpkConfig(v *viper.Viper) []error {
errs := []error{}
if v.GetBool("rpk.tune_coredump") && v.GetString("rpk.coredump_dir") == "" {
Expand Down
15 changes: 11 additions & 4 deletions src/go/rpk/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ func TestDefault(t *testing.T) {
Redpanda: RedpandaConfig{
Directory: "/var/lib/redpanda/data",
RPCServer: SocketAddress{"0.0.0.0", 33145},
KafkaApi: SocketAddress{"0.0.0.0", 9092},
KafkaApi: NamedSocketAddress{
SocketAddress: SocketAddress{
"0.0.0.0",
9092,
},
},
AdminApi: SocketAddress{"0.0.0.0", 9644},
Id: 0,
DeveloperMode: true,
Expand Down Expand Up @@ -330,9 +335,11 @@ rpk:
name: "shall write a valid config file without advertised_rpc_api",
conf: func() *Config {
c := getValidConfig()
c.Redpanda.AdvertisedKafkaApi = []SocketAddress{{
"174.32.64.2",
9092,
c.Redpanda.AdvertisedKafkaApi = []NamedSocketAddress{{
SocketAddress: SocketAddress{
"174.32.64.2",
9092,
},
}}
return c
},
Expand Down
20 changes: 10 additions & 10 deletions src/go/rpk/pkg/config/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ type Config struct {
}

type RedpandaConfig struct {
Directory string `yaml:"data_directory" mapstructure:"data_directory" json:"dataDirectory"`
RPCServer SocketAddress `yaml:"rpc_server" mapstructure:"rpc_server" json:"rpcServer"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" mapstructure:"advertised_rpc_api,omitempty" json:"advertisedRpcApi,omitempty"`
KafkaApi SocketAddress `yaml:"kafka_api" mapstructure:"kafka_api" json:"kafkaApi"`
AdvertisedKafkaApi []SocketAddress `yaml:"advertised_kafka_api,omitempty" mapstructure:"advertised_kafka_api,omitempty" json:"advertisedKafkaApi,omitempty"`
KafkaApiTLS ServerTLS `yaml:"kafka_api_tls,omitempty" mapstructure:"kafka_api_tls,omitempty" json:"kafkaApiTls"`
AdminApi SocketAddress `yaml:"admin" mapstructure:"admin" json:"admin"`
Id int `yaml:"node_id" mapstructure:"node_id" json:"id"`
SeedServers []SeedServer `yaml:"seed_servers" mapstructure:"seed_servers" json:"seedServers"`
DeveloperMode bool `yaml:"developer_mode" mapstructure:"developer_mode" json:"developerMode"`
Directory string `yaml:"data_directory" mapstructure:"data_directory" json:"dataDirectory"`
RPCServer SocketAddress `yaml:"rpc_server" mapstructure:"rpc_server" json:"rpcServer"`
AdvertisedRPCAPI *SocketAddress `yaml:"advertised_rpc_api,omitempty" mapstructure:"advertised_rpc_api,omitempty" json:"advertisedRpcApi,omitempty"`
KafkaApi NamedSocketAddress `yaml:"kafka_api" mapstructure:"kafka_api" json:"kafkaApi"`
AdvertisedKafkaApi []NamedSocketAddress `yaml:"advertised_kafka_api,omitempty" mapstructure:"advertised_kafka_api,omitempty" json:"advertisedKafkaApi,omitempty"`
KafkaApiTLS ServerTLS `yaml:"kafka_api_tls,omitempty" mapstructure:"kafka_api_tls,omitempty" json:"kafkaApiTls"`
AdminApi SocketAddress `yaml:"admin" mapstructure:"admin" json:"admin"`
Id int `yaml:"node_id" mapstructure:"node_id" json:"id"`
SeedServers []SeedServer `yaml:"seed_servers" mapstructure:"seed_servers" json:"seedServers"`
DeveloperMode bool `yaml:"developer_mode" mapstructure:"developer_mode" json:"developerMode"`
}

type SeedServer struct {
Expand Down
Loading

0 comments on commit 17a6803

Please sign in to comment.