Skip to content

Commit

Permalink
Fix mTLS configuration between Redpanda and Console
Browse files Browse the repository at this point in the history
Console TLS configuration needs to have client certificates for mutual
TLS communication. With this commit Console go package is imported to
have strongly typed Console configuration. As Console configuration has
yaml annotation, not json transpiler is trying to parse yaml annotation
as a fallback.
  • Loading branch information
RafalKorepta committed Jul 4, 2024
1 parent 37927c0 commit 16a8c2d
Show file tree
Hide file tree
Showing 93 changed files with 4,077 additions and 1,619 deletions.
2 changes: 1 addition & 1 deletion charts/redpanda/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func ClientCerts(dot *helmette.Dot) []certmanagerv1.Certificate {
panic(fmt.Sprintf("Certificate %q referenced but not defined", name))
}

if helmette.Empty(data.SecretRef) || !ClientAuthRequired(dot) {
if !ClientAuthRequired(dot) {
return certs
}

Expand Down
212 changes: 211 additions & 1 deletion charts/redpanda/chart_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package redpanda_test

import (
"encoding/json"
"fmt"
"maps"
"os"
"testing"
"time"

"github.com/redpanda-data/helm-charts/charts/redpanda"
"github.com/redpanda-data/helm-charts/pkg/gotohelm/helmette"
Expand All @@ -13,9 +16,14 @@ import (
"github.com/redpanda-data/helm-charts/pkg/testutil"
"github.com/redpanda-data/helm-charts/pkg/valuesutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapio"
"go.uber.org/zap/zaptest"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
)

func TieredStorageStatic(t *testing.T) redpanda.PartialValues {
Expand Down Expand Up @@ -100,11 +108,13 @@ func TestChart(t *testing.T) {

redpandaChart := "."

env := helmtest.Setup(t).Namespaced(t)
h := helmtest.Setup(t)

t.Run("tiered-storage-secrets", func(t *testing.T) {
ctx := testutil.Context(t)

env := h.Namespaced(t)

credsSecret, err := kube.Create(ctx, env.Ctl(), TieredStorageSecret(env.Namespace()))
require.NoError(t, err)

Expand Down Expand Up @@ -138,6 +148,206 @@ func TestChart(t *testing.T) {
require.Equal(t, true, config["cloud_storage_enabled"])
require.Equal(t, "from-secret-access-key", config["cloud_storage_access_key"])
})

t.Run("mtls-using-cert-manager", func(t *testing.T) {
ctx := testutil.Context(t)

env := h.Namespaced(t)

partial := redpanda.PartialValues{
ClusterDomain: ptr.To("cluster.local"),
Listeners: &redpanda.PartialListeners{
Admin: &redpanda.PartialAdminListeners{
TLS: &redpanda.PartialInternalTLS{
RequireClientAuth: ptr.To(true),
},
},
HTTP: &redpanda.PartialHTTPListeners{
TLS: &redpanda.PartialInternalTLS{
RequireClientAuth: ptr.To(true),
},
},
Kafka: &redpanda.PartialKafkaListeners{
TLS: &redpanda.PartialInternalTLS{
RequireClientAuth: ptr.To(true),
},
},
SchemaRegistry: &redpanda.PartialSchemaRegistryListeners{
TLS: &redpanda.PartialInternalTLS{
RequireClientAuth: ptr.To(true),
},
},
RPC: &struct {
Port *int32 `json:"port,omitempty" jsonschema:"required"`
TLS *redpanda.PartialInternalTLS `json:"tls,omitempty" jsonschema:"required"`
}{
TLS: &redpanda.PartialInternalTLS{
RequireClientAuth: ptr.To(true),
},
},
},
}

rpRelease := env.Install(redpandaChart, helm.InstallOptions{
Values: partial,
})

var val map[string]any
valByte, err := os.ReadFile("values.yaml")
require.NoError(t, err)

require.NoError(t, yaml.Unmarshal(valByte, &val))

partialB, err := yaml.Marshal(partial)
require.NoError(t, err)

var partialVal map[string]any
require.NoError(t, yaml.Unmarshal(partialB, &partialVal))

dot := helmette.Dot{Values: helmette.Merge(partialVal, val)}

dot.Release.Name = rpRelease.Name
dot.Release.Namespace = rpRelease.Namespace

log := zaptest.NewLogger(t)
w := &zapio.Writer{Log: log, Level: zapcore.InfoLevel}
wErr := &zapio.Writer{Log: log, Level: zapcore.ErrorLevel}

rpk := Client{Ctl: env.Ctl(), Release: &rpRelease}

cleanup, err := rpk.ExposeRedpandaCluster(ctx, w, wErr)
if cleanup != nil {
t.Cleanup(cleanup)
}
require.NoError(t, err)

_, err = rpk.ClusterConfig(ctx)
require.NoError(t, err)

t.Run("kafka-listener", func(t *testing.T) {
input := "test-input"
require.NoError(t, rpk.CreateTopic(ctx, "testTopic"))

_, err = rpk.KafkaProduce(ctx, input, "testTopic")
require.NoError(t, err)

consumeOutput, err := rpk.KafkaConsume(ctx, "testTopic")
require.NoError(t, err)
require.Equal(t, input, consumeOutput["value"])
})

t.Run("admin-listener", func(t *testing.T) {
out, err := rpk.GetClusterHealth(ctx)
require.NoError(t, err)
require.Equal(t, true, out["is_healthy"])
})

t.Run("schema-registry-listener", func(t *testing.T) {
// Test schema registry
// Based on https://docs.redpanda.com/current/manage/schema-reg/schema-reg-api/
formats, err := rpk.QuerySupportedFormats(ctx)
require.NoError(t, err)
require.Len(t, formats, 2)

schema := map[string]any{
"type": "record",
"name": "sensor_sample",
"fields": []map[string]any{
{
"name": "timestamp",
"type": "long",
"logicalType": "timestamp-millis",
},
{
"name": "identifier",
"type": "string",
"logicalType": "uuid",
},
{
"name": "value",
"type": "long",
},
},
}

registeredID, err := rpk.RegisterSchema(ctx, schema)
require.NoError(t, err)

var id float64
if idForSchema, ok := registeredID["id"]; ok {
id = idForSchema.(float64)
}

schemaBytes, err := json.Marshal(schema)
require.NoError(t, err)

retrievedSchema, err := rpk.RetrieveSchema(ctx, int(id))
require.NoError(t, err)
require.JSONEq(t, string(schemaBytes), retrievedSchema)

resp, err := rpk.ListRegistrySubjects(ctx)
require.NoError(t, err)
require.Equal(t, "sensor-value", resp[0])

_, err = rpk.SoftDeleteSchema(ctx, resp[0], int(id))
require.NoError(t, err)

_, err = rpk.HardDeleteSchema(ctx, resp[0], int(id))
require.NoError(t, err)
})

t.Run("http-proxy-listener", func(t *testing.T) {
// Test http proxy
// Based on https://docs.redpanda.com/current/develop/http-proxy/
topics, err := rpk.ListTopics(ctx, &dot)
require.NoError(t, err)
require.Len(t, topics, 2)

records := map[string]any{
"records": []map[string]any{
{
"value": "Redpanda",
"partition": 0,
},
{
"value": "HTTP proxy",
"partition": 1,
},
{
"value": "Test event",
"partition": 2,
},
},
}

httpTestTopic := "httpTestTopic"
require.NoError(t, rpk.CreateTopic(ctx, httpTestTopic))

_, err = rpk.SendEventToTopic(ctx, records, httpTestTopic)
require.NoError(t, err)

time.Sleep(time.Second * 5)

record, err := rpk.RetrieveEventFromTopic(ctx, httpTestTopic, 0)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("[{\"topic\":\"%s\",\"key\":null,\"value\":\"Redpanda\",\"partition\":0,\"offset\":0}]", httpTestTopic), record)

record, err = rpk.RetrieveEventFromTopic(ctx, httpTestTopic, 1)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("[{\"topic\":\"%s\",\"key\":null,\"value\":\"HTTP proxy\",\"partition\":1,\"offset\":0}]", httpTestTopic), record)

record, err = rpk.RetrieveEventFromTopic(ctx, httpTestTopic, 2)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("[{\"topic\":\"%s\",\"key\":null,\"value\":\"Test event\",\"partition\":2,\"offset\":0}]", httpTestTopic), record)
})
})

//t.Run("mtls-using-self-created-certificates", func(t *testing.T) {
// ctx := testutil.Context(t)
//
// env := h.Namespaced(t)
//
//})
}

// getConfigMaps is parsing all manifests (resources) created by helm template
Expand Down
Loading

0 comments on commit 16a8c2d

Please sign in to comment.