Skip to content

Commit

Permalink
add support for wasm transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
gene-redpanda committed Jan 26, 2024
1 parent ef13101 commit e7f1642
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 2 deletions.
4 changes: 3 additions & 1 deletion modules/k3s/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/testcontainers/testcontainers-go/modules/k3s

go 1.20
go 1.21

toolchain go1.21.5

require (
github.com/docker/docker v25.0.1+incompatible
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func ExampleRunContainer() {
redpandaContainer, err := redpanda.RunContainer(ctx,
redpanda.WithEnableSASL(),
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableWasmTransform(),
redpanda.WithNewServiceAccount("superuser-1", "test"),
redpanda.WithNewServiceAccount("superuser-2", "test"),
redpanda.WithNewServiceAccount("no-superuser", "test"),
Expand Down
1 change: 1 addition & 0 deletions modules/redpanda/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/redpanda
go 1.20

require (
github.com/Masterminds/semver/v3 v3.2.1
github.com/docker/go-connections v0.5.0
github.com/stretchr/testify v1.8.4
github.com/testcontainers/testcontainers-go v0.27.0
Expand Down
2 changes: 2 additions & 0 deletions modules/redpanda/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
Expand Down
4 changes: 4 additions & 0 deletions modules/redpanda/mounts/bootstrap.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ superusers:
kafka_enable_authorization: true
{{- end }}

{{- if .EnableWasmTransform }}
data_transforms_enabled: true
{{- end }}

{{- if .AutoCreateTopics }}
auto_create_topics_enabled: true
{{- end }}
11 changes: 11 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type options struct {
// or "http_basic" for HTTP basic authentication.
SchemaRegistryAuthenticationMethod string

// EnableWasmTransform is a flag to enable wasm transform.
EnableWasmTransform bool

// ServiceAccounts is a map of username (key) to password (value) of users
// that shall be created, so that you can use these to authenticate against
// Redpanda (either for the Kafka API or Schema Registry HTTP access).
Expand Down Expand Up @@ -97,6 +100,14 @@ func WithEnableKafkaAuthorization() Option {
}
}

// WithEnableWasmTransform enables wasm transform.
// Should not be used with RP versions before 23.3
func WithEnableWasmTransform() Option {
return func(o *options) {
o.EnableWasmTransform = true
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
Expand Down
19 changes: 18 additions & 1 deletion modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"context"
_ "embed"
"fmt"
semver "github.com/Masterminds/semver/v3"
"math"
"os"
"path/filepath"
"strings"
"text/template"
"time"

Expand Down Expand Up @@ -59,7 +61,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
// Some (e.g. Image) may be overridden by providing an option argument to this function.
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "docker.redpanda.com/redpandadata/redpanda:v23.1.7",
Image: "docker.redpanda.com/redpandadata/redpanda:v23.3.3",
User: "root:root",
// Files: Will be added later after we've rendered our YAML templates.
ExposedPorts: []string{
Expand Down Expand Up @@ -89,6 +91,19 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
opt.Customize(&req)
}

ver, err := semver.NewVersion(strings.Split(req.ContainerRequest.Image, ":")[1])
if err != nil {
return nil, err
}

minVerForTransform, err := semver.NewVersion("v23.3")
if err != nil {
return nil, err
}
if ver.LessThan(minVerForTransform) {
settings.EnableWasmTransform = false
}

// 3. Create temporary entrypoint file. We need a custom entrypoint that waits
// until the actual Redpanda node config is mounted. Once the redpanda config is
// mounted we will call the original entrypoint with the same parameters.
Expand Down Expand Up @@ -245,6 +260,7 @@ func renderBootstrapConfig(settings options) ([]byte, error) {
Superusers: settings.Superusers,
KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization,
AutoCreateTopics: settings.AutoCreateTopics,
EnableWasmTransform: settings.EnableWasmTransform,
}

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
Expand Down Expand Up @@ -318,6 +334,7 @@ type redpandaBootstrapConfigTplParams struct {
Superusers []string
KafkaAPIEnableAuthorization bool
AutoCreateTopics bool
EnableWasmTransform bool
}

type redpandaConfigTplParams struct {
Expand Down
113 changes: 113 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,119 @@ func TestRedpandaWithAuthentication(t *testing.T) {
container, err := RunContainer(ctx,
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithEnableWasmTransform(),
WithNewServiceAccount("superuser-1", "test"),
WithNewServiceAccount("superuser-2", "test"),
WithNewServiceAccount("no-superuser", "test"),
WithSuperusers("superuser-1", "superuser-2"),
WithEnableSchemaRegistryHTTPBasicAuth(),
)
require.NoError(t, err)
// }

// Clean up the container after the test is complete
t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

// kafkaSeedBroker {
seedBroker, err := container.KafkaSeedBroker(ctx)
// }
require.NoError(t, err)

// Test successful authentication & authorization with all created superusers
serviceAccounts := map[string]string{
"superuser-1": "test",
"superuser-2": "test",
}

for user, password := range serviceAccounts {
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: user,
Pass: password,
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, fmt.Sprintf("test-%v", user))
require.NoError(t, err)
kafkaCl.Close()
}

// Test successful authentication, but failed authorization with a non-superuser account
{
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: "no-superuser",
Pass: "test",
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, "test-2")
require.Error(t, err)
require.ErrorContains(t, err, "TOPIC_AUTHORIZATION_FAILED")
kafkaCl.Close()
}

// Test failed authentication
{
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: "wrong",
Pass: "wrong",
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.Metadata(ctx)
require.Error(t, err)
require.ErrorContains(t, err, "SASL_AUTHENTICATION_FAILED")
}

// Test Schema Registry API
httpCl := &http.Client{Timeout: 5 * time.Second}
// schemaRegistryAddress {
schemaRegistryURL, err := container.SchemaRegistryAddress(ctx)
// }
require.NoError(t, err)

// Failed authentication
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
resp.Body.Close()

// Successful authentication
for user, password := range serviceAccounts {
req.SetBasicAuth(user, password)
resp, err = httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
resp.Body.Close()
}
}

func TestRedpandaWithOldVersionAndWasm(t *testing.T) {
ctx := context.Background()
// redpandaCreateContainer {
// this would fail to start if we weren't ignoring wasm transforms for older versions
container, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithEnableWasmTransform(),
WithNewServiceAccount("superuser-1", "test"),
WithNewServiceAccount("superuser-2", "test"),
WithNewServiceAccount("no-superuser", "test"),
Expand Down

0 comments on commit e7f1642

Please sign in to comment.