Skip to content

Commit

Permalink
Add Kafka exporter and receiver configuration (#5703)
Browse files Browse the repository at this point in the history
<!--
!! Please DELETE this comment before posting.
We appreciate your contribution to the Jaeger project! πŸ‘‹πŸŽ‰
-->

## Which problem is this PR solving?
-  Resolves #4868 

## Description of the changes
- Add config files for replacement of jaeger-collector and
jaeger-ingester.
- Add the architecture for the integration tests for kafka. 
- Add e2e integration tests for Kafka. 

## How was this change tested?
- Manually ran the collector, ingester configs to check if the data is
being pushed and pulled the right way and verified the architecture.

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: joeyyy09 <menteharshith@gmail.com>
Signed-off-by: Harshith Mente <109957201+joeyyy09@users.noreply.github.com>
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
Co-authored-by: Yuri Shkuro <github@ysh.us>
  • Loading branch information
3 people committed Jul 24, 2024
1 parent d8b2110 commit e437971
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 23 deletions.
28 changes: 28 additions & 0 deletions cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
service:
pipelines:
traces:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

receivers:
otlp:
protocols:
grpc:
http:
jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

processors:
batch:

exporters:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
37 changes: 37 additions & 0 deletions cmd/jaeger/ingester-remote-storage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [kafka]
processors: [batch]
exporters: [jaeger_storage_exporter]
telemetry:
metrics:
address: 0.0.0.0:8889
logs:
level: debug

extensions:
jaeger_query:
trace_storage: some_storage

jaeger_storage:
backends:
some_storage:
memory:
max_traces: 100000

receivers:
kafka:
brokers:
- localhost:9092
topic: "jaeger-spans"
encoding: otlp_proto
initial_offset: earliest

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: some_storage
50 changes: 50 additions & 0 deletions cmd/jaeger/internal/integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,56 @@ flowchart LR
end
```

## Kafka Integration

The primary difference between the Kafka integration tests and other integration tests lies in the flow of data. In the standard tests, spans are written by the SpanWriter, sent through an RPC_client directly to a receiver, then to an exporter, and written to a storage backend. Spans are read by the SpanReader, which queries the jaeger_query process accessing the storage backend. In contrast, the Kafka tests introduce Kafka as an intermediary. Spans go from the SpanWriter through an RPC_client to an OTLP receiver in the Jaeger Collector, exported to Kafka, received by the Jaeger Ingester, and then stored. For details, see the [Architecture](#KafkaArchitecture) section below.


## Kafka Architecture

``` mermaid
flowchart LR
Test -->|writeSpan| SpanWriter
SpanWriter --> RPCW[RPC_client]
RPCW --> OTLP_Receiver[Receiver]
OTLP_Receiver --> CollectorExporter[Kafka Exporter]
CollectorExporter --> Kafka[Kafka]
Kafka --> IngesterReceiver[Kafka Receiver]
IngesterReceiver --> IngesterExporter[Exporter]
IngesterExporter --> StorageBackend[(In-Memory Store)]
Test -->|readSpan| SpanReader
SpanReader --> RPCR[RPC_client]
RPCR --> QueryProcess[jaeger_query]
StorageCleaner -->|purge| StorageBackend
QueryProcess --> StorageBackend
subgraph Integration_Test_Executable
Test
SpanWriter
SpanReader
RPCW
RPCR
end
subgraph Jaeger Collector
OTLP_Receiver
CollectorExporter
end
subgraph Jaeger Ingester
IngesterReceiver
IngesterExporter
QueryProcess
StorageBackend
StorageCleaner[Storage Cleaner Extension]
end
subgraph Kafka
Topic
end
```

## Running tests locally

All integration tests can be run locally.
Expand Down
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,5 @@ func TestBadgerStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "badger")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunAll(t)
}
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,5 @@ func TestCassandraStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "cassandra")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
29 changes: 23 additions & 6 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const otlpPort = 4317
// - At last, clean up anything declared in its own test functions.
// (e.g. close remote-storage)
type E2EStorageIntegration struct {
SkipStorageCleaner bool
integration.StorageIntegration
ConfigFile string
}
Expand All @@ -46,7 +47,10 @@ type E2EStorageIntegration struct {
// This function should be called before any of the tests start.
func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
configFile := createStorageCleanerConfig(t, s.ConfigFile, storage)
configFile := s.ConfigFile
if !s.SkipStorageCleaner {
configFile = createStorageCleanerConfig(t, s.ConfigFile, storage)
}
t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

outFile, err := os.OpenFile(
Expand Down Expand Up @@ -93,19 +97,27 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
}, 30*time.Second, 500*time.Millisecond, "Jaeger-v2 did not start")
t.Log("Jaeger-v2 is ready")
t.Cleanup(func() {
require.NoError(t, cmd.Process.Kill())
if err := cmd.Process.Kill(); err != nil {
t.Errorf("Failed to kill Jaeger-v2 process: %v", err)
}
if t.Failed() {
// A Github Actions special annotation to create a foldable section
// in the Github runner output.
// https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#grouping-log-lines
fmt.Println("::group::🚧 🚧 🚧 Jaeger-v2 binary logs")
outLogs, err := os.ReadFile(outFile.Name())
require.NoError(t, err)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)
if err != nil {
t.Errorf("Failed to read output logs: %v", err)
} else {
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 output logs:\n%s", outLogs)
}

errLogs, err := os.ReadFile(errFile.Name())
require.NoError(t, err)
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
if err != nil {
t.Errorf("Failed to read error logs: %v", err)
} else {
fmt.Printf("🚧 🚧 🚧 Jaeger-v2 error logs:\n%s", errLogs)
}
// End of Github Actions foldable section annotation.
fmt.Println("::endgroup::")
}
Expand All @@ -115,6 +127,11 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
require.NoError(t, err)
s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
require.NoError(t, err)

t.Cleanup(func() {
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
s.e2eCleanUp(t)
})
}

// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
Expand Down
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,5 @@ func TestElasticsearchStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "elasticsearch")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
1 change: 0 additions & 1 deletion cmd/jaeger/internal/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestGRPCStorage(t *testing.T) {
s.initialize(t)
s.e2eInitialize(t, "grpc")
t.Cleanup(func() {
s.e2eCleanUp(t)
s.remoteStorage.Close(t)
})
s.RunSpanStoreTests(t)
Expand Down
40 changes: 40 additions & 0 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package integration

import (
"testing"

"github.com/jaegertracing/jaeger/plugin/storage/integration"
)

func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
}

// Initialize and start the collector
collector.e2eInitialize(t, "kafka")

ingester := &E2EStorageIntegration{
ConfigFile: ingesterConfig,
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
}

// Initialize and start the ingester
ingester.e2eInitialize(t, "kafka")

// Run the span store tests
ingester.RunSpanStoreTests(t)
}
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@ func TestMemoryStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "memory")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunAll(t)
}
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/integration/opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@ func TestOpenSearchStorage(t *testing.T) {
},
}
s.e2eInitialize(t, "opensearch")
t.Cleanup(func() {
s.e2eCleanUp(t)
})
s.RunSpanStoreTests(t)
}
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var fixtures embed.FS
// - in those functions it instantiates and populates this struct
// - it then calls RunAll.
//
// Some implementations may declate multuple tests, with different settings,
// Some implementations may declare multiple tests, with different settings,
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
Expand Down

0 comments on commit e437971

Please sign in to comment.