Skip to content

Commit

Permalink
Support configuring addtional listeners with different ports: pass th…
Browse files Browse the repository at this point in the history
…e additional listeners to configurator through env var
  • Loading branch information
paulzhang97 committed Nov 25, 2023
1 parent c568c9a commit b44461d
Show file tree
Hide file tree
Showing 7 changed files with 614 additions and 11 deletions.
122 changes: 120 additions & 2 deletions src/go/k8s/cmd/configurator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"

"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/networking"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/utils"
)

Expand All @@ -50,6 +52,7 @@ const (
validateMountedVolumeEnvVar = "VALIDATE_MOUNTED_VOLUME"
redpandaRPCPortEnvVar = "REDPANDA_RPC_PORT"
svcFQDNEnvVar = "SERVICE_FQDN"
additionalListenersEnvVar = "ADDITIONAL_LISTENERS"
)

type brokerID int
Expand All @@ -71,6 +74,7 @@ type configuratorConfig struct {
redpandaRPCPort int
subdomain string
svcFQDN string
additionalListeners string
}

func (c *configuratorConfig) String() string {
Expand All @@ -87,7 +91,8 @@ func (c *configuratorConfig) String() string {
"hostPort: %d\n"+
"proxyHostPort: %d\n"+
"rackAwareness: %t\n"+
"validateMountedVolume: %t\n",
"validateMountedVolume: %t\n"+
"additionalListeners: %s\n",
c.hostName,
c.svcFQDN,
c.configSourceDir,
Expand All @@ -100,7 +105,8 @@ func (c *configuratorConfig) String() string {
c.hostPort,
c.proxyHostPort,
c.rackAwareness,
c.validateMountedVolume)
c.validateMountedVolume,
c.additionalListeners)
}

var errorMissingEnvironmentVariable = errors.New("missing environment variable")
Expand Down Expand Up @@ -185,6 +191,10 @@ func main() {
log.Fatalf("%s", fmt.Errorf("unable to marshal the configuration: %w", err))
}

if err := setAdditionalListeners(c.additionalListeners, c.hostIP, int(hostIndex), cfg); err != nil {
log.Fatalf("%s", fmt.Errorf("unable to set additional listeners: %w", err))
}

if err := os.WriteFile(c.configDestination, cfgBytes, 0o600); err != nil {
log.Fatalf("%s", fmt.Errorf("unable to write the destination configuration file: %w", err))
}
Expand Down Expand Up @@ -529,6 +539,11 @@ func checkEnvVars() (configuratorConfig, error) {
}
}

c.additionalListeners, exist = os.LookupEnv(additionalListenersEnvVar)
if exist {
log.Printf("additional listeners configured: %v", c.additionalListeners)
}

return c, result
}

Expand All @@ -541,3 +556,106 @@ func hostIndex(hostName string) (brokerID, error) {
i, err := strconv.Atoi(s[last])
return brokerID(i), err
}

// setAdditionalListeners sets the additional listeners in the input Redpanda config.
// sample additional listeners config string:
// {"pandaproxy.advertised_pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{39282 | add .Index}}}]","pandaproxy.pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '0.0.0.0','port': 'port': {{39282 | add .Index}}}]","redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{30092 | add .Index}}}]","redpanda.kafka_api":"[{'name': 'private-link-kakfa', 'address': '0.0.0.0', 'port': {{30092 | add .Index}}}]"}
func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.Config) error {
if additionalListenersCfg == "" {
return nil
}

additionalListeners := map[string]string{}
err := json.Unmarshal([]byte(additionalListenersCfg), &additionalListeners)
if err != nil {
return err
}

additionalListenerCfgNames := []string{"redpanda.kafka_api", "redpanda.advertised_kafka_api", "pandaproxy.pandaproxy_api", "pandaproxy.advertised_pandaproxy_api"}
nodeConfig := &config.Config{}
for _, k := range additionalListenerCfgNames {
if v, found := additionalListeners[k]; found {
res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP), false)
if err != nil {
return err
}
err = nodeConfig.Set(k, res, "")
if err != nil {
return err
}
}
}

// Merge additional listeners to the input config
if len(nodeConfig.Redpanda.KafkaAPI) > 0 {
cfg.Redpanda.KafkaAPI = append(cfg.Redpanda.KafkaAPI, nodeConfig.Redpanda.KafkaAPI...)
}

if len(nodeConfig.Redpanda.AdvertisedKafkaAPI) > 0 {
cfg.Redpanda.AdvertisedKafkaAPI = append(cfg.Redpanda.AdvertisedKafkaAPI, nodeConfig.Redpanda.AdvertisedKafkaAPI...)
// Assume that the advertised kafka api use the same TLS configuration as the default external one.
var serverTLSCfg *config.ServerTLS
for i := 0; i < len(cfg.Redpanda.KafkaAPITLS); i++ {
tlsCfg := &cfg.Redpanda.KafkaAPITLS[i]
if tlsCfg.Name == resources.ExternalListenerName {
serverTLSCfg = tlsCfg
break
}
}
if serverTLSCfg != nil {
for i := 0; i < len(nodeConfig.Redpanda.AdvertisedKafkaAPI); i++ {
cfg.Redpanda.KafkaAPITLS = append(cfg.Redpanda.KafkaAPITLS, config.ServerTLS{
Name: nodeConfig.Redpanda.AdvertisedKafkaAPI[i].Name,
Enabled: serverTLSCfg.Enabled,
CertFile: serverTLSCfg.CertFile,
KeyFile: serverTLSCfg.KeyFile,
TruststoreFile: serverTLSCfg.TruststoreFile,
RequireClientAuth: serverTLSCfg.RequireClientAuth,
Other: serverTLSCfg.Other,
})
}
}
}

if nodeConfig.Pandaproxy == nil {
return nil
}

if len(nodeConfig.Pandaproxy.PandaproxyAPI) > 0 {
if cfg.Pandaproxy == nil {
cfg.Pandaproxy = &config.Pandaproxy{}
}
cfg.Pandaproxy.PandaproxyAPI = append(cfg.Pandaproxy.PandaproxyAPI, nodeConfig.Pandaproxy.PandaproxyAPI...)
}
if len(nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI) > 0 {
if cfg.Pandaproxy == nil {
cfg.Pandaproxy = &config.Pandaproxy{}
}
cfg.Pandaproxy.AdvertisedPandaproxyAPI = append(cfg.Pandaproxy.AdvertisedPandaproxyAPI, nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI...)

// Assume that the advertised panda proxies use the same TLS configuration as the default external one.
var serverTLSCfg *config.ServerTLS
for i := 0; i < len(cfg.Pandaproxy.PandaproxyAPITLS); i++ {
tlsCfg := &cfg.Pandaproxy.PandaproxyAPITLS[i]
if tlsCfg.Name == resources.PandaproxyPortExternalName {
serverTLSCfg = tlsCfg
break
}
}
if serverTLSCfg != nil {
for i := 0; i < len(nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI); i++ {
cfg.Pandaproxy.PandaproxyAPITLS = append(cfg.Pandaproxy.PandaproxyAPITLS, config.ServerTLS{
Name: nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI[i].Name,
Enabled: serverTLSCfg.Enabled,
CertFile: serverTLSCfg.CertFile,
KeyFile: serverTLSCfg.KeyFile,
TruststoreFile: serverTLSCfg.TruststoreFile,
RequireClientAuth: serverTLSCfg.RequireClientAuth,
Other: serverTLSCfg.Other,
})
}
}
}

return nil
}
Loading

0 comments on commit b44461d

Please sign in to comment.