From 5e8e92fa35e87d635432621e4e37d5f4f3bd9272 Mon Sep 17 00:00:00 2001
From: Daniel Jaglowski
Date: Wed, 31 May 2023 15:31:05 -0600
Subject: [PATCH] [chore][scraperinttest] Add support for NetworkRequest (#22790)

* [chore][scraperinttest] Add support for NetworkRequest and migrate flink and kafka
---
 .../coreinternal/scraperinttest/scraperint.go |  43 +++-
 .../flinkmetricsreceiver/integration_test.go  | 237 +++++-------------
 .../integration/Dockerfile.flink-master      |   4 -
 .../integration/Dockerfile.flink-worker      |   3 -
 .../testdata/integration/expected.yaml       |  42 ----
 .../testdata/integration/setup.sh            |   2 +-
 receiver/kafkametricsreceiver/go.mod         |   5 +-
 receiver/kafkametricsreceiver/go.sum         |   2 +-
 .../kafkametricsreceiver/integration_test.go  | 131 +++++-----
 .../testdata/integration/expected.yaml       |  15 ++
 10 files changed, 178 insertions(+), 306 deletions(-)
 delete mode 100644 receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-master
 delete mode 100644 receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-worker
 create mode 100644 receiver/kafkametricsreceiver/testdata/integration/expected.yaml

diff --git a/internal/coreinternal/scraperinttest/scraperint.go b/internal/coreinternal/scraperinttest/scraperint.go
index caf5c9bc5e8ff..6c176bbfe1736 100644
--- a/internal/coreinternal/scraperinttest/scraperint.go
+++ b/internal/coreinternal/scraperinttest/scraperint.go
@@ -45,6 +45,7 @@ func NewIntegrationTest(f receiver.Factory, opts ...TestOption) *IntegrationTest
 }
 
 type IntegrationTest struct {
+	networkRequest         *testcontainers.NetworkRequest
 	containerRequests      []testcontainers.ContainerRequest
 	createContainerTimeout time.Duration
 
@@ -61,6 +62,13 @@ type IntegrationTest struct {
 func (it *IntegrationTest) Run(t *testing.T) {
 	it.validate(t)
 
+	if it.networkRequest != nil {
+		network := it.createNetwork(t)
+		defer func() {
+			require.NoError(t, network.Remove(context.Background()))
+		}()
+	}
+
 	ci := it.createContainers(t)
 	defer ci.terminate(t)
 
@@ -113,6 +121,26 @@ func (it *IntegrationTest) Run(t *testing.T) {
 		it.compareTimeout, it.compareTimeout/20)
 }
 
+func (it *IntegrationTest) createNetwork(t *testing.T) testcontainers.Network {
+	var errs error
+
+	var network testcontainers.Network
+	var err error
+	require.Eventuallyf(t, func() bool {
+		network, err = testcontainers.GenericNetwork(
+			context.Background(),
+			testcontainers.GenericNetworkRequest{
+				NetworkRequest: *it.networkRequest,
+			})
+		if err != nil {
+			errs = multierr.Append(errs, err)
+			return false
+		}
+		return true
+	}, it.createContainerTimeout, time.Second, "create network timeout: %v", errs)
+	return network
+}
+
 func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo {
 	var wg sync.WaitGroup
 	ci := &ContainerInfo{
@@ -122,12 +150,7 @@ func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo {
 	for _, cr := range it.containerRequests {
 		go func(req testcontainers.ContainerRequest) {
 			var errs error
-			defer func() {
-				if t.Failed() && errs != nil {
-					t.Errorf("create container: %v", errs)
-				}
-			}()
-			require.Eventually(t, func() bool {
+			require.Eventuallyf(t, func() bool {
 				c, err := testcontainers.GenericContainer(
 					context.Background(),
 					testcontainers.GenericContainerRequest{
@@ -140,7 +163,7 @@ func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo {
 				}
 				ci.add(req.Name, c)
 				return true
-			}, it.createContainerTimeout, time.Second)
+			}, it.createContainerTimeout, time.Second, "create container timeout: %v", errs)
 			wg.Done()
 		}(cr)
 	}
@@ -165,6 +188,12 @@ func (it *IntegrationTest) validate(t *testing.T) {
 
 type TestOption func(*IntegrationTest)
 
+func WithNetworkRequest(nr testcontainers.NetworkRequest) TestOption {
+	return func(it *IntegrationTest) {
+		it.networkRequest = &nr
+	}
+}
+
 func WithContainerRequest(cr testcontainers.ContainerRequest) TestOption {
 	return func(it *IntegrationTest) {
 		it.containerRequests = append(it.containerRequests, cr)
diff --git a/receiver/flinkmetricsreceiver/integration_test.go b/receiver/flinkmetricsreceiver/integration_test.go
index 8d683b8b87b82..871b54f98bc95 100644
--- a/receiver/flinkmetricsreceiver/integration_test.go
+++ b/receiver/flinkmetricsreceiver/integration_test.go
@@ -7,191 +7,84 @@
 package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
 
 import (
-	"context"
 	"fmt"
-	"io"
 	"net"
-	"net/http"
-	"os"
-	"path"
 	"path/filepath"
-	"strings"
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
-	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/consumer/consumertest"
-	"go.opentelemetry.io/collector/receiver/receivertest"
+	"go.opentelemetry.io/collector/component"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
 )
 
-func TestIntegration(t *testing.T) {
-	t.Parallel()
-	networkName := "new-network"
-	ctx := context.Background()
-	newNetwork, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
-		NetworkRequest: testcontainers.NetworkRequest{
-			Name:           networkName,
-			CheckDuplicate: true,
-		},
-	})
-	if err != nil {
-		require.NoError(t, err)
-	}
-	defer func() {
-		require.NoError(t, newNetwork.Remove(ctx))
-	}()
+const (
+	networkName = "flink-network"
+	flinkPort   = "8081"
+)
 
-	masterContainer := getContainer(t, testcontainers.ContainerRequest{
-		FromDockerfile: testcontainers.FromDockerfile{
-			Context:    path.Join("testdata", "integration"),
-			Dockerfile: "Dockerfile.flink-master",
-		},
-		Hostname:     "flink-master",
-		Networks:     []string{networkName},
-		ExposedPorts: []string{"8080:8080", "8081:8081"},
-		WaitingFor:   waitStrategy{endpoint: "http://localhost:8081/jobmanager/metrics"},
-		LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
-			PostStarts: []testcontainers.ContainerHook{
-				scraperinttest.RunScript([]string{"/setup.sh"}),
+func TestIntegration(t *testing.T) {
+	scraperinttest.NewIntegrationTest(
+		NewFactory(),
+		scraperinttest.WithNetworkRequest(
+			testcontainers.NetworkRequest{
+				Name:           networkName,
+				CheckDuplicate: true,
 			},
-		}},
-	})
-
-	workerContainer := getContainer(t, testcontainers.ContainerRequest{
-		FromDockerfile: testcontainers.FromDockerfile{
-			Context:    path.Join("testdata", "integration"),
-			Dockerfile: "Dockerfile.flink-worker",
-		},
-		Hostname: "worker",
-		Networks: []string{networkName},
-	})
-	defer func() {
-		require.NoError(t, masterContainer.Terminate(context.Background()))
-	}()
-	defer func() {
-		require.NoError(t, workerContainer.Terminate(context.Background()))
-	}()
-
-	hostname, err := masterContainer.Host(context.Background())
-	require.NoError(t, err)
-
-	// Required to start the taskmanager
-	ws := waitStrategy{"http://localhost:8081/taskmanagers/metrics"}
-	err = ws.waitFor(context.Background(), "")
-	require.NoError(t, err)
-
-	// Required to prevent empty value jobs call
-	ws = waitStrategy{endpoint: "http://localhost:8081/jobs"}
-	err = ws.waitFor(context.Background(), "")
-	require.NoError(t, err)
-
-	// Required to prevent empty value for job, operator and task metrics call
-	ws = waitStrategy{endpoint: "http://localhost:8081/jobs/metrics"}
-	err = ws.waitFor(context.Background(), "")
-	require.NoError(t, err)
-
-	// Override function to return deterministic field
-	defer func() { osHostname = os.Hostname }()
-	osHostname = func() (string, error) { return "job-localhost", nil }
-
-	// Override function to return deterministic field
-	defer func() { taskmanagerHost = strings.Split }()
-	taskmanagerHost = func(id string, sep string) []string { return []string{"taskmanager-localhost"} }
-
-	// Override function to return deterministic field
-	defer func() { taskmanagerID = reflect }()
-	taskmanagerID = func(id string) string { return "taskmanagerID" }
-
-	f := NewFactory()
-	cfg := f.CreateDefaultConfig().(*Config)
-	cfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond
-	cfg.Endpoint = fmt.Sprintf("http://%s", net.JoinHostPort(hostname, "8081"))
-
-	consumer := new(consumertest.MetricsSink)
-	settings := receivertest.NewNopCreateSettings()
-	rcvr, err := f.CreateMetricsReceiver(context.Background(), settings, cfg, consumer)
-	require.NoError(t, err, "failed creating metrics receiver")
-	require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
-	require.Eventuallyf(t, func() bool {
-		return consumer.DataPointCount() > 0
-	}, 2*time.Minute, 1*time.Second, "failed to receive more than 0 metrics")
-	require.NoError(t, rcvr.Shutdown(context.Background()))
-
-	actualMetrics := consumer.AllMetrics()[0]
-	expectedFile := filepath.Join("testdata", "integration", "expected.yaml")
-	expectedMetrics, err := golden.ReadMetrics(expectedFile)
-	require.NoError(t, err)
-	require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreMetricValues(),
-		pmetrictest.IgnoreStartTimestamp(), pmetrictest.IgnoreTimestamp()))
-}
-
-func getContainer(t *testing.T, req testcontainers.ContainerRequest) testcontainers.Container {
-	require.NoError(t, req.Validate())
-	container, err := testcontainers.GenericContainer(
-		context.Background(),
-		testcontainers.GenericContainerRequest{
-			ContainerRequest: req,
-			Started:          true,
-		})
-	require.NoError(t, err)
-	return container
-}
-
-type waitStrategy struct {
-	endpoint string
-}
-
-func (ws waitStrategy) WaitUntilReady(ctx context.Context, st wait.StrategyTarget) error {
-	if err := wait.ForListeningPort("8081").
-		WithStartupTimeout(2*time.Minute).
-		WaitUntilReady(ctx, st); err != nil {
-		return err
-	}
-
-	hostname, err := st.Host(ctx)
-	if err != nil {
-		return err
-	}
-
-	return ws.waitFor(ctx, hostname)
-}
-
-// waitFor waits until an endpoint is ready with an id response.
-func (ws waitStrategy) waitFor(ctx context.Context, _ string) error {
-	ticker := time.NewTicker(100 * time.Millisecond)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-time.After(5 * time.Second):
-			return fmt.Errorf("server startup problem")
-		case <-ticker.C:
-			resp, err := http.Get(ws.endpoint)
-			if err != nil {
-				continue
-			}
-
-			body, err := io.ReadAll(resp.Body)
-			if err != nil {
-				continue
-			}
-
-			if resp.Body.Close() != nil {
-				continue
-			}
-
-			// The server needs a moment to generate some stats
-			if strings.Contains(string(body), "id") {
-				return nil
-			}
-		}
-	}
+		),
+		scraperinttest.WithContainerRequest(testcontainers.ContainerRequest{
+			Image:        "flink:1.17.0",
+			Name:         "jobmanager",
+			Networks:     []string{networkName},
+			ExposedPorts: []string{flinkPort},
+			Cmd:          []string{"jobmanager"},
+			Env: map[string]string{
+				"FLINK_PROPERTIES": "jobmanager.rpc.address: jobmanager",
+			},
+			Files: []testcontainers.ContainerFile{{
+				HostFilePath:      filepath.Join("testdata", "integration", "setup.sh"),
+				ContainerFilePath: "/setup.sh",
+				FileMode:          700,
+			}},
+			WaitingFor: wait.ForAll(
+				wait.ForListeningPort(flinkPort),
+				wait.ForHTTP("jobmanager/metrics").WithPort(flinkPort),
+				wait.ForHTTP("taskmanagers/metrics").WithPort(flinkPort),
+			),
+			LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
+				PostStarts: []testcontainers.ContainerHook{
+					scraperinttest.RunScript([]string{"/setup.sh"}),
+				},
+			}},
+		}),
+		scraperinttest.WithContainerRequest(testcontainers.ContainerRequest{
+			Image:    "flink:1.17.0",
+			Name:     "taskmanager",
+			Networks: []string{networkName},
+			Cmd:      []string{"taskmanager"},
+			Env: map[string]string{
+				"FLINK_PROPERTIES": "jobmanager.rpc.address: jobmanager",
+			},
+		}),
+		scraperinttest.WithCustomConfig(func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) {
+			rCfg := cfg.(*Config)
+			rCfg.CollectionInterval = 100 * time.Millisecond
+			rCfg.Endpoint = fmt.Sprintf("http://%s",
+				net.JoinHostPort(
+					ci.HostForNamedContainer(t, "jobmanager"),
+					ci.MappedPortForNamedContainer(t, "jobmanager", flinkPort),
+				))
+		}),
+		scraperinttest.WithCompareOptions(
+			pmetrictest.IgnoreResourceAttributeValue("host.name"),
+			pmetrictest.IgnoreResourceAttributeValue("flink.task.name"),
+			pmetrictest.IgnoreResourceAttributeValue("flink.taskmanager.id"),
+			pmetrictest.IgnoreMetricValues(),
+			pmetrictest.IgnoreStartTimestamp(),
+			pmetrictest.IgnoreTimestamp(),
+		),
+	).Run(t)
 }
diff --git a/receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-master b/receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-master
deleted file mode 100644
index f037c42232578..0000000000000
--- a/receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-master
+++ /dev/null
@@ -1,4 +0,0 @@
-FROM bde2020/flink-master:1.14.5-hadoop3.2
-
-COPY /setup.sh /setup.sh
-RUN chmod +x setup.sh
diff --git a/receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-worker b/receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-worker
deleted file mode 100644
index 8cd30e613f833..0000000000000
--- a/receiver/flinkmetricsreceiver/testdata/integration/Dockerfile.flink-worker
+++ /dev/null
@@ -1,3 +0,0 @@
-FROM bde2020/flink-worker:1.14.5-hadoop3.2
-
-ENV FLINK_MASTER_PORT_6123_TCP_ADDR=flink-master
diff --git a/receiver/flinkmetricsreceiver/testdata/integration/expected.yaml b/receiver/flinkmetricsreceiver/testdata/integration/expected.yaml
index d78e2f77f2b25..195a796570793 100644
--- a/receiver/flinkmetricsreceiver/testdata/integration/expected.yaml
+++ b/receiver/flinkmetricsreceiver/testdata/integration/expected.yaml
@@ -37,48 +37,6 @@ resourceMetrics:
                   timeUnixNano: "1656013053528397000"
               isMonotonic: true
             unit: ns
-          - description: The total number of collections that have occurred.
-            name: flink.jvm.gc.collections.count
-            sum:
-              aggregationTemporality: 2
-              dataPoints:
-                - asInt: "2"
-                  attributes:
-                    - key: name
-                      value:
-                        stringValue: PS_MarkSweep
-                  startTimeUnixNano: "1656013053428227000"
-                  timeUnixNano: "1656013053528397000"
-                - asInt: "2"
-                  attributes:
-                    - key: name
-                      value:
-                        stringValue: PS_Scavenge
-                  startTimeUnixNano: "1656013053428227000"
-                  timeUnixNano: "1656013053528397000"
-              isMonotonic: true
-            unit: '{collections}'
-          - description: The total time spent performing garbage collection.
-            name: flink.jvm.gc.collections.time
-            sum:
-              aggregationTemporality: 2
-              dataPoints:
-                - asInt: "117"
-                  attributes:
-                    - key: name
-                      value:
-                        stringValue: PS_MarkSweep
-                  startTimeUnixNano: "1656013053428227000"
-                  timeUnixNano: "1656013053528397000"
-                - asInt: "30"
-                  attributes:
-                    - key: name
-                      value:
-                        stringValue: PS_Scavenge
-                  startTimeUnixNano: "1656013053428227000"
-                  timeUnixNano: "1656013053528397000"
-              isMonotonic: true
-            unit: ms
           - description: The total capacity of all buffers in the direct buffer pool.
             name: flink.jvm.memory.direct.total_capacity
             sum:
diff --git a/receiver/flinkmetricsreceiver/testdata/integration/setup.sh b/receiver/flinkmetricsreceiver/testdata/integration/setup.sh
index 63221702461af..dca512b20b6a6 100755
--- a/receiver/flinkmetricsreceiver/testdata/integration/setup.sh
+++ b/receiver/flinkmetricsreceiver/testdata/integration/setup.sh
@@ -5,6 +5,6 @@
 
 set -e
 
-./usr/local/flink/bin/flink run --detached /usr/local/flink/examples/streaming/StateMachineExample.jar
+/opt/flink/bin/flink run --detached /opt/flink/examples/streaming/StateMachineExample.jar
 
 exit 0
diff --git a/receiver/kafkametricsreceiver/go.mod b/receiver/kafkametricsreceiver/go.mod
index 6f4b38f189dbe..6b79ccf42edbe 100644
--- a/receiver/kafkametricsreceiver/go.mod
+++ b/receiver/kafkametricsreceiver/go.mod
@@ -6,6 +6,8 @@ require (
 	github.com/Shopify/sarama v1.38.1
 	github.com/google/go-cmp v0.5.9
 	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.78.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.78.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.78.0
 	github.com/stretchr/testify v1.8.3
 	github.com/testcontainers/testcontainers-go v0.20.1
 	go.opentelemetry.io/collector v0.78.3-0.20230525165144-87dd85a6c034
@@ -24,6 +26,7 @@
 	github.com/apache/thrift v0.18.1 // indirect
 	github.com/aws/aws-sdk-go v1.44.271 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/containerd/containerd v1.6.19 // indirect
 	github.com/cpuguy83/dockercfg v0.3.1 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
@@ -63,7 +66,7 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/morikuni/aec v1.0.0 // indirect
-	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.78.0 // indirect
+	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.78.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.78.0 // indirect
 	github.com/opencontainers/go-digest v1.0.0 // indirect
 	github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
diff --git a/receiver/kafkametricsreceiver/go.sum b/receiver/kafkametricsreceiver/go.sum
index a0bbd9a83d2cf..d5b395d3f8cfe 100644
--- a/receiver/kafkametricsreceiver/go.sum
+++ b/receiver/kafkametricsreceiver/go.sum
@@ -45,9 +45,9 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
 github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
 github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
-github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/checkpoint-restore/go-criu/v5 v5.3.0/go.mod h1:E/eQpaFtUKGOOSEBZgmKAcn+zUUwWxqcaKZlF54wK8E=
 github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
diff --git a/receiver/kafkametricsreceiver/integration_test.go b/receiver/kafkametricsreceiver/integration_test.go
index a22f03f59aa93..d946e72a135e1 100644
--- a/receiver/kafkametricsreceiver/integration_test.go
+++ b/receiver/kafkametricsreceiver/integration_test.go
@@ -7,92 +7,73 @@
 package kafkametricsreceiver
 
 import (
-	"context"
 	"fmt"
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/require"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
 	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/consumer/consumertest"
-	"go.opentelemetry.io/collector/receiver"
-	"go.opentelemetry.io/collector/receiver/receivertest"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
 )
 
 const (
-	zkPort       = 2181
-	kafkaPort    = 9092
-	kafkaZkImage = "johnnypark/kafka-zookeeper"
-	// only one metric, number of brokers, will be reported.
-	expectedMetrics = 1
+	networkName   = "kafka-network"
+	kafkaPort     = "9092"
+	zookeeperPort = "2181"
+	zookeeperHost = "zookeeper"
 )
 
-type testHost struct {
-	component.Host
-	t *testing.T
-}
-
-func (h *testHost) ReportFatalError(err error) {
-	h.t.Fatalf("receiver reported a fatal error: %v", err)
-}
-
-var _ component.Host = (*testHost)(nil)
-
 func TestIntegration(t *testing.T) {
-	t.Skip("Skip failing test, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17065")
-	ctx := context.Background()
-
-	req := testcontainers.ContainerRequest{
-		Image: kafkaZkImage,
-		ExposedPorts: []string{
-			fmt.Sprintf("%d/tcp", kafkaPort),
-			fmt.Sprintf("%d/tcp", zkPort),
-		},
-		WaitingFor: wait.ForAll(
-			wait.ForListeningPort("2181/tcp").WithStartupTimeout(time.Minute*2),
-			wait.ForListeningPort("9092/tcp").WithStartupTimeout(time.Minute*2),
-		).WithDeadline(time.Minute * 2),
-	}
-	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
-		ContainerRequest: req,
-		Started:          true,
-	})
-	require.Nil(t, err)
-
-	mappedKafkaPort, err := container.MappedPort(ctx, "9092")
-	require.Nil(t, err)
-
-	hostIP, err := container.Host(ctx)
-	require.Nil(t, err)
-
-	kafkaAddress := fmt.Sprintf("%s:%s", hostIP, mappedKafkaPort.Port())
-
-	f := NewFactory()
-	cfg := f.CreateDefaultConfig().(*Config)
-	cfg.Scrapers = []string{
-		"brokers",
-		"consumers",
-		"topics",
-	}
-	cfg.Brokers = []string{kafkaAddress}
-	cfg.CollectionInterval = 5 * time.Second
-	consumer := new(consumertest.MetricsSink)
-
-	var receiver receiver.Metrics
-
-	receiver, err = f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumer)
-	require.NoError(t, err, "failed to create receiver")
-	require.Eventuallyf(t, func() bool {
-		err = receiver.Start(context.Background(), &testHost{t: t})
-		return err == nil
-	}, 30*time.Second, 5*time.Second, fmt.Sprintf("failed to start metrics receiver. %v", err))
-	t.Logf("waiting for metrics...")
-	require.Eventuallyf(t,
-		func() bool {
-			return consumer.DataPointCount() >= expectedMetrics
-		}, 30*time.Second, 5*time.Second,
-		"expected metrics not received",
-	)
+	scraperinttest.NewIntegrationTest(
+		NewFactory(),
+		scraperinttest.WithNetworkRequest(
+			testcontainers.NetworkRequest{
+				Name:           networkName,
+				CheckDuplicate: true,
+			},
+		),
+		scraperinttest.WithContainerRequest(
+			testcontainers.ContainerRequest{
+				Name:         "zookeeper",
+				Image:        "ubuntu/zookeeper:3.1-22.04_beta",
+				Networks:     []string{networkName},
+				Hostname:     zookeeperHost,
+				ExposedPorts: []string{zookeeperPort},
+				WaitingFor: wait.ForAll(
+					wait.ForListeningPort(zookeeperPort).WithStartupTimeout(2 * time.Minute),
+				),
+			}),
+		scraperinttest.WithContainerRequest(
+			testcontainers.ContainerRequest{
+				Name:         "kafka",
+				Image:        "ubuntu/kafka:3.1-22.04_beta",
+				Networks:     []string{networkName},
+				ExposedPorts: []string{kafkaPort},
+				Env: map[string]string{
+					"ZOOKEEPER_HOST": zookeeperHost,
+					"ZOOKEEPER_PORT": zookeeperPort,
+				},
+				WaitingFor: wait.ForAll(
+					wait.ForListeningPort(kafkaPort).WithStartupTimeout(2 * time.Minute),
+				),
+			}),
+		scraperinttest.WithCustomConfig(
+			func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) {
+				rCfg := cfg.(*Config)
+				rCfg.CollectionInterval = 5 * time.Second
+				rCfg.Brokers = []string{fmt.Sprintf("%s:%s",
+					ci.HostForNamedContainer(t, "kafka"),
+					ci.MappedPortForNamedContainer(t, "kafka", kafkaPort))}
+				rCfg.Scrapers = []string{"brokers", "consumers", "topics"}
+			}),
+		// scraperinttest.WriteExpected(), // TODO remove
+		scraperinttest.WithCompareOptions(
+			// pmetrictest.IgnoreMetricValues(),
+			pmetrictest.IgnoreStartTimestamp(),
+			pmetrictest.IgnoreTimestamp(),
+		),
+	).Run(t)
 }
diff --git a/receiver/kafkametricsreceiver/testdata/integration/expected.yaml b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
new file mode 100644
index 0000000000000..baf81546fd9e1
--- /dev/null
+++ b/receiver/kafkametricsreceiver/testdata/integration/expected.yaml
@@ -0,0 +1,15 @@
+resourceMetrics:
+  - resource: {}
+    scopeMetrics:
+      - metrics:
+          - description: Number of brokers in the cluster.
+            gauge:
+              dataPoints:
+                - asInt: "1"
+                  startTimeUnixNano: "1685063120199110000"
+                  timeUnixNano: "1685063125236251000"
+            name: kafka.brokers
+            unit: '{brokers}'
+        scope:
+          name: otelcol/kafkametricsreceiver
+          version: latest