Skip to content

Commit

Permalink
feat: expose Redpanda's listener in the docker network (#1994)
Browse files Browse the repository at this point in the history
* feat: implement WithListener for Redpanda module

New function WithListener lets you add a new listener to the redpanda
container that can be used within any of the docker networks of the
container

Signed-off-by: Santiago Jimenez Giraldo <sago2k8@gmail.com>

* test: test listeners for Redpanda Container

Add test for new WithListener function, validate connectivity and couple
of asserts in the construction of the listener

Signed-off-by: Santiago Jimenez Giraldo <sago2k8@gmail.com>

* docs: add documentation for WithListener option Redpanda

Add documentation for the additional listener option (WithListener) for the
Redpanda module

Signed-off-by: Santiago Jimenez Giraldo <sago2k8@gmail.com>

* fix: run make lint

---------

Signed-off-by: Santiago Jimenez Giraldo <sago2k8@gmail.com>
Co-authored-by: Manuel de la Peña <mdelapenya@gmail.com>
  • Loading branch information
sago2k8 and mdelapenya committed Jan 12, 2024
1 parent 12cce18 commit 2309c4e
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 6 deletions.
20 changes: 20 additions & 0 deletions docs/modules/redpanda.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ for Redpanda. E.g. `testcontainers.WithImage("docker.redpanda.com/redpandadata/r

If you need to enable TLS use `WithTLS` with a valid PEM encoded certificate and key.

#### Additional Listener

There are scenarios where additional listeners are needed, for example if you
want to consume/from another container in the same network

You can use the `WithListener` option to add a listener to the Redpanda container.
<!--codeinclude-->
[Register additional listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerRP
<!--/codeinclude-->

Container defined in the same network
<!--codeinclude-->
[Start Kcat container](../../modules/redpanda/redpanda_test.go) inside_block:withListenerKcat
<!--/codeinclude-->

Produce messages using the new registered listener
<!--codeinclude-->
[Produce/consume via registered listener](../../modules/redpanda/redpanda_test.go) inside_block:withListenerExec
<!--/codeinclude-->

### Container Methods

The Redpanda container exposes the following methods:
Expand Down
13 changes: 13 additions & 0 deletions modules/redpanda/mounts/redpanda.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,26 @@ redpanda:
port: 9093
authentication_method: {{ if .KafkaAPI.EnableAuthorization }}sasl{{ else }}none{{ end }}

{{ range .KafkaAPI.Listeners }}
- address: 0.0.0.0
name: {{ .Address }}
port: {{ .Port }}
authentication_method: {{ .AuthenticationMethod }}
{{ end }}

advertised_kafka_api:
- address: {{ .KafkaAPI.AdvertisedHost }}
name: external
port: {{ .KafkaAPI.AdvertisedPort }}
- address: 127.0.0.1
name: internal
port: 9093
{{ range .KafkaAPI.Listeners }}
- address: {{ .Address }}
name: {{ .Address }}
port: {{ .Port }}
{{ end }}


{{ if .EnableTLS }}
admin_api_tls:
Expand Down
39 changes: 38 additions & 1 deletion modules/redpanda/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package redpanda

import "github.com/testcontainers/testcontainers-go"
import (
"net"
"strconv"

"github.com/testcontainers/testcontainers-go"
)

type options struct {
// Superusers is a list of service account names.
Expand Down Expand Up @@ -29,7 +34,12 @@ type options struct {

// EnableTLS is a flag to enable TLS.
EnableTLS bool

cert, key []byte

// Listeners is a list of custom listeners that can be provided to access the
// containers form within docker networks
Listeners []listener
}

func defaultOptions() options {
Expand All @@ -41,6 +51,7 @@ func defaultOptions() options {
ServiceAccounts: make(map[string]string, 0),
AutoCreateTopics: false,
EnableTLS: false,
Listeners: make([]listener, 0),
}
}

Expand Down Expand Up @@ -86,6 +97,8 @@ func WithEnableKafkaAuthorization() Option {
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
return func(o *options) {
o.SchemaRegistryAuthenticationMethod = "http_basic"
Expand All @@ -106,3 +119,27 @@ func WithTLS(cert, key []byte) Option {
o.key = key
}
}

// WithListener adds a custom listener to the Redpanda containers. Listener
// will be aliases to all networks, so they can be accessed from within docker
// networks. At leas one network must be attached to the container, if not an
// error will be thrown when starting the container.
func WithListener(lis string) Option {
host, port, err := net.SplitHostPort(lis)
if err != nil {
return func(o *options) {}
}

portInt, err := strconv.Atoi(port)
if err != nil {
return func(o *options) {}
}

return func(o *options) {
o.Listeners = append(o.Listeners, listener{
Address: host,
Port: portInt,
AuthenticationMethod: o.KafkaAuthenticationMethod,
})
}
}
49 changes: 44 additions & 5 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
_ "embed"
"fmt"
"math"
"os"
"path/filepath"
"text/template"
Expand All @@ -31,6 +32,7 @@ const (
defaultKafkaAPIPort = "9092/tcp"
defaultAdminAPIPort = "9644/tcp"
defaultSchemaRegistryPort = "8081/tcp"
defaultDockerKafkaApiPort = "29092"

redpandaDir = "/etc/redpanda"
entrypointFile = "/entrypoint-tc.sh"
Expand Down Expand Up @@ -98,6 +100,12 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to create entrypoint file: %w", err)
}

// 4. Register extra kafka listeners if provided, network aliases will be
// set
if err := registerListeners(ctx, settings, req); err != nil {
return nil, fmt.Errorf("failed to register listeners: %w", err)
}

// Bootstrap config file contains cluster configurations which will only be considered
// the very first time you start a cluster.
bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile)
Expand All @@ -122,7 +130,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
},
)

// 4. Create certificate and key for TLS connections.
// 5. Create certificate and key for TLS connections.
if settings.EnableTLS {
certPath := filepath.Join(tmpDir, certFile)
if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil {
Expand Down Expand Up @@ -152,7 +160,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, err
}

// 5. Get mapped port for the Kafka API, so that we can render and then mount
// 6. Get mapped port for the Kafka API, so that we can render and then mount
// the Redpanda config with the advertised Kafka address.
hostIP, err := container.Host(ctx)
if err != nil {
Expand All @@ -164,7 +172,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to get mapped Kafka port: %w", err)
}

// 6. Render redpanda.yaml config and mount it.
// 7. Render redpanda.yaml config and mount it.
nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int())
if err != nil {
return nil, fmt.Errorf("failed to render node config: %w", err)
Expand All @@ -175,7 +183,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to copy redpanda.yaml into container: %w", err)
}

// 6. Wait until Redpanda is ready to serve requests
// 8. Wait until Redpanda is ready to serve requests
err = wait.ForAll(
wait.ForListeningPort(defaultKafkaAPIPort),
wait.ForLog("Successfully started Redpanda!").WithPollInterval(100*time.Millisecond)).
Expand All @@ -185,7 +193,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err)
}

// 7. Create Redpanda Service Accounts if configured to do so.
// 9. Create Redpanda Service Accounts if configured to do so.
if len(settings.ServiceAccounts) > 0 {
adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
Expand Down Expand Up @@ -252,6 +260,29 @@ func renderBootstrapConfig(settings options) ([]byte, error) {
return bootstrapConfig.Bytes(), nil
}

// registerListeners validates that the provided listeners are valid and set network aliases for the provided addresses.
// The container must be attached to at least one network.
func registerListeners(ctx context.Context, settings options, req testcontainers.GenericContainerRequest) error {
if len(settings.Listeners) == 0 {
return nil
}

if len(req.Networks) == 0 {
return fmt.Errorf("container must be attached to at least one network")
}

for _, listener := range settings.Listeners {
if listener.Port < 0 || listener.Port > math.MaxUint16 {
return fmt.Errorf("invalid port on listener %s:%d (must be between 0 and 65535)", listener.Address, listener.Port)
}

for _, network := range req.Networks {
req.NetworkAliases[network] = append(req.NetworkAliases[network], listener.Address)
}
}
return nil
}

// renderNodeConfig renders the redpanda.yaml node config and returns it as
// byte array.
func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int) ([]byte, error) {
Expand All @@ -262,6 +293,7 @@ func renderNodeConfig(settings options, hostIP string, advertisedKafkaPort int)
AdvertisedPort: advertisedKafkaPort,
AuthenticationMethod: settings.KafkaAuthenticationMethod,
EnableAuthorization: settings.KafkaEnableAuthorization,
Listeners: settings.Listeners,
},
SchemaRegistry: redpandaConfigTplParamsSchemaRegistry{
AuthenticationMethod: settings.SchemaRegistryAuthenticationMethod,
Expand Down Expand Up @@ -300,8 +332,15 @@ type redpandaConfigTplParamsKafkaAPI struct {
AdvertisedPort int
AuthenticationMethod string
EnableAuthorization bool
Listeners []listener
}

type redpandaConfigTplParamsSchemaRegistry struct {
AuthenticationMethod string
}

type listener struct {
Address string
Port int
AuthenticationMethod string
}
117 changes: 117 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"strings"
"testing"
Expand All @@ -16,6 +17,9 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/network"
)

func TestRedpanda(t *testing.T) {
Expand Down Expand Up @@ -278,6 +282,119 @@ func TestRedpandaWithTLS(t *testing.T) {
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
}

func TestRedpandaListener_Simple(t *testing.T) {
ctx := context.Background()

// 1. Create network
rpNetwork, err := network.New(ctx, network.WithCheckDuplicate())
require.NoError(t, err)

// 2. Start Redpanda container
// withListenerRP {
container, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
network.WithNetwork([]string{"redpanda-host"}, rpNetwork),
WithListener("redpanda:29092"), WithAutoCreateTopics(),
)
// }
require.NoError(t, err)

// 3. Start KCat container
// withListenerKcat {
kcat, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "confluentinc/cp-kcat:7.4.1",
Networks: []string{
rpNetwork.Name,
},
Entrypoint: []string{
"sh",
},
Cmd: []string{
"-c",
"tail -f /dev/null",
},
},
Started: true,
})
// }

require.NoError(t, err)

// 4. Copy message to kcat
err = kcat.CopyToContainer(ctx, []byte("Message produced by kcat"), "/tmp/msgs.txt", 700)
require.NoError(t, err)

// 5. Produce message to Redpanda
// withListenerExec {
_, _, err = kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-t", "msgs", "-P", "-l", "/tmp/msgs.txt"})
// }

require.NoError(t, err)

// 6. Consume message from Redpanda
_, stdout, err := kcat.Exec(ctx, []string{"kcat", "-b", "redpanda:29092", "-C", "-t", "msgs", "-c", "1"})
require.NoError(t, err)

// 7. Read Message from stdout
out, err := io.ReadAll(stdout)
require.NoError(t, err)

require.Contains(t, string(out), "Message produced by kcat")

t.Cleanup(func() {
if err := kcat.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate kcat container: %s", err)
}
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate redpanda container: %s", err)
}

if err := rpNetwork.Remove(ctx); err != nil {
t.Fatalf("failed to remove network: %s", err)
}
})
}

func TestRedpandaListener_InvalidPort(t *testing.T) {
ctx := context.Background()

// 1. Create network
RPNetwork, err := network.New(ctx, network.WithCheckDuplicate())
require.NoError(t, err)

// 2. Attempt Start Redpanda container
_, err = RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithListener("redpanda:99092"),
network.WithNetwork([]string{"redpanda-host"}, RPNetwork),
)

require.Error(t, err)

require.Contains(t, err.Error(), "invalid port on listener redpanda:99092")

t.Cleanup(func() {
if err := RPNetwork.Remove(ctx); err != nil {
t.Fatalf("failed to remove network: %s", err)
}
})
}

func TestRedpandaListener_NoNetwork(t *testing.T) {
ctx := context.Background()

// 1. Attempt Start Redpanda container
_, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithListener("redpanda:99092"),
)

require.Error(t, err)

require.Contains(t, err.Error(), "container must be attached to at least one network")
}

// localhostCert is a PEM-encoded TLS cert with SAN IPs
// generated from src/crypto/tls:
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,localhost --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
Expand Down

0 comments on commit 2309c4e

Please sign in to comment.