Skip to content

Commit

Permalink
feat: adding support for Kafka on Agent (#3226)
Browse files Browse the repository at this point in the history
* feat: adding support for Kafka on Agent

* fix: added nil guard on Kafka response proto
  • Loading branch information
danielbdias committed Oct 10, 2023
1 parent a8cfe95 commit ab7381a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 1 deletion.
2 changes: 1 addition & 1 deletion agent/proto/orchestrator.proto
Expand Up @@ -178,7 +178,7 @@ message TraceIdResponse {

message DataStoreConnectionTestRequest {
string requestID = 1;
DataStore datastore = 5;
DataStore datastore = 2;
}

message DataStoreConnectionTestResponse {
Expand Down
50 changes: 50 additions & 0 deletions agent/workers/trigger.go
Expand Up @@ -36,6 +36,7 @@ func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWork
registry.Add(agentTrigger.HTTP())
registry.Add(agentTrigger.GRPC())
registry.Add(agentTrigger.TRACEID())
registry.Add(agentTrigger.KAFKA())

worker := &TriggerWorker{
client: client,
Expand Down Expand Up @@ -94,6 +95,7 @@ func convertProtoToTrigger(pt *proto.Trigger) trigger.Trigger {
HTTP: convertProtoHttpTriggerToHttpTrigger(pt.Http),
GRPC: convertProtoGrpcTriggerToGrpcTrigger(pt.Grpc),
TraceID: convertProtoTraceIDTriggerToTraceIDTrigger(pt.TraceID),
Kafka: convertProtoKafkaTriggerToKafkaTrigger(pt.Kafka),
}
}

Expand Down Expand Up @@ -189,6 +191,42 @@ func convertProtoTraceIDTriggerToTraceIDTrigger(traceIDRequest *proto.TraceIDReq
}
}

// convertProtoKafkaTriggerToKafkaTrigger maps the wire-format Kafka trigger
// request into the internal trigger representation. It returns nil when the
// proto request is absent so callers can tell "no Kafka trigger configured".
func convertProtoKafkaTriggerToKafkaTrigger(kafkaRequest *proto.KafkaRequest) *trigger.KafkaRequest {
	if kafkaRequest == nil {
		return nil
	}

	headers := make([]trigger.KafkaMessageHeader, len(kafkaRequest.Headers))

	// Range over the source proto headers. The original code ranged over the
	// freshly allocated destination slice, so every header came out as the
	// zero value and the real keys/values were dropped.
	for i, h := range kafkaRequest.Headers {
		headers[i] = trigger.KafkaMessageHeader{Key: h.Key, Value: h.Value}
	}

	return &trigger.KafkaRequest{
		BrokerURLs:      kafkaRequest.BrokerUrls,
		Topic:           kafkaRequest.Topic,
		Headers:         headers,
		Authentication:  convertProtoKafkaAuthToKafkaAuth(kafkaRequest.Authentication),
		MessageKey:      kafkaRequest.MessageKey,
		MessageValue:    kafkaRequest.MessageValue,
		SSLVerification: kafkaRequest.SslVerification,
	}
}

// convertProtoKafkaAuthToKafkaAuth maps the wire-format Kafka authentication
// settings into the internal authenticator. Returns nil when no auth block
// was sent.
func convertProtoKafkaAuthToKafkaAuth(kafkaAuthentication *proto.KafkaAuthentication) *trigger.KafkaAuthenticator {
	if kafkaAuthentication == nil {
		return nil
	}

	auth := &trigger.KafkaAuthenticator{
		Type: kafkaAuthentication.Type,
	}

	// Guard against a nil Plain block: the auth type can be present without
	// plain credentials, and dereferencing Plain unconditionally would panic.
	if kafkaAuthentication.Plain != nil {
		auth.Plain = &trigger.KafkaPlainAuthenticator{
			Username: kafkaAuthentication.Plain.Username,
			Password: kafkaAuthentication.Plain.Password,
		}
	}

	return auth
}

func convertResponseToProtoResponse(request *proto.TriggerRequest, response agentTrigger.Response) *proto.TriggerResponse {
return &proto.TriggerResponse{
TestID: request.TestID,
Expand All @@ -198,6 +236,7 @@ func convertResponseToProtoResponse(request *proto.TriggerRequest, response agen
Http: convertHttpResponseToProto(response.Result.HTTP),
Grpc: convertGrpcResponseToProto(response.Result.GRPC),
TraceID: convertTraceIDResponseToProto(response.Result.TraceID),
Kafka: convertKafkaResponseToProto(response.Result.Kafka),
},
}
}
Expand Down Expand Up @@ -246,3 +285,14 @@ func convertTraceIDResponseToProto(traceID *trigger.TraceIDResponse) *proto.Trac
Id: traceID.ID,
}
}

// convertKafkaResponseToProto maps the internal Kafka trigger result back to
// its wire format. A nil result, or one without an offset (no message was
// produced), yields nil so the proto response omits the Kafka field entirely.
func convertKafkaResponseToProto(kafka *trigger.KafkaResponse) *proto.KafkaResponse {
	if kafka == nil {
		return nil
	}
	if kafka.Offset == "" {
		return nil
	}

	out := &proto.KafkaResponse{
		Partition: kafka.Partition,
		Offset:    kafka.Offset,
	}
	return out
}
76 changes: 76 additions & 0 deletions agent/workers/trigger/kafka.go
@@ -0,0 +1,76 @@
package trigger

import (
"context"
"fmt"

"github.com/kubeshop/tracetest/server/pkg/kafka"
"github.com/kubeshop/tracetest/server/test/trigger"
"go.opentelemetry.io/otel/propagation"
)

// KAFKA returns a Triggerer that runs a test by producing a message to a
// Kafka topic.
func KAFKA() Triggerer {
	return new(kafkaTriggerer)
}

// kafkaTriggerer is stateless; all per-run data arrives via Trigger's arguments.
type kafkaTriggerer struct{}

// Trigger produces the configured message to Kafka synchronously and records
// the resulting partition/offset both in the trigger result and as span
// attributes on the run.
func (te *kafkaTriggerer) Trigger(ctx context.Context, triggerConfig trigger.Trigger, opts *Options) (Response, error) {
	resp := Response{
		Result: trigger.TriggerResult{Type: te.Type()},
	}

	req := triggerConfig.Kafka

	producer, err := kafka.GetProducer(te.getConfig(req))
	if err != nil {
		return resp, fmt.Errorf("error when creating kafka producer: %w", err)
	}
	defer producer.Close()

	// Inject the current trace context into the message headers so the
	// consumer side can join the same trace.
	headers := req.GetHeaderAsMap()
	propagators().Inject(ctx, propagation.MapCarrier(headers))

	result, err := producer.ProduceSyncMessage(ctx, req.MessageKey, req.MessageValue, headers)
	if err != nil {
		return resp, fmt.Errorf("error when sending message to kafka producer: %w", err)
	}

	resp.Result.Kafka = &trigger.KafkaResponse{
		Partition: result.Partition,
		Offset:    result.Offset,
	}
	resp.SpanAttributes = map[string]string{
		"tracetest.run.trigger.kafka.partition": result.Partition,
		"tracetest.run.trigger.kafka.offset":    result.Offset,
	}

	return resp, nil
}

// Type reports the trigger type this triggerer handles (Kafka).
func (t *kafkaTriggerer) Type() trigger.TriggerType {
	return trigger.TriggerTypeKafka
}

// getConfig translates the trigger's Kafka request into the kafka client
// configuration, attaching plain-auth credentials only when both the auth
// block and its Plain settings are present.
func (t *kafkaTriggerer) getConfig(request *trigger.KafkaRequest) kafka.Config {
	cfg := kafka.Config{
		BrokerURLs:      request.BrokerURLs,
		Topic:           request.Topic,
		SSLVerification: request.SSLVerification,
	}

	if auth := request.Authentication; auth != nil && auth.Plain != nil {
		cfg.Authentication = &kafka.AuthenticationConfig{
			Username: auth.Plain.Username,
			Password: auth.Plain.Password,
		}
	}

	return cfg
}

0 comments on commit ab7381a

Please sign in to comment.