From eb63c40e06bbed21946b447d282eae8a88ca3b2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Mon, 6 Mar 2023 12:13:38 +0100 Subject: [PATCH] feat: convert pulsar example into a Go module (#872) * chore: move pulsar example to modules * chore: export Pulsar container type * chore: export Start container func * chore: support changing the pulsar image * chore: support defining custom image for pulsar * chore: support overriding the default waiting strategy * chore: support overriding the default env * chore: support overriding the default command * docs: document StartContainer function * chore: leverate modifiers * chore: create PulsarContainerRequest abstraction * chore: make a more advanced setup for the test * chore: support for setting function workers * chore: support for setting transactions * chore: support passing log consumers * chore: simplify initialisation of env * fix: use right module path * chore: migrate tests to its own package * chore: remove Pulsar from exported fields * chore: support retrieving broker and http admin URLs from the container * chore: support passing pulsar envs to the container request * docs: copy Java docs * docs: document modifiers * docs: fix heading * chore: improve tests to check that subscriptions are there * chore: simplify asserts * fix: update paths in GH workflow * fix: update dependabot for pulsar module * fix: typos --- .github/dependabot.yml | 12 +- .../{pulsar-example.yml => module-pulsar.yml} | 8 +- docs/examples/pulsar.md | 9 - docs/modules/pulsar.md | 96 +++++++++ examples/pulsar/pulsar.go | 64 ------ examples/pulsar/pulsar_test.go | 72 ------- mkdocs.yml | 2 +- {examples => modules}/pulsar/Makefile | 0 {examples => modules}/pulsar/go.mod | 11 +- {examples => modules}/pulsar/go.sum | 1 + modules/pulsar/pulsar.go | 203 ++++++++++++++++++ modules/pulsar/pulsar_test.go | 203 ++++++++++++++++++ {examples => modules}/pulsar/tools/tools.go | 0 13 files changed, 521 insertions(+), 160 deletions(-) rename .github/workflows/{pulsar-example.yml => module-pulsar.yml} (84%) delete mode 100644 docs/examples/pulsar.md create mode 100644 docs/modules/pulsar.md delete mode 100644 examples/pulsar/pulsar.go delete mode 100644 examples/pulsar/pulsar_test.go rename {examples => modules}/pulsar/Makefile (100%) rename {examples => modules}/pulsar/go.mod (91%) rename {examples => modules}/pulsar/go.sum (99%) create mode 100644 modules/pulsar/pulsar.go create mode 100644 modules/pulsar/pulsar_test.go rename {examples => modules}/pulsar/tools/tools.go (100%) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 1ec44b7968..6c03258837 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -66,12 +66,6 @@ updates: interval: monthly open-pull-requests-limit: 3 rebase-strategy: disabled - - package-ecosystem: gomod - directory: /examples/pulsar - schedule: - interval: monthly - open-pull-requests-limit: 3 - rebase-strategy: disabled - package-ecosystem: gomod directory: /examples/redis schedule: @@ -108,3 +102,9 @@ updates: interval: monthly open-pull-requests-limit: 3 rebase-strategy: disabled + - package-ecosystem: gomod + directory: /modules/pulsar + schedule: + interval: monthly + open-pull-requests-limit: 3 + rebase-strategy: disabled diff --git a/.github/workflows/pulsar-example.yml b/.github/workflows/module-pulsar.yml similarity index 84% rename from .github/workflows/pulsar-example.yml rename to .github/workflows/module-pulsar.yml index df7ee7c9ab..ef4b544eb6 100644 --- a/.github/workflows/pulsar-example.yml +++ b/.github/workflows/module-pulsar.yml @@ -1,4 +1,4 @@ -name: Pulsar example pipeline +name: Pulsar module pipeline on: [push, pull_request] @@ -24,15 +24,15 @@ jobs: uses: actions/checkout@v3 - name: modVerify - working-directory: ./examples/pulsar + working-directory: ./modules/pulsar run: go mod verify - name: modTidy - working-directory: ./examples/pulsar + working-directory: ./modules/pulsar run: make tools-tidy - name: gotestsum - working-directory: ./examples/pulsar + working-directory: ./modules/pulsar run: make test-unit - name: Run checker diff --git a/docs/examples/pulsar.md b/docs/examples/pulsar.md deleted file mode 100644 index b10430f60f..0000000000 --- a/docs/examples/pulsar.md +++ /dev/null @@ -1,9 +0,0 @@ -# Pulsar - - -[Creating an Apache Pulsar container](../../examples/pulsar/pulsar.go) - - - -[Test for an Apache Pulsar container](../../examples/pulsar/pulsar_test.go) - diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md new file mode 100644 index 0000000000..a28d7693d7 --- /dev/null +++ b/docs/modules/pulsar.md @@ -0,0 +1,96 @@ +# Apache Pulsar + +Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without external services. + +It's based on the official Apache Pulsar docker image, so it is recommended to read the [official guide](https://pulsar.apache.org/docs/next/getting-started-docker/). + +## Adding this module to your project dependencies + +Please run the following command to add the Apache Pulsar module to your Go dependencies: + +``` +go get github.com/testcontainers/testcontainers-go/modules/pulsar +``` + +## Usage example + +Create a `Pulsar` container to use it in your tests: + + +[Creating a Pulsar container](../../modules/pulsar/pulsar_test.go) inside_block:startPulsarContainer + + +where the `tt.opts` are the options to configure the container. See the [Container Options](#container-options) section for more details. + +Then you can retrieve the broker and the admin url: + + +[Get broker and admin urls](../../modules/pulsar/pulsar_test.go) inside_block:getPulsarURLs + + +## Container Options + +When starting the Pulsar container, you can pass options in a variadic way to configure it. + +### Pulsar Image +If you need to set a different Pulsar image you can use the `WithPulsarImage`. + + +[Set Pulsar image](../../modules/pulsar/pulsar_test.go) inside_block:setPulsarImage + + +### Pulsar Configuration +If you need to set Pulsar configuration variables you can use the `WithPulsarEnv` to set Pulsar environment variables: the `PULSAR_PREFIX_` prefix will be automatically added for you. + +For example, if you want to enable `brokerDeduplicationEnabled`: + + +[Set configuration variables](../../modules/pulsar/pulsar_test.go) inside_block:addPulsarEnv + + +It will result in the `PULSAR_PREFIX_brokerDeduplicationEnabled=true` environment variable being set in the container request. + +### Pulsar IO + +If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker with the `WithFunctionsWorker` option: + + +[Create a Pulsar container with functions worker](../../modules/pulsar/pulsar_test.go) inside_block:withFunctionsWorker + + +### Pulsar Transactions + +If you need to test Pulsar Transactions you can enable the transactions feature: + + +[Create a Pulsar container with transactions](../../modules/pulsar/pulsar_test.go) inside_block:withTransactions + + +### Log consumers +If you need to collect the logs from the Pulsar container, you can add your own LogConsumer with the `WithLogConsumers` function, which accepts a variadic argument of LogConsumers. + + +[Adding LogConsumers](../../modules/pulsar/pulsar_test.go) inside_block:withLogConsumers + + +An example of a LogConsumer could be the following: + + +[Example LogConsumer](../../modules/pulsar/pulsar_test.go) inside_block:logConsumerForTesting + + +!!!warning + You will need to explicitly stop the producer in your tests. + +If you want to know more about LogConsumers, please check the [Following Container Logs](../features/follow_logs.md) documentation. + +## Advanced configuration + +In the case you need a more advanced configuration regarding the config, host config and endpoint settings Docker types, you can leverage the modifier functions that are available in +the ContainerRequest. The Pulsar container exposes a way to interact with those modifiers in a simple manner, using the aforementioned options in the `StartContainer` function: + + +[Advanced Docker settings](../../modules/pulsar/pulsar_test.go) inside_block:advancedDockerSettings + + +Please check out the [Advanced Settings](../features/creating_container.md#advanced-settings) for creating containers documentation. diff --git a/examples/pulsar/pulsar.go b/examples/pulsar/pulsar.go deleted file mode 100644 index 00af752733..0000000000 --- a/examples/pulsar/pulsar.go +++ /dev/null @@ -1,64 +0,0 @@ -package pulsar - -import ( - "context" - "fmt" - "io" - - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/wait" -) - -type pulsarContainer struct { - testcontainers.Container - URI string -} - -func startContainer(ctx context.Context) (*pulsarContainer, error) { - matchAdminResponse := func(r io.Reader) bool { - respBytes, _ := io.ReadAll(r) - resp := string(respBytes) - return resp == `["standalone"]` - } - pulsarRequest := testcontainers.ContainerRequest{ - Image: "docker.io/apachepulsar/pulsar:2.10.2", - ExposedPorts: []string{"6650/tcp", "8080/tcp"}, - WaitingFor: wait.ForAll( - wait.ForHTTP("/admin/v2/clusters").WithPort("8080/tcp").WithResponseMatcher(matchAdminResponse), - wait.ForLog("Successfully updated the policies on namespace public/default"), - ), - Cmd: []string{ - "/bin/bash", - "-c", - "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss", - }, - } - c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: pulsarRequest, - Started: true, - }) - if err != nil { - return nil, err - } - - c.StartLogProducer(ctx) - defer c.StopLogProducer() - lc := logConsumer{} - c.FollowOutput(&lc) - - pulsarPort, err := c.MappedPort(ctx, "6650/tcp") - if err != nil { - return nil, err - } - - return &pulsarContainer{ - Container: c, - URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), - }, nil -} - -type logConsumer struct{} - -func (lc *logConsumer) Accept(l testcontainers.Log) { - fmt.Print(string(l.Content)) -} diff --git a/examples/pulsar/pulsar_test.go b/examples/pulsar/pulsar_test.go deleted file mode 100644 index 024a4f14e5..0000000000 --- a/examples/pulsar/pulsar_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package pulsar - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/apache/pulsar-client-go/pulsar" -) - -func TestPulsar(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - c, err := startContainer(ctx) - if err != nil { - t.Fatal(err) - } - - pc, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: c.URI, - OperationTimeout: 30 * time.Second, - ConnectionTimeout: 30 * time.Second, - }) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { pc.Close() }) - - consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ - Topic: "test-topic", - SubscriptionName: "pulsar-test", - Type: pulsar.Exclusive, - }) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { consumer.Close() }) - - msgChan := make(chan []byte) - go func() { - msg, err := consumer.Receive(ctx) - if err != nil { - fmt.Println("failed to receive message", err) - return - } - msgChan <- msg.Payload() - consumer.Ack(msg) - }() - - producer, err := pc.CreateProducer(pulsar.ProducerOptions{ - Topic: "test-topic", - }) - if err != nil { - t.Fatal(err) - } - - producer.Send(ctx, &pulsar.ProducerMessage{ - Payload: []byte("hello world"), - }) - - ticker := time.NewTicker(1 * time.Minute) - select { - case <-ticker.C: - t.Fatal("did not receive message in time") - case msg := <-msgChan: - if string(msg) != "hello world" { - t.Fatal("received unexpected message bytes") - } - } -} diff --git a/mkdocs.yml b/mkdocs.yml index fdc1f309bc..15158c1591 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -50,6 +50,7 @@ nav: - Modules: - modules/index.md - modules/localstack.md + - modules/pulsar.md - Examples: - examples/index.md - examples/bigtable.md @@ -62,7 +63,6 @@ nav: - examples/nginx.md - examples/postgres.md - examples/pubsub.md - - examples/pulsar.md - examples/redis.md - examples/spanner.md - examples/toxiproxy.md diff --git a/examples/pulsar/Makefile b/modules/pulsar/Makefile similarity index 100% rename from examples/pulsar/Makefile rename to modules/pulsar/Makefile diff --git a/examples/pulsar/go.mod b/modules/pulsar/go.mod similarity index 91% rename from examples/pulsar/go.mod rename to modules/pulsar/go.mod index f77a5d7ce7..b6495a59af 100644 --- a/examples/pulsar/go.mod +++ b/modules/pulsar/go.mod @@ -1,9 +1,12 @@ -module github.com/testcontainers/testcontainers-go/examples/pulsar +module github.com/testcontainers/testcontainers-go/modules/pulsar go 1.18 require ( github.com/apache/pulsar-client-go v0.9.0 + github.com/docker/docker v23.0.1+incompatible + github.com/docker/go-connections v0.4.0 + github.com/stretchr/testify v1.8.2 github.com/testcontainers/testcontainers-go v0.18.0 gotest.tools/gotestsum v1.9.0 ) @@ -23,10 +26,9 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/containerd/containerd v1.6.19 // indirect github.com/danieljoos/wincred v1.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dnephin/pflag v1.0.7 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect - github.com/docker/docker v23.0.1+incompatible // indirect - github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect github.com/fatih/color v1.13.0 // indirect @@ -56,13 +58,13 @@ require ( github.com/opencontainers/runc v1.1.3 // indirect github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.7.0 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.7.0 // indirect @@ -75,5 +77,6 @@ require ( google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect google.golang.org/grpc v1.47.0 // indirect google.golang.org/protobuf v1.28.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.4.0 // indirect ) diff --git a/examples/pulsar/go.sum b/modules/pulsar/go.sum similarity index 99% rename from examples/pulsar/go.sum rename to modules/pulsar/go.sum index 5b115f5c51..1762c38ca8 100644 --- a/examples/pulsar/go.sum +++ b/modules/pulsar/go.sum @@ -439,6 +439,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go new file mode 100644 index 0000000000..4260ac9305 --- /dev/null +++ b/modules/pulsar/pulsar.go @@ -0,0 +1,203 @@ +package pulsar + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/docker/go-connections/nat" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +const defaultPulsarImage = "docker.io/apachepulsar/pulsar:2.10.2" +const defaultPulsarPort = "6650/tcp" +const defaultPulsarAdminPort = "8080/tcp" +const defaultPulsarCmd = "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone" +const detaultPulsarCmdWithoutFunctionsWorker = "--no-functions-worker -nss" +const transactionTopicEndpoint = "/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions" + +var defaultWaitStrategies = wait.ForAll( + wait.ForHTTP("/admin/v2/clusters").WithPort(defaultPulsarAdminPort).WithResponseMatcher(func(r io.Reader) bool { + respBytes, _ := io.ReadAll(r) + resp := string(respBytes) + return resp == `["standalone"]` + }), + wait.ForLog("Successfully updated the policies on namespace public/default"), +) + +type Container struct { + testcontainers.Container + LogConsumers []testcontainers.LogConsumer // Needs to be exported to control the stop from the caller +} + +func (c *Container) BrokerURL(ctx context.Context) (string, error) { + return c.resolveURL(ctx, defaultPulsarPort) +} + +func (c *Container) HTTPServiceURL(ctx context.Context) (string, error) { + return c.resolveURL(ctx, defaultPulsarAdminPort) +} + +func (c *Container) resolveURL(ctx context.Context, port nat.Port) (string, error) { + provider, err := testcontainers.NewDockerProvider() + if err != nil { + return "", err + } + + host, err := provider.DaemonHost(ctx) + if err != nil { + return "", err + } + + pulsarPort, err := c.MappedPort(ctx, port) + if err != nil { + return "", err + } + + proto := "pulsar" + if port == defaultPulsarAdminPort { + proto = "http" + } + + return fmt.Sprintf("%s://%s:%v", proto, host, pulsarPort.Int()), nil +} + +type ContainerRequest struct { + testcontainers.ContainerRequest + logConsumers []testcontainers.LogConsumer +} + +// ContainerOptions is a function that can be used to configure the Pulsar container +type ContainerOptions func(req *ContainerRequest) + +// WithConfigModifier allows to override the default container config +func WithConfigModifier(modifier func(config *container.Config)) ContainerOptions { + return func(req *ContainerRequest) { + req.ConfigModifier = modifier + } +} + +// WithEndpointSettingsModifier allows to override the default endpoint settings +func WithEndpointSettingsModifier(modifier func(settings map[string]*network.EndpointSettings)) ContainerOptions { + return func(req *ContainerRequest) { + req.EnpointSettingsModifier = modifier + } +} + +// WithFunctionsWorker enables the functions worker, which will override the default pulsar command +// and add a waiting strategy for the functions worker +func WithFunctionsWorker() ContainerOptions { + return func(req *ContainerRequest) { + req.Cmd = []string{"/bin/bash", "-c", defaultPulsarCmd} + + // add the waiting strategy for the functions worker + defaultWaitStrategies.Strategies = append( + defaultWaitStrategies.Strategies, + wait.ForLog("Function worker service started"), + ) + + req.WaitingFor = defaultWaitStrategies + } +} + +// WithHostConfigModifier allows to override the default host config +func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) ContainerOptions { + return func(req *ContainerRequest) { + req.HostConfigModifier = modifier + } +} + +// WithLogConsumer allows to add log consumers to the container. They will be automatically started and stopped by the StartContainer function +// but it's a responsibility of the caller to stop them calling StopLogProducer +func WithLogConsumers(consumer ...testcontainers.LogConsumer) ContainerOptions { + return func(req *ContainerRequest) { + req.logConsumers = append(req.logConsumers, consumer...) + } +} + +// WithPulsarEnv allows to use the native APIs and set each variable with PULSAR_PREFIX_ as prefix. +func WithPulsarEnv(configVar string, configValue string) ContainerOptions { + return func(req *ContainerRequest) { + req.ContainerRequest.Env["PULSAR_PREFIX_"+configVar] = configValue + } +} + +// WithPulsarImage allows to override the default Pulsar image +func WithPulsarImage(image string) ContainerOptions { + return func(req *ContainerRequest) { + if image == "" { + image = defaultPulsarImage + } + + req.Image = image + } +} + +func WithTransactions() ContainerOptions { + return func(req *ContainerRequest) { + WithPulsarEnv("transactionCoordinatorEnabled", "true")(req) + + // add the waiting strategy for the transaction topic + defaultWaitStrategies.Strategies = append( + defaultWaitStrategies.Strategies, + wait.ForHTTP(transactionTopicEndpoint).WithPort(defaultPulsarAdminPort).WithStatusCodeMatcher(func(statusCode int) bool { + return statusCode == 200 + }), + ) + + req.WaitingFor = defaultWaitStrategies + } +} + +// StartContainer creates an instance of the Pulsar container type, being possible to pass a custom request and options +// The created container will use the following defaults: +// - image: docker.io/apachepulsar/pulsar:2.10.2 +// - exposed ports: 6650/tcp, 8080/tcp +// - waiting strategy: wait for all the following strategies: +// - the Pulsar admin API ("/admin/v2/clusters") to be ready on port 8080/tcp and return the response `["standalone"]` +// - the log message "Successfully updated the policies on namespace public/default" +// - command: "/bin/bash -c /pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss" +func StartContainer(ctx context.Context, opts ...ContainerOptions) (*Container, error) { + req := testcontainers.ContainerRequest{ + Image: defaultPulsarImage, + Env: map[string]string{}, + ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, + WaitingFor: defaultWaitStrategies, + Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, detaultPulsarCmdWithoutFunctionsWorker}, " ")}, + } + + pulsarRequest := ContainerRequest{ + ContainerRequest: req, + logConsumers: []testcontainers.LogConsumer{}, + } + + for _, opt := range opts { + opt(&pulsarRequest) + } + + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: pulsarRequest.ContainerRequest, + Started: true, + }) + if err != nil { + return nil, err + } + + pc := &Container{ + Container: c, + LogConsumers: pulsarRequest.logConsumers, + } + + if len(pc.LogConsumers) > 0 { + c.StartLogProducer(ctx) + } + for _, lc := range pc.LogConsumers { + c.FollowOutput(lc) + } + + return pc, nil +} diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go new file mode 100644 index 0000000000..833e457fb1 --- /dev/null +++ b/modules/pulsar/pulsar_test.go @@ -0,0 +1,203 @@ +package pulsar_test + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + testcontainerspulsar "github.com/testcontainers/testcontainers-go/modules/pulsar" +) + +// logConsumerForTesting { +// logConsumer is a testcontainers.LogConsumer that prints the log to stdout +type testLogConsumer struct{} + +// Accept prints the log to stdout +func (lc *testLogConsumer) Accept(l testcontainers.Log) { + fmt.Print(string(l.Content)) +} + +// } + +func TestPulsar(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nwName := "pulsar-test" + _, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ + NetworkRequest: testcontainers.NetworkRequest{ + Name: nwName, + }, + }) + require.NoError(t, err) + + tests := []struct { + name string + opts []testcontainerspulsar.ContainerOptions + }{ + { + name: "default", + }, + { + name: "with modifiers", + opts: []testcontainerspulsar.ContainerOptions{ + // setPulsarImage { + testcontainerspulsar.WithPulsarImage("docker.io/apachepulsar/pulsar:2.10.2"), + // } + // addPulsarEnv { + testcontainerspulsar.WithPulsarEnv("brokerDeduplicationEnabled", "true"), + // } + // advancedDockerSettings { + testcontainerspulsar.WithConfigModifier(func(config *container.Config) { + config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") + }), + testcontainerspulsar.WithHostConfigModifier(func(hostConfig *container.HostConfig) { + hostConfig.Resources = container.Resources{ + Memory: 1024 * 1024 * 1024, + } + }), + testcontainerspulsar.WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) { + settings[nwName] = &network.EndpointSettings{ + Aliases: []string{"pulsar"}, + } + }), + // } + }, + }, + { + name: "with functions worker", + opts: []testcontainerspulsar.ContainerOptions{ + // withFunctionsWorker { + testcontainerspulsar.WithFunctionsWorker(), + // } + }, + }, + { + name: "with transactions", + opts: []testcontainerspulsar.ContainerOptions{ + // withTransactions { + testcontainerspulsar.WithTransactions(), + // } + }, + }, + { + name: "with log consumers", + opts: []testcontainerspulsar.ContainerOptions{ + // withLogConsumers { + testcontainerspulsar.WithLogConsumers(&testLogConsumer{}), + // } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // startPulsarContainer { + c, err := testcontainerspulsar.StartContainer( + ctx, + tt.opts..., + ) + // } + require.Nil(t, err) + + if len(c.LogConsumers) > 0 { + defer c.StopLogProducer() + } + + // getPulsarURLs { + brokerURL, err := c.BrokerURL(ctx) + require.Nil(t, err) + + serviceURL, err := c.HTTPServiceURL(ctx) + require.Nil(t, err) + // } + + assert.True(t, strings.HasPrefix(brokerURL, "pulsar://")) + assert.True(t, strings.HasPrefix(serviceURL, "http://")) + + pc, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: brokerURL, + OperationTimeout: 30 * time.Second, + ConnectionTimeout: 30 * time.Second, + }) + require.Nil(t, err) + t.Cleanup(func() { pc.Close() }) + + subscriptionName := "pulsar-test" + + consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ + Topic: "test-topic", + SubscriptionName: subscriptionName, + Type: pulsar.Exclusive, + }) + require.Nil(t, err) + t.Cleanup(func() { consumer.Close() }) + + msgChan := make(chan []byte) + go func() { + msg, err := consumer.Receive(ctx) + if err != nil { + fmt.Println("failed to receive message", err) + return + } + msgChan <- msg.Payload() + consumer.Ack(msg) + }() + + producer, err := pc.CreateProducer(pulsar.ProducerOptions{ + Topic: "test-topic", + }) + require.Nil(t, err) + + producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte("hello world"), + }) + + ticker := time.NewTicker(1 * time.Minute) + select { + case <-ticker.C: + t.Fatal("did not receive message in time") + case msg := <-msgChan: + if string(msg) != "hello world" { + t.Fatal("received unexpected message bytes") + } + } + + // get topic statistics using the Admin endpoint + httpClient := http.Client{ + Timeout: 30 * time.Second, + } + + resp, err := httpClient.Get(serviceURL + "/admin/v2/persistent/public/default/test-topic/stats") + require.Nil(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.Nil(t, err) + + var stats map[string]interface{} + err = json.Unmarshal(body, &stats) + require.Nil(t, err) + + subscriptions := stats["subscriptions"] + require.NotNil(t, subscriptions) + + subscriptionsMap := subscriptions.(map[string]interface{}) + + // check that the subscription exists + _, ok := subscriptionsMap[subscriptionName] + assert.True(t, ok) + }) + } +} diff --git a/examples/pulsar/tools/tools.go b/modules/pulsar/tools/tools.go similarity index 100% rename from examples/pulsar/tools/tools.go rename to modules/pulsar/tools/tools.go