Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1 operator: support configuring additional listeners with different ports #21

Merged
merged 3 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 150 additions & 2 deletions src/go/k8s/cmd/configurator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"

"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/networking"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/resources"
"github.com/redpanda-data/redpanda-operator/src/go/k8s/pkg/utils"
)

Expand All @@ -50,6 +52,7 @@ const (
validateMountedVolumeEnvVar = "VALIDATE_MOUNTED_VOLUME"
redpandaRPCPortEnvVar = "REDPANDA_RPC_PORT"
svcFQDNEnvVar = "SERVICE_FQDN"
additionalListenersEnvVar = "ADDITIONAL_LISTENERS"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting structured data in an unstructured environment variable feels icky. Can a configmap be used instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a comma-separated list of values?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, I see, it is a JSON document. Is this the only way to pass it? if so, I would recommend encoding it in base64 or doing what @ncole is suggesting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about passing the config through configmap. I gave it up because of broader changes.

  • Reading configmap has more code than reading env in Configurator. If we don't like the name of configmap to be hardcoded, we will need to either change cluster CRD or pass the configmap name in new env var.
  • When there is change to additional listeners, if using configmap, we will need to change v1 operator to watch the configmap and restart the broker pods. If using env, no need to watch, restarting the broker pods is triggered by the changes to statefulset spec.
  • Regarding structured vs unstructured, I feel that the data in configmap is not formally structured, it is still string and needs to be parsed as well.

Currently there is no special char in the additional listener config, base64 encoding is not necessary. Without base64 encoding, it is a little convenient to examine the configurations since no base64 decoding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @paulzhang97 the cloud v1 shouldn't be developed. More effort should be developed into Redpanda helm chart.

)

type brokerID int
Expand All @@ -71,6 +74,7 @@ type configuratorConfig struct {
redpandaRPCPort int
subdomain string
svcFQDN string
additionalListeners string
}

func (c *configuratorConfig) String() string {
Expand All @@ -87,7 +91,8 @@ func (c *configuratorConfig) String() string {
"hostPort: %d\n"+
"proxyHostPort: %d\n"+
"rackAwareness: %t\n"+
"validateMountedVolume: %t\n",
"validateMountedVolume: %t\n"+
"additionalListeners: %s\n",
c.hostName,
c.svcFQDN,
c.configSourceDir,
Expand All @@ -100,7 +105,8 @@ func (c *configuratorConfig) String() string {
c.hostPort,
c.proxyHostPort,
c.rackAwareness,
c.validateMountedVolume)
c.validateMountedVolume,
c.additionalListeners)
}

var errorMissingEnvironmentVariable = errors.New("missing environment variable")
Expand Down Expand Up @@ -180,6 +186,10 @@ func main() {
populateRack(cfg, zone, zoneID)
}

if err = setAdditionalListeners(c.additionalListeners, c.hostIP, int(hostIndex), cfg); err != nil {
log.Fatalf("%s", fmt.Errorf("unable to set additional listeners: %w", err))
}

cfgBytes, err := yaml.Marshal(cfg)
if err != nil {
log.Fatalf("%s", fmt.Errorf("unable to marshal the configuration: %w", err))
Expand Down Expand Up @@ -529,6 +539,11 @@ func checkEnvVars() (configuratorConfig, error) {
}
}

c.additionalListeners, exist = os.LookupEnv(additionalListenersEnvVar)
if exist {
log.Printf("additional listeners configured: %v", c.additionalListeners)
}

return c, result
}

Expand All @@ -541,3 +556,136 @@ func hostIndex(hostName string) (brokerID, error) {
i, err := strconv.Atoi(s[last])
return brokerID(i), err
}

// setAdditionalListeners sets the additional listeners in the input Redpanda config.
// sample additional listeners config string:
// {"pandaproxy.advertised_pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{39282 | add .Index}}}]","pandaproxy.pandaproxy_api":"[{'name': 'private-link-proxy', 'address': '0.0.0.0','port': 'port': {{39282 | add .Index}}}]","redpanda.advertised_kafka_api":"[{'name': 'private-link-kafka', 'address': '{{ .Index }}-f415bda0-{{ .HostIP | sha256sum | substr 0 }}.redpanda.com', 'port': {{30092 | add .Index}}}]","redpanda.kafka_api":"[{'name': 'private-link-kakfa', 'address': '0.0.0.0', 'port': {{30092 | add .Index}}}]"}
func setAdditionalListeners(additionalListenersCfg, hostIP string, hostIndex int, cfg *config.Config) error {
if additionalListenersCfg == "" || additionalListenersCfg == "{}" {
return nil
}

additionalListeners := map[string]string{}
err := json.Unmarshal([]byte(additionalListenersCfg), &additionalListeners)
if err != nil {
return err
}

additionalListenerCfgNames := []string{"redpanda.kafka_api", "redpanda.advertised_kafka_api", "pandaproxy.pandaproxy_api", "pandaproxy.advertised_pandaproxy_api"}
nodeConfig := &config.Config{}
for _, k := range additionalListenerCfgNames {
if v, found := additionalListeners[k]; found {
res, err := utils.Compute(v, utils.NewEndpointTemplateData(hostIndex, hostIP), false)
if err != nil {
return err
}
err = nodeConfig.Set(k, res, "")
if err != nil {
return err
}
}
}

// Merge additional listeners to the input config
if len(nodeConfig.Redpanda.KafkaAPI) > 0 {
setAuthnAdditionalListeners(resources.ExternalListenerName, &cfg.Redpanda.KafkaAPI, nodeConfig.Redpanda.KafkaAPI)
}

if len(nodeConfig.Redpanda.AdvertisedKafkaAPI) > 0 {
setAdditionalAdvertisedListeners(resources.ExternalListenerName, &cfg.Redpanda.AdvertisedKafkaAPI, &cfg.Redpanda.KafkaAPITLS,
nodeConfig.Redpanda.AdvertisedKafkaAPI)
}

if nodeConfig.Pandaproxy == nil {
return nil
}

if len(nodeConfig.Pandaproxy.PandaproxyAPI) > 0 {
if cfg.Pandaproxy == nil {
cfg.Pandaproxy = &config.Pandaproxy{}
}
setAuthnAdditionalListeners(resources.PandaproxyPortExternalName, &cfg.Pandaproxy.PandaproxyAPI, nodeConfig.Pandaproxy.PandaproxyAPI)
}
if len(nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI) > 0 {
if cfg.Pandaproxy == nil {
cfg.Pandaproxy = &config.Pandaproxy{}
}

setAdditionalAdvertisedListeners(resources.PandaproxyPortExternalName, &cfg.Pandaproxy.AdvertisedPandaproxyAPI, &cfg.Pandaproxy.PandaproxyAPITLS,
nodeConfig.Pandaproxy.AdvertisedPandaproxyAPI)
}

return nil
}

// setAuthnAdditionalListeners populates the authentication config in the addtiional listeners with the config from the external listener,
// and append the additional listeners to the input listeners.
func setAuthnAdditionalListeners(externalListenerName string, listeners *[]config.NamedAuthNSocketAddress, additionalListeners []config.NamedAuthNSocketAddress) {
var externalListenerCfg *config.NamedAuthNSocketAddress
for i := 0; i < len(*listeners); i++ {
cfg := &(*listeners)[i]
if cfg.Name == externalListenerName {
externalListenerCfg = cfg
break
}
}
if externalListenerCfg == nil {
*listeners = append(*listeners, additionalListeners...)
return
}
// Use the authn methold of the default external listener if authn method is not set in additional listener.
for i := 0; i < len(additionalListeners); i++ {
cfg := &additionalListeners[i]
if cfg.AuthN == nil || *cfg.AuthN == "" {
cfg.AuthN = externalListenerCfg.AuthN
}
}
*listeners = append(*listeners, additionalListeners...)
}

// setAdditionalAdvertisedListeners populates the TLS config and address in the addtiional listeners with the config from the external listener,
// and append the additional listeners to the input advertised listeners and TLS configs.
func setAdditionalAdvertisedListeners(externalListenerName string, advListeners *[]config.NamedSocketAddress, tlsCfgs *[]config.ServerTLS, additionalAdvListeners []config.NamedSocketAddress) {
var externalAPICfg *config.NamedSocketAddress
for i := 0; i < len(*advListeners); i++ {
cfg := &(*advListeners)[i]
if cfg.Name == externalListenerName {
externalAPICfg = cfg
break
}
}
if externalAPICfg != nil {
// Use the address of the default external listener if address is not set in additional listener.
for i := 0; i < len(additionalAdvListeners); i++ {
cfg := &additionalAdvListeners[i]
if cfg.Address == "" {
cfg.Address = externalAPICfg.Address
}
}
}

*advListeners = append(*advListeners, additionalAdvListeners...)

// Assume that the advertised panda proxies use the same TLS configuration as the default external one.
var serverTLSCfg *config.ServerTLS
for i := 0; i < len(*tlsCfgs); i++ {
tlsCfg := &(*tlsCfgs)[i]
if tlsCfg.Name == resources.PandaproxyPortExternalName {
serverTLSCfg = tlsCfg
break
}
}
if serverTLSCfg != nil {
for i := 0; i < len(additionalAdvListeners); i++ {
*tlsCfgs = append(*tlsCfgs, config.ServerTLS{
Name: additionalAdvListeners[i].Name,
Enabled: serverTLSCfg.Enabled,
CertFile: serverTLSCfg.CertFile,
KeyFile: serverTLSCfg.KeyFile,
TruststoreFile: serverTLSCfg.TruststoreFile,
RequireClientAuth: serverTLSCfg.RequireClientAuth,
Other: serverTLSCfg.Other,
})
}
}
}
Loading
Loading