From 483826ea55707774f8f71a7cb3e57f6c4dc06471 Mon Sep 17 00:00:00 2001 From: Daniel Baptista Dias Date: Wed, 22 Nov 2023 13:16:09 -0300 Subject: [PATCH] chore: add verbose mode for Tracetest agent (#3385) * updating logging for agent * updating logs * updating default observer for workers and collector --- agent/collector/collector.go | 14 +++- agent/collector/ingester.go | 7 +- agent/config/flags.go | 1 + agent/event/observer.go | 112 ++++++++++++++++++++++++++ agent/runner/runner.go | 29 ++++++- agent/runner/runstrategy_desktop.go | 20 ++--- agent/runner/runstrategy_verbose.go | 118 ++++++++++++++++++++++++++++ agent/runner/session.go | 61 +++++++------- agent/ui/emoji.go | 14 ++++ agent/workers/poller.go | 15 ++++ agent/workers/testconnnection.go | 31 +++++--- agent/workers/trigger.go | 17 ++++ cli/cmd/start_cmd.go | 3 + go.mod | 5 +- go.sum | 2 + 15 files changed, 385 insertions(+), 64 deletions(-) create mode 100644 agent/event/observer.go create mode 100644 agent/runner/runstrategy_verbose.go create mode 100644 agent/ui/emoji.go diff --git a/agent/collector/collector.go b/agent/collector/collector.go index 96ece1ac91..5328b17f1a 100644 --- a/agent/collector/collector.go +++ b/agent/collector/collector.go @@ -8,6 +8,7 @@ import ( "syscall" "time" + "github.com/kubeshop/tracetest/agent/event" "github.com/kubeshop/tracetest/server/otlp" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -41,6 +42,12 @@ func WithLogger(logger *zap.Logger) CollectorOption { } } +func WithObserver(observer event.Observer) CollectorOption { + return func(ric *remoteIngesterConfig) { + ric.observer = observer + } +} + type collector struct { grpcServer stoppable httpServer stoppable @@ -54,9 +61,10 @@ func (c *collector) Stop() { func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) { ingesterConfig := remoteIngesterConfig{ - URL: config.RemoteServerURL, - Token: config.RemoteServerToken, - logger: zap.NewNop(), + URL: config.RemoteServerURL, + Token: config.RemoteServerToken, + logger: zap.NewNop(), + observer: event.NewNopObserver(), } for _, opt := range opts { diff --git a/agent/collector/ingester.go b/agent/collector/ingester.go index 0d9fc27307..682c49de9c 100644 --- a/agent/collector/ingester.go +++ b/agent/collector/ingester.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/kubeshop/tracetest/agent/event" "github.com/kubeshop/tracetest/server/otlp" "go.opencensus.io/trace" pb "go.opentelemetry.io/proto/otlp/collector/trace/v1" @@ -60,6 +61,7 @@ type remoteIngesterConfig struct { traceCache TraceCache startRemoteServer bool logger *zap.Logger + observer event.Observer } type buffer struct { @@ -136,7 +138,6 @@ func (i *forwardIngester) executeBatch(ctx context.Context) error { } i.logger.Debug("successfully forwarded spans", zap.Int("count", len(newSpans))) - return nil } @@ -174,8 +175,12 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) { continue } + i.RemoteIngester.observer.StartSpanReceive(spans) + i.traceCache.Append(traceID, spans) i.logger.Debug("caching test spans", zap.String("traceID", traceID), zap.Int("count", len(spans))) + + i.RemoteIngester.observer.EndSpanReceive(spans, nil) } } diff --git a/agent/config/flags.go b/agent/config/flags.go index 81b9ae82ed..44e5e3d62f 100644 --- a/agent/config/flags.go +++ b/agent/config/flags.go @@ -15,4 +15,5 @@ type Flags struct { AgentApiKey string Token string Mode Mode + LogLevel string } diff --git a/agent/event/observer.go b/agent/event/observer.go new file mode 100644 index 0000000000..24d194f9ab --- /dev/null +++ b/agent/event/observer.go @@ -0,0 +1,112 @@ +package event + +import ( + "github.com/kubeshop/tracetest/agent/proto" + v1 "go.opentelemetry.io/proto/otlp/trace/v1" +) + +type Observer interface { + StartTriggerExecution(*proto.TriggerRequest) + EndTriggerExecution(*proto.TriggerRequest, error) + + StartTracePoll(*proto.PollingRequest) + EndTracePoll(*proto.PollingRequest, error) + + StartSpanReceive([]*v1.Span) + EndSpanReceive([]*v1.Span, error) + + StartDataStoreConnection(*proto.DataStoreConnectionTestRequest) + EndDataStoreConnection(*proto.DataStoreConnectionTestRequest, error) + + Error(error) +} + +type wrapperObserver struct { + wrappedObserver Observer +} + +func NewNopObserver() Observer { + return &wrapperObserver{ + wrappedObserver: nil, + } +} + +func WrapObserver(observer Observer) Observer { + return &wrapperObserver{ + wrappedObserver: observer, + } +} + +var _ Observer = &wrapperObserver{} + +func (o *wrapperObserver) StartDataStoreConnection(request *proto.DataStoreConnectionTestRequest) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.StartDataStoreConnection(request) +} + +func (o *wrapperObserver) EndDataStoreConnection(request *proto.DataStoreConnectionTestRequest, err error) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.EndDataStoreConnection(request, err) +} + +func (o *wrapperObserver) StartSpanReceive(spans []*v1.Span) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.StartSpanReceive(spans) +} + +func (o *wrapperObserver) EndSpanReceive(spans []*v1.Span, err error) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.EndSpanReceive(spans, err) +} + +func (o *wrapperObserver) Error(err error) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.Error(err) +} + +func (o *wrapperObserver) StartTracePoll(request *proto.PollingRequest) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.StartTracePoll(request) +} + +func (o *wrapperObserver) EndTracePoll(request *proto.PollingRequest, err error) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.EndTracePoll(request, err) +} + +func (o *wrapperObserver) StartTriggerExecution(request *proto.TriggerRequest) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.StartTriggerExecution(request) +} + +func (o *wrapperObserver) EndTriggerExecution(request *proto.TriggerRequest, err error) { + if o.wrappedObserver == nil { + return + } + + o.wrappedObserver.EndTriggerExecution(request, err) +} diff --git a/agent/runner/runner.go b/agent/runner/runner.go index 84a545ca8a..76bf5bfe71 100644 --- a/agent/runner/runner.go +++ b/agent/runner/runner.go @@ -2,12 +2,16 @@ package runner import ( "context" + "fmt" + "os" agentConfig "github.com/kubeshop/tracetest/agent/config" "github.com/kubeshop/tracetest/agent/ui" "github.com/kubeshop/tracetest/cli/config" "github.com/kubeshop/tracetest/cli/pkg/resourcemanager" + + "go.uber.org/zap" ) type Runner struct { @@ -15,6 +19,7 @@ type Runner struct { resources *resourcemanager.Registry ui ui.ConsoleUI mode agentConfig.Mode + logger *zap.Logger } func NewRunner(configurator config.Configurator, resources *resourcemanager.Registry, ui ui.ConsoleUI) *Runner { @@ -23,6 +28,7 @@ func NewRunner(configurator config.Configurator, resources *resourcemanager.Regi resources: resources, ui: ui, mode: agentConfig.Mode_Desktop, + logger: nil, } } @@ -30,14 +36,26 @@ func (s *Runner) Run(ctx context.Context, cfg config.Config, flags agentConfig.F s.ui.Banner(config.Version) s.ui.Println(`Tracetest start launches a lightweight agent. It enables you to run tests and collect traces with Tracetest. Once started, Tracetest Agent exposes OTLP ports 4317 and 4318 to ingest traces via gRCP and HTTP.`) + s.ui.Println("") // print empty line if flags.Token == "" || flags.AgentApiKey != "" { s.configurator = s.configurator.WithOnFinish(s.onStartAgent) } + s.mode = flags.Mode s.ui.Infof("Running in %s mode...", s.mode) - s.mode = flags.Mode + logger := zap.NewNop() + + if enableLogging(flags.LogLevel) { + var err error + logger, err = zap.NewDevelopment() + if err != nil { + return fmt.Errorf("could not create logger: %w", err) + } + } + + s.logger = logger return s.configurator.Start(ctx, cfg, flags) } @@ -78,9 +96,12 @@ func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoi } if s.mode == agentConfig.Mode_Desktop { - return RunDesktopStrategy(ctx, cfg, s.ui, uiEndpoint) + return s.RunDesktopStrategy(ctx, cfg, uiEndpoint) } - // TODO: Add verbose strategy - return nil + return s.RunVerboseStrategy(ctx, cfg) +} + +func enableLogging(logLevel string) bool { + return os.Getenv("TRACETEST_DEV") == "true" && logLevel == "debug" } diff --git a/agent/runner/runstrategy_desktop.go b/agent/runner/runstrategy_desktop.go index 414f2a4092..aa72b7d3df 100644 --- a/agent/runner/runstrategy_desktop.go +++ b/agent/runner/runstrategy_desktop.go @@ -11,8 +11,8 @@ import ( consoleUI "github.com/kubeshop/tracetest/agent/ui" ) -func RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, ui consoleUI.ConsoleUI, uiEndpoint string) error { - ui.Infof("Starting Agent with name %s...", cfg.Name) +func (s *Runner) RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, uiEndpoint string) error { + s.ui.Infof("Starting Agent with name %s...", cfg.Name) isStarted := false session := &Session{} @@ -20,13 +20,13 @@ func RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, ui consoleU var err error for !isStarted { - session, err = StartSession(ctx, cfg) + session, err = StartSession(ctx, cfg, nil, s.logger) if err != nil && errors.Is(err, ErrOtlpServerStart) { - ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.") - shouldRetry := ui.Enter("Please stop the process currently listening on these ports and press enter to try again.") + s.ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.") + shouldRetry := s.ui.Enter("Please stop the process currently listening on these ports and press enter to try again.") if !shouldRetry { - ui.Finish() + s.ui.Finish() return err } @@ -51,20 +51,20 @@ You can` options := []consoleUI.Option{{ Text: "Open Tracetest in a browser to this environment", Fn: func(_ consoleUI.ConsoleUI) { - ui.OpenBrowser(fmt.Sprintf("%sorganizations/%s/environments/%s/dashboard", uiEndpoint, claims["organization_id"], claims["environment_id"])) + s.ui.OpenBrowser(fmt.Sprintf("%sorganizations/%s/environments/%s/dashboard", uiEndpoint, claims["organization_id"], claims["environment_id"])) }, }, { Text: "Stop this agent", Fn: func(_ consoleUI.ConsoleUI) { isOpen = false session.Close() - ui.Finish() + s.ui.Finish() }, }} for isOpen { - selected := ui.Select(message, options, 0) - selected.Fn(ui) + selected := s.ui.Select(message, options, 0) + selected.Fn(s.ui) } return nil } diff --git a/agent/runner/runstrategy_verbose.go b/agent/runner/runstrategy_verbose.go new file mode 100644 index 0000000000..ba47898413 --- /dev/null +++ b/agent/runner/runstrategy_verbose.go @@ -0,0 +1,118 @@ +package runner + +import ( + "context" + "errors" + "os" + "os/signal" + "syscall" + + agentConfig "github.com/kubeshop/tracetest/agent/config" + "github.com/kubeshop/tracetest/agent/event" + "github.com/kubeshop/tracetest/agent/proto" + consoleUI "github.com/kubeshop/tracetest/agent/ui" + v1 "go.opentelemetry.io/proto/otlp/trace/v1" +) + +func (s *Runner) RunVerboseStrategy(ctx context.Context, cfg agentConfig.Config) error { + s.ui.Infof("%s Starting Agent with name %s...", consoleUI.Emoji_Truck, cfg.Name) + + session, err := StartSession(ctx, cfg, &verboseObserver{ui: s.ui}, s.logger) + + if err != nil && errors.Is(err, ErrOtlpServerStart) { + s.ui.Errorf("%s Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.", consoleUI.Emoji_RedCircle) + + s.ui.Finish() + return err + } + + s.ui.Infof("%s Agent is started!", consoleUI.Emoji_Truck) + s.ui.Println("") + + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT) + + sig := <-signalChannel + s.ui.Infof("%s Received stop signal %s. Stopping agent...", consoleUI.Emoji_YellowCircle, sig) + + session.Close() + s.ui.Finish() + + return nil +} + +type verboseObserver struct { + ui consoleUI.ConsoleUI +} + +var _ event.Observer = &verboseObserver{} + +func (o *verboseObserver) EndDataStoreConnection(request *proto.DataStoreConnectionTestRequest, err error) { + if err != nil { + o.ui.Warningf("%s Testing connection to DataStore %s failed!", consoleUI.Emoji_YellowCircle, request.Datastore.Type) + o.ui.Warningf("Error: %s", err.Error()) + o.ui.Println("") + return + } + + o.ui.Infof("%s Testing connection to %s data store succeed", consoleUI.Emoji_WhiteCheckMark, request.Datastore.Type) + o.ui.Println("") +} + +func (o *verboseObserver) EndSpanReceive(spans []*v1.Span, err error) { + if err != nil { + o.ui.Warningf("%s Trace spans reception failed!", consoleUI.Emoji_YellowCircle) + o.ui.Warningf("Error: %s", err.Error()) + o.ui.Println("") + return + } + + o.ui.Infof("%s Trace spans received with success. %d spans received", consoleUI.Emoji_WhiteCheckMark, len(spans)) + o.ui.Println("") +} + +func (o *verboseObserver) EndTracePoll(request *proto.PollingRequest, err error) { + if err != nil { + o.ui.Warningf("%s Test run %d, test %s trace spans fetch failed!", consoleUI.Emoji_YellowCircle, request.RunID, request.TestID) + o.ui.Warningf("Error: %s", err.Error()) + o.ui.Println("") + return + } + + o.ui.Infof("%s Test run %d, test %s trace spans fetch with success", consoleUI.Emoji_WhiteCheckMark, request.RunID, request.TestID) + o.ui.Println("") +} + +func (o *verboseObserver) EndTriggerExecution(request *proto.TriggerRequest, err error) { + if err != nil { + o.ui.Warningf("%s Test run %d, test %s trigger failed!", consoleUI.Emoji_YellowCircle, request.RunID, request.TestID) + o.ui.Warningf("Error: %s", err.Error()) + o.ui.Println("") + return + } + + o.ui.Infof("%s Test run %d, test %s trigger executed with success", consoleUI.Emoji_WhiteCheckMark, request.RunID, request.TestID) + o.ui.Println("") +} + +func (o *verboseObserver) Error(err error) { + o.ui.Errorf("%s An unknown error happened on Tracetest agent.", consoleUI.Emoji_RedCircle) + o.ui.Errorf("Error: %s", err.Error()) + o.ui.Println("") +} + +func (o *verboseObserver) StartDataStoreConnection(request *proto.DataStoreConnectionTestRequest) { + o.ui.Infof("%s Testing connection to %s data store ...", consoleUI.Emoji_Magnifier, request.Datastore.Type) +} + +func (o *verboseObserver) StartSpanReceive(spans []*v1.Span) { + o.ui.Infof("%s Receiving trace spans...", consoleUI.Emoji_Sparkles) +} + +func (o *verboseObserver) StartTracePoll(request *proto.PollingRequest) { + o.ui.Infof("%s Polling traces and spans for test run %d, test %s ...", consoleUI.Emoji_Magnifier, request.RunID, request.TestID) +} + +func (o *verboseObserver) StartTriggerExecution(request *proto.TriggerRequest) { + o.ui.Infof("%s Executing trigger for test run %d, test %s ...", consoleUI.Emoji_Sparkles, request.RunID, request.TestID) +} diff --git a/agent/runner/session.go b/agent/runner/session.go index 246eaabbe6..3f0007a803 100644 --- a/agent/runner/session.go +++ b/agent/runner/session.go @@ -4,11 +4,11 @@ import ( "context" "errors" "fmt" - "os" "github.com/kubeshop/tracetest/agent/client" "github.com/kubeshop/tracetest/agent/collector" "github.com/kubeshop/tracetest/agent/config" + "github.com/kubeshop/tracetest/agent/event" "github.com/kubeshop/tracetest/agent/proto" "github.com/kubeshop/tracetest/agent/workers" "github.com/kubeshop/tracetest/agent/workers/poller" @@ -19,8 +19,6 @@ import ( var ErrOtlpServerStart = errors.New("OTLP server start error") -var logger *zap.Logger - type Session struct { Token string client *client.Client @@ -35,9 +33,11 @@ func (s *Session) WaitUntilDisconnected() { } // Start the agent session with given configuration -func StartSession(ctx context.Context, cfg config.Config) (*Session, error) { +func StartSession(ctx context.Context, cfg config.Config, observer event.Observer, logger *zap.Logger) (*Session, error) { + observer = event.WrapObserver(observer) + traceCache := collector.NewTraceCache() - controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache) + controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger) if err != nil { return nil, err } @@ -47,7 +47,7 @@ func StartSession(ctx context.Context, cfg config.Config) (*Session, error) { return nil, err } - err = StartCollector(ctx, cfg, traceCache) + err = StartCollector(ctx, cfg, traceCache, observer, logger) if err != nil { return nil, err } @@ -58,7 +58,7 @@ func StartSession(ctx context.Context, cfg config.Config) (*Session, error) { }, nil } -func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache) error { +func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger) error { noopTracer := trace.NewNoopTracerProvider().Tracer("noop") collectorConfig := collector.Config{ HTTPPort: config.OTLPServer.HTTPPort, @@ -68,10 +68,8 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec opts := []collector.CollectorOption{ collector.WithTraceCache(traceCache), collector.WithStartRemoteServer(false), - } - - if enableLogging() { - opts = append(opts, collector.WithLogger(logger)) + collector.WithObserver(observer), + collector.WithLogger(logger), } _, err := collector.Start( @@ -87,39 +85,34 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec return nil } -func enableLogging() bool { - return os.Getenv("TRACETEST_DEV") == "true" -} - -func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache) (*client.Client, error) { - if enableLogging() { - var err error - logger, err = zap.NewDevelopment() - if err != nil { - return nil, fmt.Errorf("could not create logger: %w", err) - } - } - +func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger) (*client.Client, error) { controlPlaneClient, err := client.Connect(ctx, config.ServerURL, client.WithAPIKey(config.APIKey), client.WithAgentName(config.Name), client.WithLogger(logger), ) if err != nil { + observer.Error(err) return nil, err } - triggerWorker := workers.NewTriggerWorker(controlPlaneClient, workers.WithTraceCache(traceCache)) - pollingWorker := workers.NewPollerWorker(controlPlaneClient, workers.WithInMemoryDatastore( - poller.NewInMemoryDatastore(traceCache), - )) - dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient) + triggerWorker := workers.NewTriggerWorker( + controlPlaneClient, + workers.WithTraceCache(traceCache), + workers.WithTriggerObserver(observer), + ) - if enableLogging() { - triggerWorker.SetLogger(logger) - pollingWorker.SetLogger(logger) - dataStoreTestConnectionWorker.SetLogger(logger) - } + pollingWorker := workers.NewPollerWorker( + controlPlaneClient, + workers.WithInMemoryDatastore(poller.NewInMemoryDatastore(traceCache)), + workers.WithObserver(observer), + ) + + dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient, observer) + + triggerWorker.SetLogger(logger) + pollingWorker.SetLogger(logger) + dataStoreTestConnectionWorker.SetLogger(logger) controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test) controlPlaneClient.OnTriggerRequest(triggerWorker.Trigger) diff --git a/agent/ui/emoji.go b/agent/ui/emoji.go new file mode 100644 index 0000000000..8d97662f92 --- /dev/null +++ b/agent/ui/emoji.go @@ -0,0 +1,14 @@ +package ui + +import ( + "github.com/kyokomi/emoji/v2" +) + +var ( + Emoji_Magnifier = emoji.Sprint(":mag:") + Emoji_RedCircle = emoji.Sprint(":red_circle:") + Emoji_Sparkles = emoji.Sprint(":sparkles:") + Emoji_Truck = emoji.Sprint(":truck:") + Emoji_WhiteCheckMark = emoji.Sprint(":white_check_mark:") + Emoji_YellowCircle = emoji.Sprint(":large_yellow_circle:") +) diff --git a/agent/workers/poller.go b/agent/workers/poller.go index 455c3c6d18..b3da2fa5c1 100644 --- a/agent/workers/poller.go +++ b/agent/workers/poller.go @@ -10,6 +10,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/fluidtruck/deepcopy" "github.com/kubeshop/tracetest/agent/client" + "github.com/kubeshop/tracetest/agent/event" "github.com/kubeshop/tracetest/agent/proto" "github.com/kubeshop/tracetest/server/datastore" "github.com/kubeshop/tracetest/server/tracedb" @@ -25,6 +26,7 @@ type PollerWorker struct { sentSpanIDs *gocache.Cache[string, bool] inmemoryDatastore tracedb.TraceDB logger *zap.Logger + observer event.Observer } type PollerOption func(*PollerWorker) @@ -35,6 +37,12 @@ func WithInMemoryDatastore(datastore tracedb.TraceDB) PollerOption { } } +func WithObserver(observer event.Observer) PollerOption { + return func(pw *PollerWorker) { + pw.observer = observer + } +} + func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker { // TODO: use a real tracer tracer := trace.NewNoopTracerProvider().Tracer("noop") @@ -44,6 +52,7 @@ func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker tracer: tracer, sentSpanIDs: gocache.New[string, bool](), logger: zap.NewNop(), + observer: event.NewNopObserver(), } for _, opt := range opts { @@ -59,6 +68,8 @@ func (w *PollerWorker) SetLogger(logger *zap.Logger) { func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) error { w.logger.Debug("Received polling request", zap.Any("request", request)) + w.observer.StartTracePoll(request) + err := w.poll(ctx, request) if err != nil { w.logger.Error("Error polling", zap.Error(err)) @@ -74,15 +85,19 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) }, } + w.observer.EndTracePoll(request, err) w.logger.Debug("Sending polling error", zap.Any("response", errorResponse)) sendErr := w.client.SendTrace(ctx, errorResponse) if sendErr != nil { w.logger.Error("Error sending polling error", zap.Error(sendErr)) + w.observer.Error(sendErr) + return fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error()) } } + w.observer.EndTracePoll(request, nil) return err } diff --git a/agent/workers/testconnnection.go b/agent/workers/testconnnection.go index 2ce1315d50..f9a847f01d 100644 --- a/agent/workers/testconnnection.go +++ b/agent/workers/testconnnection.go @@ -6,6 +6,7 @@ import ( "log" "github.com/kubeshop/tracetest/agent/client" + "github.com/kubeshop/tracetest/agent/event" "github.com/kubeshop/tracetest/agent/proto" "github.com/kubeshop/tracetest/server/model" "github.com/kubeshop/tracetest/server/tracedb" @@ -14,19 +15,21 @@ import ( ) type TestConnectionWorker struct { - client *client.Client - tracer trace.Tracer - logger *zap.Logger + client *client.Client + tracer trace.Tracer + logger *zap.Logger + observer event.Observer } -func NewTestConnectionWorker(client *client.Client) *TestConnectionWorker { +func NewTestConnectionWorker(client *client.Client, observer event.Observer) *TestConnectionWorker { // TODO: use a real tracer tracer := trace.NewNoopTracerProvider().Tracer("noop") return &TestConnectionWorker{ - client: client, - tracer: tracer, - logger: zap.NewNop(), + client: client, + tracer: tracer, + logger: zap.NewNop(), + observer: observer, } } @@ -36,16 +39,22 @@ func (w *TestConnectionWorker) SetLogger(logger *zap.Logger) { func (w *TestConnectionWorker) Test(ctx context.Context, request *proto.DataStoreConnectionTestRequest) error { w.logger.Debug("Received datastore connection test request") + w.observer.StartDataStoreConnection(request) + datastoreConfig, err := convertProtoToDataStore(request.Datastore) if err != nil { w.logger.Error("Invalid datastore", zap.Error(err)) + w.observer.EndDataStoreConnection(request, err) return err } w.logger.Debug("Converted datastore", zap.Any("datastore", datastoreConfig)) if datastoreConfig == nil { - w.logger.Error("Invalid datastore: nil") - return fmt.Errorf("invalid datastore: nil") + err = fmt.Errorf("invalid datastore: nil") + + w.logger.Error("nil datastore", zap.Error(err)) + w.observer.EndDataStoreConnection(request, err) + return err } dsFactory := tracedb.Factory(nil) @@ -53,6 +62,8 @@ func (w *TestConnectionWorker) Test(ctx context.Context, request *proto.DataStor if err != nil { w.logger.Error("Invalid datastore", zap.Error(err)) log.Printf("Invalid datastore: %s", err.Error()) + w.observer.EndDataStoreConnection(request, err) + return err } w.logger.Debug("Created datastore", zap.Any("datastore", ds)) @@ -83,10 +94,12 @@ func (w *TestConnectionWorker) Test(ctx context.Context, request *proto.DataStor err = w.client.SendDataStoreConnectionResult(ctx, response) if err != nil { w.logger.Error("Could not send datastore connection test result", zap.Error(err)) + w.observer.Error(err) } else { w.logger.Debug("Sent datastore connection test result") } + w.observer.EndDataStoreConnection(request, nil) return nil } diff --git a/agent/workers/trigger.go b/agent/workers/trigger.go index 5a631e4da2..46f8cfcce3 100644 --- a/agent/workers/trigger.go +++ b/agent/workers/trigger.go @@ -6,6 +6,7 @@ import ( "github.com/kubeshop/tracetest/agent/client" "github.com/kubeshop/tracetest/agent/collector" + "github.com/kubeshop/tracetest/agent/event" "github.com/kubeshop/tracetest/agent/proto" agentTrigger "github.com/kubeshop/tracetest/agent/workers/trigger" "github.com/kubeshop/tracetest/server/pkg/id" @@ -20,6 +21,7 @@ type TriggerWorker struct { client *client.Client registry *agentTrigger.Registry traceCache collector.TraceCache + observer event.Observer } type TriggerOption func(*TriggerWorker) @@ -30,6 +32,12 @@ func WithTraceCache(cache collector.TraceCache) TriggerOption { } } +func WithTriggerObserver(observer event.Observer) TriggerOption { + return func(tw *TriggerWorker) { + tw.observer = observer + } +} + func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWorker { // TODO: use a real tracer tracer := trace.NewNoopTracerProvider().Tracer("noop") @@ -44,6 +52,7 @@ func NewTriggerWorker(client *client.Client, opts ...TriggerOption) *TriggerWork client: client, registry: registry, logger: zap.NewNop(), + observer: event.NewNopObserver(), } for _, opt := range opts { @@ -59,9 +68,13 @@ func (w *TriggerWorker) SetLogger(logger *zap.Logger) { func (w *TriggerWorker) Trigger(ctx context.Context, triggerRequest *proto.TriggerRequest) error { w.logger.Debug("Trigger request received", zap.Any("triggerRequest", triggerRequest)) + w.observer.StartTriggerExecution(triggerRequest) + err := w.trigger(ctx, triggerRequest) if err != nil { w.logger.Error("Trigger error", zap.Error(err)) + w.observer.EndTriggerExecution(triggerRequest, err) + sendErr := w.client.SendTriggerResponse(ctx, &proto.TriggerResponse{ RequestID: triggerRequest.RequestID, AgentIdentification: w.client.SessionConfiguration().AgentIdentification, @@ -76,10 +89,14 @@ func (w *TriggerWorker) Trigger(ctx context.Context, triggerRequest *proto.Trigg if sendErr != nil { w.logger.Error("Could not report trigger error back to the server", zap.Error(sendErr)) + w.observer.Error(sendErr) + return fmt.Errorf("could not report trigger error back to the server: %w. Original error: %s", sendErr, err.Error()) } } + w.observer.EndTriggerExecution(triggerRequest, nil) + return err } diff --git a/cli/cmd/start_cmd.go b/cli/cmd/start_cmd.go index e685e32bd5..94a064c2f2 100644 --- a/cli/cmd/start_cmd.go +++ b/cli/cmd/start_cmd.go @@ -34,6 +34,7 @@ var startCmd = &cobra.Command{ AgentApiKey: saveParams.agentApiKey, Token: saveParams.token, Mode: agentConfig.Mode(saveParams.mode), + LogLevel: saveParams.logLevel, } cfg, err := agentConfig.LoadConfig() @@ -58,6 +59,7 @@ func init() { startCmd.Flags().StringVarP(&saveParams.token, "token", "", defaultToken, "token authentication key") startCmd.Flags().StringVarP(&saveParams.endpoint, "endpoint", "e", defaultEndpoint, "set the value for the endpoint, so the CLI won't ask for this value") startCmd.Flags().StringVarP(&saveParams.mode, "mode", "m", "desktop", "set how the agent will start") + startCmd.Flags().StringVarP(&saveParams.logLevel, "log-level", "l", "debug", "set the agent log level") rootCmd.AddCommand(startCmd) } @@ -68,4 +70,5 @@ type saveParameters struct { agentApiKey string token string mode string + logLevel string } diff --git a/go.mod b/go.mod index d89e80f23f..523c0e97b1 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/alecthomas/participle/v2 v2.0.0-alpha8 github.com/alexeyco/simpletable v1.0.0 github.com/alitto/pond v1.8.3 + github.com/avast/retry-go v3.0.0+incompatible github.com/aws/aws-sdk-go v1.44.196 github.com/brianvoe/gofakeit/v6 v6.17.0 github.com/compose-spec/compose-go v1.20.0 @@ -39,6 +40,7 @@ require ( github.com/jackc/pgx/v5 v5.4.2 github.com/jhump/protoreflect v1.12.0 github.com/json-iterator/go v1.1.12 + github.com/kyokomi/emoji/v2 v2.2.12 github.com/labstack/gommon v0.3.0 github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 github.com/nats-io/nats.go v1.31.0 @@ -96,8 +98,6 @@ require ( github.com/Microsoft/go-winio v0.5.2 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect - github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect - github.com/avast/retry-go v3.0.0+incompatible // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/containerd/console v1.0.3 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect @@ -198,7 +198,6 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect gopkg.in/ini.v1 v1.67.0 // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect ) // Temporary fix until we manage to merge the patch to the gnomock repo (https://github.com/orlangure/gnomock/pull/534) diff --git a/go.sum b/go.sum index f3c23796dc..e6e09c33ff 100644 --- a/go.sum +++ b/go.sum @@ -1269,6 +1269,8 @@ github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/kyokomi/emoji/v2 v2.2.12 h1:sSVA5nH9ebR3Zji1o31wu3yOwD1zKXQA2z0zUyeit60= +github.com/kyokomi/emoji/v2 v2.2.12/go.mod h1:JUcn42DTdsXJo1SWanHh4HKDEyPaR5CqkmoirZZP9qE= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=