Skip to content

Commit

Permalink
[chore][scraperinttest] Add support for NetworkRequest (open-telemetry#22790)
Browse files Browse the repository at this point in the history

* [chore][scraperinttest] Add support for NetworkRequest and migrate flink and kafka
  • Loading branch information
djaglowski authored and Caleb-Hurshman committed Jul 6, 2023
1 parent 61e0cb6 commit 5e8e92f
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 306 deletions.
43 changes: 36 additions & 7 deletions internal/coreinternal/scraperinttest/scraperint.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewIntegrationTest(f receiver.Factory, opts ...TestOption) *IntegrationTest
}

type IntegrationTest struct {
networkRequest *testcontainers.NetworkRequest
containerRequests []testcontainers.ContainerRequest
createContainerTimeout time.Duration

Expand All @@ -61,6 +62,13 @@ type IntegrationTest struct {
func (it *IntegrationTest) Run(t *testing.T) {
it.validate(t)

if it.networkRequest != nil {
network := it.createNetwork(t)
defer func() {
require.NoError(t, network.Remove(context.Background()))
}()
}

ci := it.createContainers(t)
defer ci.terminate(t)

Expand Down Expand Up @@ -113,6 +121,26 @@ func (it *IntegrationTest) Run(t *testing.T) {
it.compareTimeout, it.compareTimeout/20)
}

// createNetwork creates the network described by it.networkRequest and
// returns it. Creation is retried at one-second intervals until an attempt
// succeeds or it.createContainerTimeout elapses, at which point the test is
// failed.
//
// it.networkRequest is dereferenced unconditionally, so callers must only
// invoke this when a network request has been configured.
func (it *IntegrationTest) createNetwork(t *testing.T) testcontainers.Network {
	var (
		network testcontainers.Network
		errs    error
	)

	// Report the accumulated errors from a deferred function. Passing errs as
	// a format argument to the assertion does not work: arguments are
	// evaluated when the assertion is called, before any attempt has run, so
	// the message would always render a nil error. A failed require stops the
	// test via FailNow, which still runs deferred calls.
	defer func() {
		if t.Failed() && errs != nil {
			t.Errorf("create network: %v", errs)
		}
	}()

	require.Eventually(t, func() bool {
		var err error
		network, err = testcontainers.GenericNetwork(
			context.Background(),
			testcontainers.GenericNetworkRequest{
				NetworkRequest: *it.networkRequest,
			})
		if err != nil {
			// Keep every attempt's error so the final report shows the history.
			errs = multierr.Append(errs, err)
			return false
		}
		return true
	}, it.createContainerTimeout, time.Second, "create network timeout")
	return network
}

func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo {
var wg sync.WaitGroup
ci := &ContainerInfo{
Expand All @@ -122,12 +150,7 @@ func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo {
for _, cr := range it.containerRequests {
go func(req testcontainers.ContainerRequest) {
var errs error
defer func() {
if t.Failed() && errs != nil {
t.Errorf("create container: %v", errs)
}
}()
require.Eventually(t, func() bool {
require.Eventuallyf(t, func() bool {
c, err := testcontainers.GenericContainer(
context.Background(),
testcontainers.GenericContainerRequest{
Expand All @@ -140,7 +163,7 @@ func (it *IntegrationTest) createContainers(t *testing.T) *ContainerInfo {
}
ci.add(req.Name, c)
return true
}, it.createContainerTimeout, time.Second)
}, it.createContainerTimeout, time.Second, "create container timeout: %v", errs)
wg.Done()
}(cr)
}
Expand All @@ -165,6 +188,12 @@ func (it *IntegrationTest) validate(t *testing.T) {

type TestOption func(*IntegrationTest)

// WithNetworkRequest returns a TestOption that records nr on the
// IntegrationTest. The option stores a pointer to its own copy of the
// request, so a non-nil networkRequest signals that a network was requested.
func WithNetworkRequest(nr testcontainers.NetworkRequest) TestOption {
	request := &nr
	return func(it *IntegrationTest) {
		it.networkRequest = request
	}
}

func WithContainerRequest(cr testcontainers.ContainerRequest) TestOption {
return func(it *IntegrationTest) {
it.containerRequests = append(it.containerRequests, cr)
Expand Down
237 changes: 65 additions & 172 deletions receiver/flinkmetricsreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,191 +7,84 @@
package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"

import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func TestIntegration(t *testing.T) {
t.Parallel()
networkName := "new-network"
ctx := context.Background()
newNetwork, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
CheckDuplicate: true,
},
})
if err != nil {
require.NoError(t, err)
}
defer func() {
require.NoError(t, newNetwork.Remove(ctx))
}()
const (
networkName = "flink-network"
flinkPort = "8081"
)

masterContainer := getContainer(t, testcontainers.ContainerRequest{
FromDockerfile: testcontainers.FromDockerfile{
Context: path.Join("testdata", "integration"),
Dockerfile: "Dockerfile.flink-master",
},
Hostname: "flink-master",
Networks: []string{networkName},
ExposedPorts: []string{"8080:8080", "8081:8081"},
WaitingFor: waitStrategy{endpoint: "http://localhost:8081/jobmanager/metrics"},
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
PostStarts: []testcontainers.ContainerHook{
scraperinttest.RunScript([]string{"/setup.sh"}),
func TestIntegration(t *testing.T) {
scraperinttest.NewIntegrationTest(
NewFactory(),
scraperinttest.WithNetworkRequest(
testcontainers.NetworkRequest{
Name: networkName,
CheckDuplicate: true,
},
}},
})

workerContainer := getContainer(t, testcontainers.ContainerRequest{
FromDockerfile: testcontainers.FromDockerfile{
Context: path.Join("testdata", "integration"),
Dockerfile: "Dockerfile.flink-worker",
},
Hostname: "worker",
Networks: []string{networkName},
})
defer func() {
require.NoError(t, masterContainer.Terminate(context.Background()))
}()
defer func() {
require.NoError(t, workerContainer.Terminate(context.Background()))
}()

hostname, err := masterContainer.Host(context.Background())
require.NoError(t, err)

// Required to start the taskmanager
ws := waitStrategy{"http://localhost:8081/taskmanagers/metrics"}
err = ws.waitFor(context.Background(), "")
require.NoError(t, err)

// Required to prevent empty value jobs call
ws = waitStrategy{endpoint: "http://localhost:8081/jobs"}
err = ws.waitFor(context.Background(), "")
require.NoError(t, err)

// Required to prevent empty value for job, operator and task metrics call
ws = waitStrategy{endpoint: "http://localhost:8081/jobs/metrics"}
err = ws.waitFor(context.Background(), "")
require.NoError(t, err)

// Override function to return deterministic field
defer func() { osHostname = os.Hostname }()
osHostname = func() (string, error) { return "job-localhost", nil }

// Override function to return deterministic field
defer func() { taskmanagerHost = strings.Split }()
taskmanagerHost = func(id string, sep string) []string { return []string{"taskmanager-localhost"} }

// Override function to return deterministic field
defer func() { taskmanagerID = reflect }()
taskmanagerID = func(id string) string { return "taskmanagerID" }

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.ScraperControllerSettings.CollectionInterval = 100 * time.Millisecond
cfg.Endpoint = fmt.Sprintf("http://%s", net.JoinHostPort(hostname, "8081"))

consumer := new(consumertest.MetricsSink)
settings := receivertest.NewNopCreateSettings()
rcvr, err := f.CreateMetricsReceiver(context.Background(), settings, cfg, consumer)
require.NoError(t, err, "failed creating metrics receiver")
require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.Eventuallyf(t, func() bool {
return consumer.DataPointCount() > 0
}, 2*time.Minute, 1*time.Second, "failed to receive more than 0 metrics")
require.NoError(t, rcvr.Shutdown(context.Background()))

actualMetrics := consumer.AllMetrics()[0]
expectedFile := filepath.Join("testdata", "integration", "expected.yaml")
expectedMetrics, err := golden.ReadMetrics(expectedFile)
require.NoError(t, err)
require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actualMetrics, pmetrictest.IgnoreMetricValues(),
pmetrictest.IgnoreStartTimestamp(), pmetrictest.IgnoreTimestamp()))
}

// getContainer validates req, then creates and starts a container from it.
// Any validation or creation error fails the test immediately.
func getContainer(t *testing.T, req testcontainers.ContainerRequest) testcontainers.Container {
	require.NoError(t, req.Validate())

	// Started: true asks testcontainers to start the container as part of
	// creation rather than leaving it in the created state.
	startedReq := testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	}
	c, err := testcontainers.GenericContainer(context.Background(), startedReq)
	require.NoError(t, err)
	return c
}

// waitStrategy is a readiness check bound to a single HTTP endpoint. It is
// used both as a container WaitingFor strategy (via WaitUntilReady) and
// directly through waitFor.
type waitStrategy struct {
// endpoint is the full URL that waitFor requests with http.Get.
endpoint string
}

// WaitUntilReady blocks until the target is listening on port 8081 (allowing
// up to two minutes for startup) and the strategy's endpoint reports ready
// via waitFor. The first error encountered is returned unchanged.
func (ws waitStrategy) WaitUntilReady(ctx context.Context, st wait.StrategyTarget) error {
	portReady := wait.ForListeningPort("8081").WithStartupTimeout(2 * time.Minute)
	if err := portReady.WaitUntilReady(ctx, st); err != nil {
		return err
	}

	// The host lookup must succeed even though waitFor currently ignores the
	// hostname argument it is given.
	host, err := st.Host(ctx)
	if err != nil {
		return err
	}
	return ws.waitFor(ctx, host)
}

// waitFor waits until an endpoint is ready with an id response.
func (ws waitStrategy) waitFor(ctx context.Context, _ string) error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
return fmt.Errorf("server startup problem")
case <-ticker.C:
resp, err := http.Get(ws.endpoint)
if err != nil {
continue
}

body, err := io.ReadAll(resp.Body)
if err != nil {
continue
}

if resp.Body.Close() != nil {
continue
}

// The server needs a moment to generate some stats
if strings.Contains(string(body), "id") {
return nil
}
}
}
),
scraperinttest.WithContainerRequest(testcontainers.ContainerRequest{
Image: "flink:1.17.0",
Name: "jobmanager",
Networks: []string{networkName},
ExposedPorts: []string{flinkPort},
Cmd: []string{"jobmanager"},
Env: map[string]string{
"FLINK_PROPERTIES": "jobmanager.rpc.address: jobmanager",
},
Files: []testcontainers.ContainerFile{{
HostFilePath: filepath.Join("testdata", "integration", "setup.sh"),
ContainerFilePath: "/setup.sh",
FileMode: 700,
}},
WaitingFor: wait.ForAll(
wait.ForListeningPort(flinkPort),
wait.ForHTTP("jobmanager/metrics").WithPort(flinkPort),
wait.ForHTTP("taskmanagers/metrics").WithPort(flinkPort),
),
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{{
PostStarts: []testcontainers.ContainerHook{
scraperinttest.RunScript([]string{"/setup.sh"}),
},
}},
}),
scraperinttest.WithContainerRequest(testcontainers.ContainerRequest{
Image: "flink:1.17.0",
Name: "taskmanager",
Networks: []string{networkName},
Cmd: []string{"taskmanager"},
Env: map[string]string{
"FLINK_PROPERTIES": "jobmanager.rpc.address: jobmanager",
},
}),
scraperinttest.WithCustomConfig(func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) {
rCfg := cfg.(*Config)
rCfg.CollectionInterval = 100 * time.Millisecond
rCfg.Endpoint = fmt.Sprintf("http://%s",
net.JoinHostPort(
ci.HostForNamedContainer(t, "jobmanager"),
ci.MappedPortForNamedContainer(t, "jobmanager", flinkPort),
))
}),
scraperinttest.WithCompareOptions(
pmetrictest.IgnoreResourceAttributeValue("host.name"),
pmetrictest.IgnoreResourceAttributeValue("flink.task.name"),
pmetrictest.IgnoreResourceAttributeValue("flink.taskmanager.id"),
pmetrictest.IgnoreMetricValues(),
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
),
).Run(t)
}

This file was deleted.

This file was deleted.

42 changes: 0 additions & 42 deletions receiver/flinkmetricsreceiver/testdata/integration/expected.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,48 +37,6 @@ resourceMetrics:
timeUnixNano: "1656013053528397000"
isMonotonic: true
unit: ns
- description: The total number of collections that have occurred.
name: flink.jvm.gc.collections.count
sum:
aggregationTemporality: 2
dataPoints:
- asInt: "2"
attributes:
- key: name
value:
stringValue: PS_MarkSweep
startTimeUnixNano: "1656013053428227000"
timeUnixNano: "1656013053528397000"
- asInt: "2"
attributes:
- key: name
value:
stringValue: PS_Scavenge
startTimeUnixNano: "1656013053428227000"
timeUnixNano: "1656013053528397000"
isMonotonic: true
unit: '{collections}'
- description: The total time spent performing garbage collection.
name: flink.jvm.gc.collections.time
sum:
aggregationTemporality: 2
dataPoints:
- asInt: "117"
attributes:
- key: name
value:
stringValue: PS_MarkSweep
startTimeUnixNano: "1656013053428227000"
timeUnixNano: "1656013053528397000"
- asInt: "30"
attributes:
- key: name
value:
stringValue: PS_Scavenge
startTimeUnixNano: "1656013053428227000"
timeUnixNano: "1656013053528397000"
isMonotonic: true
unit: ms
- description: The total capacity of all buffers in the direct buffer pool.
name: flink.jvm.memory.direct.total_capacity
sum:
Expand Down
Loading

0 comments on commit 5e8e92f

Please sign in to comment.