diff --git a/agent/client/client.go b/agent/client/client.go index e9980b19c2..adf15dbcf8 100644 --- a/agent/client/client.go +++ b/agent/client/client.go @@ -13,6 +13,7 @@ import ( retry "github.com/avast/retry-go" "github.com/kubeshop/tracetest/agent/proto" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -41,6 +42,8 @@ type Client struct { sessionConfig *SessionConfig done chan bool + logger *zap.Logger + triggerListener func(context.Context, *proto.TriggerRequest) error pollListener func(context.Context, *proto.PollingRequest) error shutdownListener func(context.Context, *proto.ShutdownRequest) error diff --git a/agent/client/connector.go b/agent/client/connector.go index 0812f9a2a0..bf52526e8b 100644 --- a/agent/client/connector.go +++ b/agent/client/connector.go @@ -7,6 +7,7 @@ import ( "net" "time" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -24,6 +25,7 @@ func Connect(ctx context.Context, endpoint string, opts ...Option) (*Client, err pollListener: pollListener, shutdownListener: shutdownListener, dataStoreConnectionListener: dataStoreConnectionListener, + logger: zap.NewNop(), } for _, opt := range opts { diff --git a/agent/client/options.go b/agent/client/options.go index d5f71a6b45..02b85e5bf7 100644 --- a/agent/client/options.go +++ b/agent/client/options.go @@ -1,6 +1,10 @@ package client -import "time" +import ( + "time" + + "go.uber.org/zap" +) type Option func(*Client) @@ -21,3 +25,9 @@ func WithPingPeriod(period time.Duration) Option { c.config.PingPeriod = period } } + +func WithLogger(logger *zap.Logger) Option { + return func(c *Client) { + c.logger = logger + } +} diff --git a/agent/collector/cache.go b/agent/collector/cache.go index 279024d451..9760ba742c 100644 --- a/agent/collector/cache.go +++ b/agent/collector/cache.go @@ -9,7 +9,7 @@ import ( type TraceCache interface { Get(string) ([]*v1.Span, bool) - Set(string, []*v1.Span) + Append(string, 
[]*v1.Span) } type traceCache struct { @@ -25,8 +25,8 @@ func (c *traceCache) Get(traceID string) ([]*v1.Span, bool) { return c.internalCache.Get(traceID) } -// Set implements TraceCache. -func (c *traceCache) Set(traceID string, spans []*v1.Span) { +// Append implements TraceCache. +func (c *traceCache) Append(traceID string, spans []*v1.Span) { c.mutex.Lock() defer c.mutex.Unlock() diff --git a/agent/collector/collector.go b/agent/collector/collector.go index 237ac4eb4e..96ece1ac91 100644 --- a/agent/collector/collector.go +++ b/agent/collector/collector.go @@ -10,6 +10,7 @@ import ( "github.com/kubeshop/tracetest/server/otlp" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) type Config struct { @@ -34,6 +35,12 @@ func WithStartRemoteServer(startRemoteServer bool) CollectorOption { } } +func WithLogger(logger *zap.Logger) CollectorOption { + return func(ric *remoteIngesterConfig) { + ric.logger = logger + } +} + type collector struct { grpcServer stoppable httpServer stoppable @@ -47,8 +54,9 @@ func (c *collector) Stop() { func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) { ingesterConfig := remoteIngesterConfig{ - URL: config.RemoteServerURL, - Token: config.RemoteServerToken, + URL: config.RemoteServerURL, + Token: config.RemoteServerToken, + logger: zap.NewNop(), } for _, opt := range opts { @@ -61,9 +69,12 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll } grpcServer := otlp.NewGrpcServer(fmt.Sprintf("0.0.0.0:%d", config.GRPCPort), ingester, tracer) + grpcServer.SetLogger(ingesterConfig.logger) httpServer := otlp.NewHttpServer(fmt.Sprintf("0.0.0.0:%d", config.HTTPPort), ingester) + httpServer.SetLogger(ingesterConfig.logger) onProcessTermination(func() { + ingesterConfig.logger.Debug("Stopping collector") grpcServer.Stop() httpServer.Stop() if stoppableIngester, ok := ingester.(stoppable); ok { diff --git a/agent/collector/collector_test.go 
b/agent/collector/collector_test.go index b14ce182e0..18496d12bb 100644 --- a/agent/collector/collector_test.go +++ b/agent/collector/collector_test.go @@ -91,7 +91,7 @@ func TestCollectorWatchingSpansFromTest(t *testing.T) { ctx := trace.ContextWithSpanContext(context.Background(), spanContext) - cache.Set(watchedTraceID.String(), []*v1.Span{}) + cache.Append(watchedTraceID.String(), []*v1.Span{}) // 10 spans will be watched and stored in the cache func(ctx context.Context) { diff --git a/agent/collector/ingester.go b/agent/collector/ingester.go index 1434554393..0d9fc27307 100644 --- a/agent/collector/ingester.go +++ b/agent/collector/ingester.go @@ -11,6 +11,7 @@ import ( "go.opencensus.io/trace" pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" v1 "go.opentelemetry.io/proto/otlp/trace/v1" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -19,13 +20,14 @@ type stoppable interface { Stop() } -func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) { +func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) { ingester := &forwardIngester{ BatchTimeout: batchTimeout, - RemoteIngester: remoteIngesterConfig, + RemoteIngester: cfg, buffer: &buffer{}, done: make(chan bool), - traceCache: remoteIngesterConfig.traceCache, + traceCache: cfg.traceCache, + logger: cfg.logger, } if startRemoteServer { @@ -49,6 +51,7 @@ type forwardIngester struct { buffer *buffer done chan bool traceCache TraceCache + logger *zap.Logger } type remoteIngesterConfig struct { @@ -56,6 +59,7 @@ type remoteIngesterConfig struct { Token string traceCache TraceCache startRemoteServer bool + logger *zap.Logger } type buffer struct { @@ -67,8 +71,10 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer i.buffer.mutex.Lock() 
i.buffer.spans = append(i.buffer.spans, request.ResourceSpans...) i.buffer.mutex.Unlock() + i.logger.Debug("received spans", zap.Int("count", len(request.ResourceSpans))) if i.traceCache != nil { + i.logger.Debug("caching test spans") // In case of OTLP datastore, those spans will be polled from this cache instead // of a real datastore i.cacheTestSpans(request.ResourceSpans) @@ -84,6 +90,7 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error { conn, err := grpc.DialContext(ctx, i.RemoteIngester.URL, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { + i.logger.Error("could not connect to remote server", zap.Error(err)) return fmt.Errorf("could not connect to remote server: %w", err) } @@ -92,15 +99,19 @@ func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error { } func (i *forwardIngester) startBatchWorker() { + i.logger.Debug("starting batch worker", zap.Duration("batch_timeout", i.BatchTimeout)) ticker := time.NewTicker(i.BatchTimeout) done := make(chan bool) for { select { case <-done: + i.logger.Debug("stopping batch worker") return case <-ticker.C: + i.logger.Debug("executing batch") err := i.executeBatch(context.Background()) if err != nil { + i.logger.Error("could not execute batch", zap.Error(err)) log.Println(err) } } @@ -114,14 +125,18 @@ func (i *forwardIngester) executeBatch(ctx context.Context) error { i.buffer.mutex.Unlock() if len(newSpans) == 0 { + i.logger.Debug("no spans to forward") return nil } err := i.forwardSpans(ctx, newSpans) if err != nil { + i.logger.Error("could not forward spans", zap.Error(err)) return err } + i.logger.Debug("successfully forwarded spans", zap.Int("count", len(newSpans))) + return nil } @@ -131,6 +146,7 @@ func (i *forwardIngester) forwardSpans(ctx context.Context, spans []*v1.Resource }) if err != nil { + i.logger.Error("could not forward spans to remote server", 
zap.Error(err)) return fmt.Errorf("could not forward spans to remote server: %w", err) } @@ -138,6 +154,7 @@ func (i *forwardIngester) forwardSpans(ctx context.Context, spans []*v1.Resource } func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) { + i.logger.Debug("caching test spans") spans := make(map[string][]*v1.Span) for _, resourceSpan := range resourceSpans { for _, scopedSpan := range resourceSpan.ScopeSpans { @@ -148,16 +165,21 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) { } } + i.logger.Debug("caching test spans", zap.Int("count", len(spans))) + for traceID, spans := range spans { if _, ok := i.traceCache.Get(traceID); !ok { + i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID)) // traceID is not part of a test continue } - i.traceCache.Set(traceID, spans) + i.traceCache.Append(traceID, spans) + i.logger.Debug("caching test spans", zap.String("traceID", traceID), zap.Int("count", len(spans))) } } func (i *forwardIngester) Stop() { + i.logger.Debug("stopping forward ingester") i.done <- true } diff --git a/agent/initialization/start.go b/agent/initialization/start.go index eb24aa8236..acdb9d97a5 100644 --- a/agent/initialization/start.go +++ b/agent/initialization/start.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "os" "github.com/kubeshop/tracetest/agent/client" "github.com/kubeshop/tracetest/agent/collector" @@ -12,14 +13,26 @@ import ( "github.com/kubeshop/tracetest/agent/workers" "github.com/kubeshop/tracetest/agent/workers/poller" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) var ErrOtlpServerStart = errors.New("OTLP server start error") +var logger = zap.NewNop() + func NewClient(ctx context.Context, config config.Config, traceCache collector.TraceCache) (*client.Client, error) { + if enableLogging() { + var err error + logger, err = zap.NewDevelopment() + if err != nil { + return nil, fmt.Errorf("could not create logger: %w", err) + } + } +
controlPlaneClient, err := client.Connect(ctx, config.ServerURL, client.WithAPIKey(config.APIKey), client.WithAgentName(config.Name), + client.WithLogger(logger), ) if err != nil { return nil, err @@ -31,6 +44,12 @@ func NewClient(ctx context.Context, config config.Config, traceCache collector.T )) dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient) + if enableLogging() { + triggerWorker.SetLogger(logger) + pollingWorker.SetLogger(logger) + dataStoreTestConnectionWorker.SetLogger(logger) + } + controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test) controlPlaneClient.OnTriggerRequest(triggerWorker.Trigger) controlPlaneClient.OnPollingRequest(pollingWorker.Poll) @@ -73,10 +92,28 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec GRPCPort: config.OTLPServer.GRPCPort, } - _, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache), collector.WithStartRemoteServer(false)) + opts := []collector.CollectorOption{ + collector.WithTraceCache(traceCache), + collector.WithStartRemoteServer(false), + } + + if enableLogging() { + opts = append(opts, collector.WithLogger(logger)) + } + + _, err := collector.Start( + ctx, + collectorConfig, + noopTracer, + opts..., + ) if err != nil { return ErrOtlpServerStart } return nil } + +func enableLogging() bool { + return os.Getenv("TRACETEST_DEV") == "true" +} diff --git a/agent/workers/poller.go b/agent/workers/poller.go index fd1a0930cd..455c3c6d18 100644 --- a/agent/workers/poller.go +++ b/agent/workers/poller.go @@ -7,6 +7,7 @@ import ( "log" gocache "github.com/Code-Hex/go-generics-cache" + "github.com/davecgh/go-spew/spew" "github.com/fluidtruck/deepcopy" "github.com/kubeshop/tracetest/agent/client" "github.com/kubeshop/tracetest/agent/proto" @@ -15,6 +16,7 @@ import ( "github.com/kubeshop/tracetest/server/tracedb/connection" "github.com/kubeshop/tracetest/server/traces" 
"go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) type PollerWorker struct { @@ -22,6 +24,7 @@ type PollerWorker struct { tracer trace.Tracer sentSpanIDs *gocache.Cache[string, bool] inmemoryDatastore tracedb.TraceDB + logger *zap.Logger } type PollerOption func(*PollerWorker) @@ -40,6 +43,7 @@ func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker client: client, tracer: tracer, sentSpanIDs: gocache.New[string, bool](), + logger: zap.NewNop(), } for _, opt := range opts { @@ -49,10 +53,16 @@ func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker return pollerWorker } +func (w *PollerWorker) SetLogger(logger *zap.Logger) { + w.logger = logger +} + func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) error { + w.logger.Debug("Received polling request", zap.Any("request", request)) err := w.poll(ctx, request) if err != nil { - sendErr := w.client.SendTrace(ctx, &proto.PollingResponse{ + w.logger.Error("Error polling", zap.Error(err)) + errorResponse := &proto.PollingResponse{ RequestID: request.RequestID, AgentIdentification: w.client.SessionConfiguration().AgentIdentification, TestID: request.GetTestID(), @@ -62,9 +72,13 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) Error: &proto.Error{ Message: err.Error(), }, - }) + } + + w.logger.Debug("Sending polling error", zap.Any("response", errorResponse)) + sendErr := w.client.SendTrace(ctx, errorResponse) if sendErr != nil { + w.logger.Error("Error sending polling error", zap.Error(sendErr)) return fmt.Errorf("could not report polling error back to the server: %w. 
Original error: %s", sendErr, err.Error()) } } @@ -73,23 +87,30 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) } func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest) error { + w.logger.Debug("Received polling request", zap.Any("request", request)) datastoreConfig, err := convertProtoToDataStore(request.Datastore) if err != nil { + w.logger.Error("Invalid datastore", zap.Error(err)) return err } + w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig)) if datastoreConfig == nil { + w.logger.Error("Invalid datastore: nil") return fmt.Errorf("invalid datastore: nil") } dsFactory := tracedb.Factory(nil) ds, err := dsFactory(*datastoreConfig) if err != nil { + w.logger.Error("Invalid datastore", zap.Error(err)) log.Printf("Invalid datastore: %s", err.Error()) return err } + w.logger.Debug("Created datastore", zap.Any("datastore", ds), zap.Bool("isOTLPBasedProvider", datastoreConfig.IsOTLPBasedProvider())) if datastoreConfig.IsOTLPBasedProvider() && w.inmemoryDatastore != nil { + w.logger.Debug("Using in-memory datastore") ds = w.inmemoryDatastore } @@ -101,33 +122,42 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest) } trace, err := ds.GetTraceByID(ctx, request.TraceID) - if err != nil { + w.logger.Error("Cannot get trace from datastore", zap.Error(err)) if !errors.Is(err, connection.ErrTraceNotFound) { - // let controlplane know we didn't find the trace + w.logger.Debug("error is not ErrTraceNotFound", zap.Error(err), zap.NamedError("expected", connection.ErrTraceNotFound)) log.Printf("cannot get trace from datastore: %s", err.Error()) return err } + w.logger.Debug("Trace not found") pollingResponse.TraceFound = false } else { + w.logger.Debug("Trace found") pollingResponse.TraceFound = true pollingResponse.Spans = convertTraceInToProtoSpans(trace) + w.logger.Debug("Converted trace", zap.Any("trace", trace), zap.Any("pollingResponse", spew.Sdump(pollingResponse))) // remove
sent spans newSpans := make([]*proto.Span, 0, len(pollingResponse.Spans)) for _, span := range pollingResponse.Spans { runKey := fmt.Sprintf("%d-%s-%s", request.RunID, request.TestID, span.Id) + w.logger.Debug("Checking if span was already sent", zap.String("runKey", runKey)) _, alreadySent := w.sentSpanIDs.Get(runKey) if !alreadySent { + w.logger.Debug("Span was not sent", zap.String("runKey", runKey)) newSpans = append(newSpans, span) + } else { + w.logger.Debug("Span was already sent", zap.String("runKey", runKey)) } } pollingResponse.Spans = newSpans + w.logger.Debug("Filtered spans", zap.Any("pollingResponse", spew.Sdump(pollingResponse))) } err = w.client.SendTrace(ctx, pollingResponse) if err != nil { + w.logger.Error("Cannot send trace to server", zap.Error(err)) log.Printf("cannot send trace to server: %s", err.Error()) return err } @@ -135,6 +165,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest) // mark spans as sent for _, span := range pollingResponse.Spans { runKey := fmt.Sprintf("%d-%s-%s", request.RunID, request.TestID, span.Id) + w.logger.Debug("Marking span as sent", zap.String("runKey", runKey)) // TODO: we can set the expiration for this key to be // 1 second after the pollingProfile max waiting time // but we need to get that info here from controlplane diff --git a/agent/workers/poller_test.go b/agent/workers/poller_test.go index b63b287e5c..d90583d950 100644 --- a/agent/workers/poller_test.go +++ b/agent/workers/poller_test.go @@ -161,7 +161,7 @@ func TestPollerWorkerWithInmemoryDatastore(t *testing.T) { span1ID := id.NewRandGenerator().SpanID() span2ID := id.NewRandGenerator().SpanID() - cache.Set(traceID.String(), []*v1.Span{ + cache.Append(traceID.String(), []*v1.Span{ {Name: "span 1", ParentSpanId: nil, SpanId: span1ID[:], TraceId: traceID[:]}, {Name: "span 2", ParentSpanId: span1ID[:], SpanId: span2ID[:], TraceId: traceID[:]}, }) diff --git a/agent/workers/testconnnection.go 
b/agent/workers/testconnnection.go index 2e4a7d98c6..2ce1315d50 100644 --- a/agent/workers/testconnnection.go +++ b/agent/workers/testconnnection.go @@ -10,11 +10,13 @@ import ( "github.com/kubeshop/tracetest/server/model" "github.com/kubeshop/tracetest/server/tracedb" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) type TestConnectionWorker struct { client *client.Client tracer trace.Tracer + logger *zap.Logger } func NewTestConnectionWorker(client *client.Client) *TestConnectionWorker { @@ -24,42 +26,65 @@ func NewTestConnectionWorker(client *client.Client) *TestConnectionWorker { return &TestConnectionWorker{ client: client, tracer: tracer, + logger: zap.NewNop(), } } +func (w *TestConnectionWorker) SetLogger(logger *zap.Logger) { + w.logger = logger +} + func (w *TestConnectionWorker) Test(ctx context.Context, request *proto.DataStoreConnectionTestRequest) error { - fmt.Println("Data Store Test Connection handled by agent") + w.logger.Debug("Received datastore connection test request") datastoreConfig, err := convertProtoToDataStore(request.Datastore) if err != nil { + w.logger.Error("Invalid datastore", zap.Error(err)) return err } + w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig)) if datastoreConfig == nil { + w.logger.Error("Invalid datastore: nil") return fmt.Errorf("invalid datastore: nil") } dsFactory := tracedb.Factory(nil) ds, err := dsFactory(*datastoreConfig) if err != nil { + w.logger.Error("Invalid datastore", zap.Error(err)) log.Printf("Invalid datastore: %s", err.Error()) return err } + w.logger.Debug("Created datastore", zap.Any("datastore", ds)) + + response := &proto.DataStoreConnectionTestResponse{ + RequestID: request.RequestID, + Successful: false, + Steps: nil, + } if testableTraceDB, ok := ds.(tracedb.TestableTraceDB); ok { + w.logger.Debug("Datastore is testable") connectionResult := testableTraceDB.TestConnection(ctx) + w.logger.Debug("Tested datastore", zap.Any("connectionResult", connectionResult)) 
success, steps := convertConnectionResultToProto(connectionResult) + w.logger.Debug("Converted connection result", zap.Bool("success", success), zap.Any("steps", steps)) - w.client.SendDataStoreConnectionResult(ctx, &proto.DataStoreConnectionTestResponse{ + response = &proto.DataStoreConnectionTestResponse{ RequestID: request.RequestID, Successful: success, Steps: steps, - }) + } } else { - w.client.SendDataStoreConnectionResult(ctx, &proto.DataStoreConnectionTestResponse{ - RequestID: request.RequestID, - Successful: false, - Steps: nil, - }) + w.logger.Debug("Datastore is not testable") + } + + w.logger.Debug("Sending datastore connection test result", zap.Any("response", response)) + err = w.client.SendDataStoreConnectionResult(ctx, response) + if err != nil { + w.logger.Error("Could not send datastore connection test result", zap.Error(err)) + } else { + w.logger.Debug("Sent datastore connection test result") } return nil diff --git a/agent/workers/trigger.go b/agent/workers/trigger.go index add27f0341..5a631e4da2 100644 --- a/agent/workers/trigger.go +++ b/agent/workers/trigger.go @@ -12,9 +12,11 @@ import ( "github.com/kubeshop/tracetest/server/test/trigger" "go.opentelemetry.io/otel/trace" v1 "go.opentelemetry.io/proto/otlp/trace/v1" + "go.uber.org/zap" ) type TriggerWorker struct { + logger *zap.Logger client *client.Client registry *agentTrigger.Registry traceCache collector.TraceCache @@ -41,6 +43,7 @@ func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWork worker := &TriggerWorker{ client: client, registry: registry, + logger: zap.NewNop(), } for _, opt := range opts { @@ -50,9 +53,15 @@ func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWork return worker } +func (w *TriggerWorker) SetLogger(logger *zap.Logger) { + w.logger = logger +} + func (w *TriggerWorker) Trigger(ctx context.Context, triggerRequest *proto.TriggerRequest) error { + w.logger.Debug("Trigger request received", zap.Any("triggerRequest", 
triggerRequest)) err := w.trigger(ctx, triggerRequest) if err != nil { + w.logger.Error("Trigger error", zap.Error(err)) sendErr := w.client.SendTriggerResponse(ctx, &proto.TriggerResponse{ RequestID: triggerRequest.RequestID, AgentIdentification: w.client.SessionConfiguration().AgentIdentification, @@ -66,6 +75,7 @@ func (w *TriggerWorker) Trigger(ctx context.Context, triggerRequest *proto.Trigg }) if sendErr != nil { + w.logger.Error("Could not report trigger error back to the server", zap.Error(sendErr)) return fmt.Errorf("could not report trigger error back to the server: %w. Original error: %s", sendErr, err.Error()) } } @@ -75,20 +85,26 @@ func (w *TriggerWorker) Trigger(ctx context.Context, triggerRequest *proto.Trigg func (w *TriggerWorker) trigger(ctx context.Context, triggerRequest *proto.TriggerRequest) error { triggerConfig := convertProtoToTrigger(triggerRequest.Trigger) + w.logger.Debug("Triggering test", zap.Any("triggerConfig", triggerConfig)) triggerer, err := w.registry.Get(triggerConfig.Type) if err != nil { + w.logger.Error("Could not get triggerer", zap.Error(err)) return err } + w.logger.Debug("Triggerer found", zap.Any("triggerer", triggerer)) traceID, err := trace.TraceIDFromHex(triggerRequest.TraceID) if err != nil { + w.logger.Error("Invalid traceID was received in TriggerRequest", zap.Error(err)) return fmt.Errorf("invalid traceID was received in TriggerRequest: %w", err) } + w.logger.Debug("TraceID parsed", zap.Any("traceID", traceID)) if w.traceCache != nil { // Set traceID to cache so the collector starts watching for incoming traces // with same id - w.traceCache.Set(triggerRequest.TraceID, []*v1.Span{}) + w.logger.Debug("Appending traceID to trace cache", zap.Any("traceID", traceID)) + w.traceCache.Append(triggerRequest.TraceID, []*v1.Span{}) } response, err := triggerer.Trigger(ctx, triggerConfig, &agentTrigger.Options{ @@ -96,17 +112,22 @@ func (w *TriggerWorker) trigger(ctx context.Context, triggerRequest *proto.Trigg SpanID: 
id.NewRandGenerator().SpanID(), TestID: id.ID(triggerRequest.TestID), }) - if err != nil { + w.logger.Error("Could not trigger test", zap.Error(err)) return fmt.Errorf("could not trigger test: %w", err) } + w.logger.Debug("Test triggered", zap.Any("response", response)) + protoResponse := convertResponseToProtoResponse(triggerRequest, response) protoResponse.RequestID = triggerRequest.RequestID + w.logger.Debug("Sending trigger response to server", zap.Any("protoResponse", protoResponse)) err = w.client.SendTriggerResponse(ctx, protoResponse) if err != nil { + w.logger.Error("Could not send trigger response to server", zap.Error(err)) return fmt.Errorf("could not send trigger response to server: %w", err) } + w.logger.Debug("Trigger response sent to server") return nil } diff --git a/server/otlp/grpc_server.go b/server/otlp/grpc_server.go index 010cbf66a4..ae44e54c39 100644 --- a/server/otlp/grpc_server.go +++ b/server/otlp/grpc_server.go @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/otel/trace" pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -18,6 +19,7 @@ type grpcServer struct { gServer *grpc.Server tracer trace.Tracer + logger *zap.Logger } func NewGrpcServer(addr string, ingester Ingester, tracer trace.Tracer) *grpcServer { @@ -25,9 +27,14 @@ func NewGrpcServer(addr string, ingester Ingester, tracer trace.Tracer) *grpcSer addr: addr, ingester: ingester, tracer: tracer, + logger: zap.NewNop(), } } +func (s *grpcServer) SetLogger(logger *zap.Logger) { + s.logger = logger +} + func (s *grpcServer) Start() error { s.gServer = grpc.NewServer() listener, err := net.Listen("tcp", s.addr) @@ -47,5 +54,11 @@ func (s grpcServer) Export(ctx context.Context, request *pb.ExportTraceServiceRe ctx, span := s.tracer.Start(ctx, "Export trace") defer span.End() - return s.ingester.Ingest(ctx, request, "gRPC") + s.logger.Debug("Received ExportTraceServiceRequest", zap.Any("request", request)) + + response, err := 
s.ingester.Ingest(ctx, request, RequestTypeGRPC) + + s.logger.Debug("Sending ExportTraceServiceResponse", zap.Any("response", response), zap.Error(err)) + + return response, err } diff --git a/server/otlp/http_server.go b/server/otlp/http_server.go index b20653e940..aed687f4f7 100644 --- a/server/otlp/http_server.go +++ b/server/otlp/http_server.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" + "go.uber.org/zap" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" @@ -28,6 +29,7 @@ const ( type httpServer struct { addr string ingester Ingester + logger *zap.Logger hServer *http.Server } @@ -36,23 +38,31 @@ func NewHttpServer(addr string, ingester Ingester) *httpServer { return &httpServer{ addr: addr, ingester: ingester, + logger: zap.NewNop(), } } +func (s *httpServer) SetLogger(logger *zap.Logger) { + s.logger = logger +} + func (s *httpServer) Start() error { + s.logger.Debug("Starting HTTP server", zap.String("addr", s.addr)) r := mux.NewRouter() r.HandleFunc("/v1/traces", s.Export).Methods("POST") s.hServer = &http.Server{ Addr: s.addr, - Handler: handlers.CompressHandler(decompressBodyHandler(handlers.ContentTypeHandler(r, protoBufContentType, jsonContentType))), + Handler: handlers.CompressHandler(decompressBodyHandler(s.logger, handlers.ContentTypeHandler(r, protoBufContentType, jsonContentType))), } listener, err := net.Listen("tcp", s.addr) if err != nil { + s.logger.Error("cannot listen on address", zap.String("addr", s.addr), zap.Error(err)) return fmt.Errorf("cannot listen on address %s: %w", s.addr, err) } go s.hServer.Serve(listener) + s.logger.Debug("HTTP server started", zap.String("addr", s.addr)) return nil } @@ -64,19 +74,36 @@ func (s httpServer) Export(w http.ResponseWriter, r *http.Request) { contentType := r.Header.Get("content-type") response := newHttpResponse(w, contentType) + s.logger.Debug("Received ExportTraceServiceRequest", 
zap.String("content-type", contentType)) + request, err := s.parseBody(r.Body, contentType) if err != nil { - response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error())) + s.logger.Error("Could not parse request body", zap.Error(err)) + sendErr := response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error())) + if sendErr != nil { + s.logger.Error("Could not send error response", zap.Error(sendErr)) + } return } + s.logger.Debug("Parsed ExportTraceServiceRequest", zap.Any("request", request)) result, err := s.ingester.Ingest(r.Context(), request, "HTTP") if err != nil { - response.sendError(http.StatusInternalServerError, status.Errorf(codes.InvalidArgument, "Error when ingesting spans %s", err.Error())) + s.logger.Error("Error when ingesting spans", zap.Error(err)) + sendErr := response.sendError(http.StatusInternalServerError, status.Errorf(codes.InvalidArgument, "Error when ingesting spans %s", err.Error())) + if sendErr != nil { + s.logger.Error("Could not send error response", zap.Error(sendErr)) + } return } + s.logger.Debug("Ingested spans", zap.Any("result", result)) + + err = response.send(http.StatusOK, result) + if err != nil { + s.logger.Error("Error when sending response", zap.Error(err)) + } - response.send(http.StatusOK, result) + s.logger.Debug("Sent ExportTraceServiceResponse") } func (s httpServer) parseProtoBuf(body []byte) (*pb.ExportTraceServiceRequest, error) { @@ -84,7 +111,7 @@ func (s httpServer) parseProtoBuf(body []byte) (*pb.ExportTraceServiceRequest, e err := proto.Unmarshal(body, &request) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot parse protobuf: %w", err) } return &request, nil @@ -95,12 +122,12 @@ func (s httpServer) parseJson(body []byte) (*pb.ExportTraceServiceRequest, error err := exportRequest.UnmarshalJSON(body) if err != nil { - return nil, err + return 
nil, fmt.Errorf("cannot parse json: %w", err) } protoBody, err := exportRequest.MarshalProto() if err != nil { - return nil, err + return nil, fmt.Errorf("cannot marshalProto: %w", err) } return s.parseProtoBuf(protoBody) @@ -109,7 +136,7 @@ func (s httpServer) parseJson(body []byte) (*pb.ExportTraceServiceRequest, error func (s httpServer) parseBody(reqBody io.ReadCloser, contentType string) (*pb.ExportTraceServiceRequest, error) { body, err := io.ReadAll(reqBody) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot read request body: %w", err) } if len(body) == 0 { @@ -148,10 +175,10 @@ func (r httpResponse) send(statusCode int, message proto.Message) error { return nil } -func (r httpResponse) sendError(code int, err error) { +func (r httpResponse) sendError(code int, err error) error { rpcError, _ := status.FromError(err) - r.send(code, rpcError.Proto()) + return r.send(code, rpcError.Proto()) } func (r httpResponse) paseResponseBody(data proto.Message) ([]byte, error) { @@ -162,13 +189,18 @@ func (r httpResponse) paseResponseBody(data proto.Message) ([]byte, error) { return json.Marshal(data) } -func decompressBodyHandler(h http.Handler) http.Handler { +func decompressBodyHandler(logger *zap.Logger, h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.Header.Get("content-encoding"), "gzip") { + logger.Debug("Decompressing request body") compressedBody, err := decompressBody(r.Body) if err != nil { + logger.Error("Could not decompress request body", zap.Error(err)) response := newHttpResponse(w, r.Header.Get("content-type")) - response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error())) + sendErr := response.sendError(http.StatusUnprocessableEntity, status.Errorf(codes.InvalidArgument, "Could not parse request body %s", err.Error())) + if sendErr != nil { + logger.Error("Could not send error response", 
zap.Error(sendErr)) + } return }