From e43797133ad7840543cc41a89f178f37e93e044e Mon Sep 17 00:00:00 2001 From: Harshith Mente <109957201+joeyyy09@users.noreply.github.com> Date: Thu, 25 Jul 2024 02:35:16 +0530 Subject: [PATCH] Add Kafka exporter and receiver configuration (#5703) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Which problem is this PR solving? - Resolves #4868 ## Description of the changes - Add config files for replacement of jaeger-collector and jaeger-ingester. - Add the architecture for the integration tests for kafka. - Add e2e integration tests for Kafka. ## How was this change tested? - Manually ran the collector, ingester configs to check if the data is being pushed and pulled the right way and verified the architecture. ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: joeyyy09 Signed-off-by: Harshith Mente <109957201+joeyyy09@users.noreply.github.com> Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- cmd/jaeger/collector-with-kafka.yaml | 28 +++++++++++ cmd/jaeger/ingester-remote-storage.yaml | 37 ++++++++++++++ cmd/jaeger/internal/integration/README.md | 50 +++++++++++++++++++ .../internal/integration/badger_test.go | 3 -- .../internal/integration/cassandra_test.go | 3 -- .../internal/integration/e2e_integration.go | 29 ++++++++--- .../integration/elasticsearch_test.go | 3 -- cmd/jaeger/internal/integration/grpc_test.go | 1 - cmd/jaeger/internal/integration/kafka_test.go | 40 +++++++++++++++ .../internal/integration/memory_test.go | 3 -- .../internal/integration/opensearch_test.go | 3 -- plugin/storage/integration/integration.go | 2 +- 12 files changed, 179 insertions(+), 23 deletions(-) create mode 100644 cmd/jaeger/collector-with-kafka.yaml create mode 100644 cmd/jaeger/ingester-remote-storage.yaml create mode 100644 cmd/jaeger/internal/integration/kafka_test.go diff --git a/cmd/jaeger/collector-with-kafka.yaml b/cmd/jaeger/collector-with-kafka.yaml new file mode 100644 index 00000000000..fb16234bc8c --- /dev/null +++ b/cmd/jaeger/collector-with-kafka.yaml @@ -0,0 +1,28 @@ +service: + pipelines: + traces: + receivers: [otlp, jaeger] + processors: [batch] + exporters: [kafka] + +receivers: + otlp: + protocols: + grpc: + http: + jaeger: + protocols: + grpc: + thrift_binary: + thrift_compact: + thrift_http: + +processors: + batch: + +exporters: + kafka: + brokers: + - localhost:9092 + topic: "jaeger-spans" + encoding: otlp_proto diff --git a/cmd/jaeger/ingester-remote-storage.yaml b/cmd/jaeger/ingester-remote-storage.yaml new file mode 100644 index 00000000000..2f200c6533a --- /dev/null +++ b/cmd/jaeger/ingester-remote-storage.yaml @@ -0,0 +1,37 @@ +service: + extensions: [jaeger_storage, jaeger_query] + pipelines: + traces: + receivers: [kafka] + processors: [batch] + exporters: [jaeger_storage_exporter] + telemetry: + metrics: + address: 0.0.0.0:8889 + logs: + level: debug + +extensions: + jaeger_query: + trace_storage: some_storage + + jaeger_storage: + backends: + some_storage: + memory: + max_traces: 100000 + +receivers: + kafka: + brokers: + - localhost:9092 + topic: "jaeger-spans" + encoding: otlp_proto + initial_offset: earliest + +processors: + batch: + +exporters: + jaeger_storage_exporter: + trace_storage: some_storage diff --git a/cmd/jaeger/internal/integration/README.md b/cmd/jaeger/internal/integration/README.md index 3c97bfddbd0..9237aac4dcf 100644 --- a/cmd/jaeger/internal/integration/README.md +++ b/cmd/jaeger/internal/integration/README.md @@ -63,6 +63,56 @@ flowchart LR end ``` +## Kafka Integration + +The primary difference between the Kafka integration tests and other integration tests lies in the flow of data. In the standard tests, spans are written by the SpanWriter, sent through an RPC_client directly to a receiver, then to an exporter, and written to a storage backend. Spans are read by the SpanReader, which queries the jaeger_query process accessing the storage backend. In contrast, the Kafka tests introduce Kafka as an intermediary. Spans go from the SpanWriter through an RPC_client to an OTLP receiver in the Jaeger Collector, exported to Kafka, received by the Jaeger Ingester, and then stored. For details, see the [Architecture](#KafkaArchitecture) section below. + + +## Kafka Architecture + +``` mermaid +flowchart LR + Test -->|writeSpan| SpanWriter + SpanWriter --> RPCW[RPC_client] + RPCW --> OTLP_Receiver[Receiver] + OTLP_Receiver --> CollectorExporter[Kafka Exporter] + CollectorExporter --> Kafka[Kafka] + Kafka --> IngesterReceiver[Kafka Receiver] + IngesterReceiver --> IngesterExporter[Exporter] + IngesterExporter --> StorageBackend[(In-Memory Store)] + Test -->|readSpan| SpanReader + SpanReader --> RPCR[RPC_client] + RPCR --> QueryProcess[jaeger_query] + StorageCleaner -->|purge| StorageBackend + QueryProcess --> StorageBackend + + + subgraph Integration_Test_Executable + Test + SpanWriter + SpanReader + RPCW + RPCR + end + + subgraph Jaeger Collector + OTLP_Receiver + CollectorExporter + end + + subgraph Jaeger Ingester + IngesterReceiver + IngesterExporter + QueryProcess + StorageBackend + StorageCleaner[Storage Cleaner Extension] + end + + subgraph Kafka + Topic + end +``` + ## Running tests locally All integration tests can be run locally. diff --git a/cmd/jaeger/internal/integration/badger_test.go b/cmd/jaeger/internal/integration/badger_test.go index a0fe00f669a..b5c62d5c21a 100644 --- a/cmd/jaeger/internal/integration/badger_test.go +++ b/cmd/jaeger/internal/integration/badger_test.go @@ -24,8 +24,5 @@ func TestBadgerStorage(t *testing.T) { }, } s.e2eInitialize(t, "badger") - t.Cleanup(func() { - s.e2eCleanUp(t) - }) s.RunAll(t) } diff --git a/cmd/jaeger/internal/integration/cassandra_test.go b/cmd/jaeger/internal/integration/cassandra_test.go index 0ac74c13149..a78f66b3ec9 100644 --- a/cmd/jaeger/internal/integration/cassandra_test.go +++ b/cmd/jaeger/internal/integration/cassandra_test.go @@ -22,8 +22,5 @@ func TestCassandraStorage(t *testing.T) { }, } s.e2eInitialize(t, "cassandra") - t.Cleanup(func() { - s.e2eCleanUp(t) - }) s.RunSpanStoreTests(t) } diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index 670b7b576d4..4bf341a2a8f 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -37,6 +37,7 @@ const otlpPort = 4317 // - At last, clean up anything declared in its own test functions. // (e.g. close remote-storage) type E2EStorageIntegration struct { + SkipStorageCleaner bool integration.StorageIntegration ConfigFile string } @@ -46,7 +47,10 @@ type E2EStorageIntegration struct { // This function should be called before any of the tests start. func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - configFile := createStorageCleanerConfig(t, s.ConfigFile, storage) + configFile := s.ConfigFile + if !s.SkipStorageCleaner { + configFile = createStorageCleanerConfig(t, s.ConfigFile, storage) + } t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile) outFile, err := os.OpenFile( @@ -93,19 +97,27 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { }, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start") t.Log("Jaeger-v2 is ready") t.Cleanup(func() { - require.NoError(t, cmd.Process.Kill()) + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Failed to kill Jaeger-v2 process: %v", err) + } if t.Failed() { // A Github Actions special annotation to create a foldable section // in the Github runner output. // https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs") outLogs, err := os.ReadFile(outFile.Name()) - require.NoError(t, err) - fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs) + if err != nil { + t.Errorf("Failed to read output logs: %v", err) + } else { + fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs) + } errLogs, err := os.ReadFile(errFile.Name()) - require.NoError(t, err) - fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs) + if err != nil { + t.Errorf("Failed to read error logs: %v", err) + } else { + fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs) + } // End of Github Actions foldable section annotation. fmt.Println("::endgroup::") } @@ -115,6 +127,11 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { require.NoError(t, err) s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC) require.NoError(t, err) + + t.Cleanup(func() { + // Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection. + s.e2eCleanUp(t) + }) } // e2eCleanUp closes the SpanReader and SpanWriter gRPC connection. diff --git a/cmd/jaeger/internal/integration/elasticsearch_test.go b/cmd/jaeger/internal/integration/elasticsearch_test.go index 2c78bfdb7af..844c4866321 100644 --- a/cmd/jaeger/internal/integration/elasticsearch_test.go +++ b/cmd/jaeger/internal/integration/elasticsearch_test.go @@ -21,8 +21,5 @@ func TestElasticsearchStorage(t *testing.T) { }, } s.e2eInitialize(t, "elasticsearch") - t.Cleanup(func() { - s.e2eCleanUp(t) - }) s.RunSpanStoreTests(t) } diff --git a/cmd/jaeger/internal/integration/grpc_test.go b/cmd/jaeger/internal/integration/grpc_test.go index b33b7b002a0..cd12984bc62 100644 --- a/cmd/jaeger/internal/integration/grpc_test.go +++ b/cmd/jaeger/internal/integration/grpc_test.go @@ -36,7 +36,6 @@ func TestGRPCStorage(t *testing.T) { s.initialize(t) s.e2eInitialize(t, "grpc") t.Cleanup(func() { - s.e2eCleanUp(t) s.remoteStorage.Close(t) }) s.RunSpanStoreTests(t) diff --git a/cmd/jaeger/internal/integration/kafka_test.go b/cmd/jaeger/internal/integration/kafka_test.go new file mode 100644 index 00000000000..099bddba3e1 --- /dev/null +++ b/cmd/jaeger/internal/integration/kafka_test.go @@ -0,0 +1,40 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package integration + +import ( + "testing" + + "github.com/jaegertracing/jaeger/plugin/storage/integration" +) + +func TestKafkaStorage(t *testing.T) { + integration.SkipUnlessEnv(t, "kafka") + + collectorConfig := "../../collector-with-kafka.yaml" + ingesterConfig := "../../ingester-remote-storage.yaml" + + collector := &E2EStorageIntegration{ + SkipStorageCleaner: true, + ConfigFile: collectorConfig, + } + + // Initialize and start the collector + collector.e2eInitialize(t, "kafka") + + ingester := &E2EStorageIntegration{ + ConfigFile: ingesterConfig, + StorageIntegration: integration.StorageIntegration{ + CleanUp: purge, + GetDependenciesReturnsSource: true, + SkipArchiveTest: true, + }, + } + + // Initialize and start the ingester + ingester.e2eInitialize(t, "kafka") + + // Run the span store tests + ingester.RunSpanStoreTests(t) +} diff --git a/cmd/jaeger/internal/integration/memory_test.go b/cmd/jaeger/internal/integration/memory_test.go index a2592b04a38..81f0bfa94fb 100644 --- a/cmd/jaeger/internal/integration/memory_test.go +++ b/cmd/jaeger/internal/integration/memory_test.go @@ -20,8 +20,5 @@ func TestMemoryStorage(t *testing.T) { }, } s.e2eInitialize(t, "memory") - t.Cleanup(func() { - s.e2eCleanUp(t) - }) s.RunAll(t) } diff --git a/cmd/jaeger/internal/integration/opensearch_test.go b/cmd/jaeger/internal/integration/opensearch_test.go index 909c0be6510..f164bfcda26 100644 --- a/cmd/jaeger/internal/integration/opensearch_test.go +++ b/cmd/jaeger/internal/integration/opensearch_test.go @@ -20,8 +20,5 @@ func TestOpenSearchStorage(t *testing.T) { }, } s.e2eInitialize(t, "opensearch") - t.Cleanup(func() { - s.e2eCleanUp(t) - }) s.RunSpanStoreTests(t) } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index f9ca020c41d..8ed6d905b83 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -49,7 +49,7 @@ var fixtures embed.FS // - in those functions it instantiates and populates this struct // - it then calls RunAll. // -// Some implementations may declate multuple tests, with different settings, +// Some implementations may declare multiple tests, with different settings, // and RunAll() under different conditions. type StorageIntegration struct { SpanWriter spanstore.Writer