Skip to content

Commit

Permalink
Support configuring addtional listeners with different port: get Auth…
Browse files Browse the repository at this point in the history
…N, Address, TLS config from the default external listener if not set
  • Loading branch information
paulzhang97 committed Nov 26, 2023
1 parent cce800d commit ef51332
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 49 deletions.
126 changes: 78 additions & 48 deletions src/go/k8s/cmd/configurator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func hostIndex(hostName string) (brokerID, error) {
// sample additional listeners config string:
// {"pandaproxy.advertised_pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{39282 | add .Index}}}]","pandaproxy.pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '0.0.0.0','port': 'port': {{39282 | add .Index}}}]","redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{30092 | add .Index}}}]","redpanda.kafka_api":"[{'name': 'private-link-kakfa', 'address': '0.0.0.0', 'port': {{30092 | add .Index}}}]"}
func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.Config) error {
if additionalListenersCfg == "" {
if additionalListenersCfg == "" || additionalListenersCfg == "{}" {
return nil
}

Expand All @@ -588,33 +588,12 @@ func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int

// Merge additional listeners to the input config
if len(nodeConfig.Redpanda.KafkaAPI) > 0 {
cfg.Redpanda.KafkaAPI = append(cfg.Redpanda.KafkaAPI, nodeConfig.Redpanda.KafkaAPI...)
setAuthnAdditionalListeners(resources.ExternalListenerName, &cfg.Redpanda.KafkaAPI, nodeConfig.Redpanda.KafkaAPI)
}

if len(nodeConfig.Redpanda.AdvertisedKafkaAPI) > 0 {
cfg.Redpanda.AdvertisedKafkaAPI = append(cfg.Redpanda.AdvertisedKafkaAPI, nodeConfig.Redpanda.AdvertisedKafkaAPI...)
// Assume that the advertised kafka api use the same TLS configuration as the default external one.
var serverTLSCfg *config.ServerTLS
for i := 0; i < len(cfg.Redpanda.KafkaAPITLS); i++ {
tlsCfg := &cfg.Redpanda.KafkaAPITLS[i]
if tlsCfg.Name == resources.ExternalListenerName {
serverTLSCfg = tlsCfg
break
}
}
if serverTLSCfg != nil {
for i := 0; i < len(nodeConfig.Redpanda.AdvertisedKafkaAPI); i++ {
cfg.Redpanda.KafkaAPITLS = append(cfg.Redpanda.KafkaAPITLS, config.ServerTLS{
Name: nodeConfig.Redpanda.AdvertisedKafkaAPI[i].Name,
Enabled: serverTLSCfg.Enabled,
CertFile: serverTLSCfg.CertFile,
KeyFile: serverTLSCfg.KeyFile,
TruststoreFile: serverTLSCfg.TruststoreFile,
RequireClientAuth: serverTLSCfg.RequireClientAuth,
Other: serverTLSCfg.Other,
})
}
}
setAdditionalAdvertisedListeners(resources.ExternalListenerName, &cfg.Redpanda.AdvertisedKafkaAPI, &cfg.Redpanda.KafkaAPITLS,
nodeConfig.Redpanda.AdvertisedKafkaAPI)
}

if nodeConfig.Pandaproxy == nil {
Expand All @@ -625,37 +604,88 @@ func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int
if cfg.Pandaproxy == nil {
cfg.Pandaproxy = &config.Pandaproxy{}
}
cfg.Pandaproxy.PandaproxyAPI = append(cfg.Pandaproxy.PandaproxyAPI, nodeConfig.Pandaproxy.PandaproxyAPI...)
setAuthnAdditionalListeners(resources.PandaproxyPortExternalName, &cfg.Pandaproxy.PandaproxyAPI, nodeConfig.Pandaproxy.PandaproxyAPI)
}
if len(nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI) > 0 {
if cfg.Pandaproxy == nil {
cfg.Pandaproxy = &config.Pandaproxy{}
}
cfg.Pandaproxy.AdvertisedPandaproxyAPI = append(cfg.Pandaproxy.AdvertisedPandaproxyAPI, nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI...)

// Assume that the advertised panda proxies use the same TLS configuration as the default external one.
var serverTLSCfg *config.ServerTLS
for i := 0; i < len(cfg.Pandaproxy.PandaproxyAPITLS); i++ {
tlsCfg := &cfg.Pandaproxy.PandaproxyAPITLS[i]
if tlsCfg.Name == resources.PandaproxyPortExternalName {
serverTLSCfg = tlsCfg
break
}

setAdditionalAdvertisedListeners(resources.PandaproxyPortExternalName, &cfg.Pandaproxy.AdvertisedPandaproxyAPI, &cfg.Pandaproxy.PandaproxyAPITLS,
nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI)
}

return nil
}

// setAuthnAdditionalListeners populates the authentication config in the addtiional listeners with the config from the external listener,
// and append the additional listeners to the input listeners.
func setAuthnAdditionalListeners(externalListenerName string, listeners *[]config.NamedAuthNSocketAddress, additionalListeners []config.NamedAuthNSocketAddress) {
var externalListenerCfg *config.NamedAuthNSocketAddress
for i := 0; i < len(*listeners); i++ {
cfg := &(*listeners)[i]
if cfg.Name == externalListenerName {
externalListenerCfg = cfg
break
}
if serverTLSCfg != nil {
for i := 0; i < len(nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI); i++ {
cfg.Pandaproxy.PandaproxyAPITLS = append(cfg.Pandaproxy.PandaproxyAPITLS, config.ServerTLS{
Name: nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI[i].Name,
Enabled: serverTLSCfg.Enabled,
CertFile: serverTLSCfg.CertFile,
KeyFile: serverTLSCfg.KeyFile,
TruststoreFile: serverTLSCfg.TruststoreFile,
RequireClientAuth: serverTLSCfg.RequireClientAuth,
Other: serverTLSCfg.Other,
})
}
if externalListenerCfg == nil {
*listeners = append(*listeners, additionalListeners...)
return
}
// Use the authn methold of the default external listener if authn method is not set in additional listener.
for i := 0; i < len(additionalListeners); i++ {
cfg := &additionalListeners[i]
if cfg.AuthN == nil || *cfg.AuthN == "" {
cfg.AuthN = externalListenerCfg.AuthN
}
}
*listeners = append(*listeners, additionalListeners...)
}

// setAdditionalAdvertisedListeners populates the TLS config and address in the addtiional listeners with the config from the external listener,
// and append the additional listeners to the input advertised listeners and TLS configs.
func setAdditionalAdvertisedListeners(externalListenerName string, advListeners *[]config.NamedSocketAddress, tlsCfgs *[]config.ServerTLS, additionalAdvListeners []config.NamedSocketAddress) {
var externalAPICfg *config.NamedSocketAddress
for i := 0; i < len(*advListeners); i++ {
cfg := &(*advListeners)[i]
if cfg.Name == externalListenerName {
externalAPICfg = cfg
break
}
}
if externalAPICfg != nil {
// Use the address of the default external listener if address is not set in additional listener.
for i := 0; i < len(additionalAdvListeners); i++ {
cfg := &additionalAdvListeners[i]
if cfg.Address == "" {
cfg.Address = externalAPICfg.Address
}
}
}

return nil
*advListeners = append(*advListeners, additionalAdvListeners...)

// Assume that the advertised panda proxies use the same TLS configuration as the default external one.
var serverTLSCfg *config.ServerTLS
for i := 0; i < len(*tlsCfgs); i++ {
tlsCfg := &(*tlsCfgs)[i]
if tlsCfg.Name == resources.PandaproxyPortExternalName {
serverTLSCfg = tlsCfg
break
}
}
if serverTLSCfg != nil {
for i := 0; i < len(additionalAdvListeners); i++ {
*tlsCfgs = append(*tlsCfgs, config.ServerTLS{
Name: additionalAdvListeners[i].Name,
Enabled: serverTLSCfg.Enabled,
CertFile: serverTLSCfg.CertFile,
KeyFile: serverTLSCfg.KeyFile,
TruststoreFile: serverTLSCfg.TruststoreFile,
RequireClientAuth: serverTLSCfg.RequireClientAuth,
Other: serverTLSCfg.Other,
})
}
}
}
128 changes: 127 additions & 1 deletion src/go/k8s/cmd/configurator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestPopulateRack(t *testing.T) {

func TestAdditionalListeners(t *testing.T) { //nolint
sasl := "sasl"
httpBasic := "http_basic"
tests := []struct {
name string
addtionalListenersCfg string
Expand All @@ -62,7 +63,7 @@ func TestAdditionalListeners(t *testing.T) { //nolint
expectedError: true,
},
{
name: "no additional listener",
name: "no additional listener with empty string",
addtionalListenersCfg: "",
hostIndex: 1,
hostIP: "192.168.0.1",
Expand Down Expand Up @@ -97,6 +98,42 @@ func TestAdditionalListeners(t *testing.T) { //nolint
},
},
},
{
name: "no additional listener {}",
addtionalListenersCfg: "{}",
hostIndex: 1,
hostIP: "192.168.0.1",
nodeCfg: config.Config{
Redpanda: config.RedpandaNodeConfig{
KafkaAPI: []config.NamedAuthNSocketAddress{{
Name: "internal",
Address: "0.0.0.0",
Port: 9092,
AuthN: &sasl,
}},
AdvertisedKafkaAPI: []config.NamedSocketAddress{{
Name: "internal",
Address: "cluster1.redpanda.svc.cluster.local",
Port: 9092,
}},
},
},
expectedKafkaAPI: []config.NamedAuthNSocketAddress{
{
Name: "internal",
Address: "0.0.0.0",
Port: 9092,
AuthN: &sasl,
},
},
expectedAdvertisedKafkaAPI: []config.NamedSocketAddress{
{
Name: "internal",
Address: "cluster1.redpanda.svc.cluster.local",
Port: 9092,
},
},
},
{
name: "additional kafka listener",
addtionalListenersCfg: `{"redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 7 }}.redpanda.com', 'port': {{30092 | add .Index}}}]",` +
Expand All @@ -122,6 +159,95 @@ func TestAdditionalListeners(t *testing.T) { //nolint
},
},
},
{
name: "additional listeners using the address from the external listerners",
addtionalListenersCfg: `{"redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'port': {{39002 | add .Index}}}]",` +
`"redpanda.kafka_api":"[{'name': 'private-link-kafka', 'address': '0.0.0.0', 'port': {{39002 | add .Index}}}]",` +
`"pandaproxy.advertised_pandaproxy_api":"[{'name': 'private-link-proxy', 'port': {{32082 | add .Index}}}]",` +
`"pandaproxy.pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '0.0.0.0', 'port': {{32082 | add .Index}}}]"}`,
hostIndex: 1,
hostIP: "192.168.0.1",
nodeCfg: config.Config{
Redpanda: config.RedpandaNodeConfig{
KafkaAPI: []config.NamedAuthNSocketAddress{{
Name: "kafka-external",
Address: "0.0.0.0",
Port: 9092,
AuthN: &sasl,
}},
AdvertisedKafkaAPI: []config.NamedSocketAddress{{
Name: "kafka-external",
Address: "kafka.cluster123.redpanda.com",
Port: 9092,
}},
},
Pandaproxy: &config.Pandaproxy{
PandaproxyAPI: []config.NamedAuthNSocketAddress{{
Name: "proxy-external",
Address: "0.0.0.0",
Port: 30082,
AuthN: &httpBasic,
}},
AdvertisedPandaproxyAPI: []config.NamedSocketAddress{{
Name: "proxy-external",
Address: "proxy.cluster123.redpanda.com",
Port: 30082,
}},
},
},
expectedKafkaAPI: []config.NamedAuthNSocketAddress{
{
Name: "kafka-external",
Address: "0.0.0.0",
Port: 9092,
AuthN: &sasl,
},
{
Name: "private-link-kafka",
Address: "0.0.0.0",
Port: 39002 + 1,
AuthN: &sasl,
},
},
expectedAdvertisedKafkaAPI: []config.NamedSocketAddress{
{
Name: "kafka-external",
Address: "kafka.cluster123.redpanda.com",
Port: 9092,
},
{
Name: "private-link-kafka",
Address: "kafka.cluster123.redpanda.com",
Port: 39002 + 1,
},
},
expectedPandaProxyAPI: []config.NamedAuthNSocketAddress{
{
Name: "proxy-external",
Address: "0.0.0.0",
Port: 30082,
AuthN: &httpBasic,
},
{
Name: "private-link-proxy",
Address: "0.0.0.0",
Port: 32082 + 1,
AuthN: &httpBasic,
},
},
expectedadvertisedPandaProxyAPI: []config.NamedSocketAddress{
{
Name: "proxy-external",
Address: "proxy.cluster123.redpanda.com",
Port: 30082,
},
{
Name: "private-link-proxy",
Address: "proxy.cluster123.redpanda.com",
Port: 32082 + 1,
},
},
},
{
name: "additional kafka and proxy listeners with mTLS",
addtionalListenersCfg: `{"redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 7 }}.redpanda.com', 'port': {{39002 | add .Index}}}]",` +
Expand Down

0 comments on commit ef51332

Please sign in to comment.