Skip to content

Commit

Permalink
chore: add verbose mode for Tracetest agent (#3385)
Browse files Browse the repository at this point in the history
* updating logging for agent

* updating logs

* updating default observer for workers and collector
  • Loading branch information
danielbdias committed Nov 22, 2023
1 parent 5e64dc1 commit 483826e
Show file tree
Hide file tree
Showing 15 changed files with 385 additions and 64 deletions.
14 changes: 11 additions & 3 deletions agent/collector/collector.go
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/server/otlp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -41,6 +42,12 @@ func WithLogger(logger *zap.Logger) CollectorOption {
}
}

func WithObserver(observer event.Observer) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.observer = observer
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -54,9 +61,10 @@ func (c *collector) Stop() {

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
observer: event.NewNopObserver(),
}

for _, opt := range opts {
Expand Down
7 changes: 6 additions & 1 deletion agent/collector/ingester.go
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/server/otlp"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
Expand Down Expand Up @@ -60,6 +61,7 @@ type remoteIngesterConfig struct {
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
}

type buffer struct {
Expand Down Expand Up @@ -136,7 +138,6 @@ func (i *forwardIngester) executeBatch(ctx context.Context) error {
}

i.logger.Debug("successfully forwarded spans", zap.Int("count", len(newSpans)))

return nil
}

Expand Down Expand Up @@ -174,8 +175,12 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
continue
}

i.RemoteIngester.observer.StartSpanReceive(spans)

i.traceCache.Append(traceID, spans)
i.logger.Debug("caching test spans", zap.String("traceID", traceID), zap.Int("count", len(spans)))

i.RemoteIngester.observer.EndSpanReceive(spans, nil)
}
}

Expand Down
1 change: 1 addition & 0 deletions agent/config/flags.go
Expand Up @@ -15,4 +15,5 @@ type Flags struct {
AgentApiKey string
Token string
Mode Mode
LogLevel string
}
112 changes: 112 additions & 0 deletions agent/event/observer.go
@@ -0,0 +1,112 @@
package event

import (
"github.com/kubeshop/tracetest/agent/proto"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

type Observer interface {
StartTriggerExecution(*proto.TriggerRequest)
EndTriggerExecution(*proto.TriggerRequest, error)

StartTracePoll(*proto.PollingRequest)
EndTracePoll(*proto.PollingRequest, error)

StartSpanReceive([]*v1.Span)
EndSpanReceive([]*v1.Span, error)

StartDataStoreConnection(*proto.DataStoreConnectionTestRequest)
EndDataStoreConnection(*proto.DataStoreConnectionTestRequest, error)

Error(error)
}

type wrapperObserver struct {
wrappedObserver Observer
}

func NewNopObserver() Observer {
return &wrapperObserver{
wrappedObserver: nil,
}
}

func WrapObserver(observer Observer) Observer {
return &wrapperObserver{
wrappedObserver: observer,
}
}

var _ Observer = &wrapperObserver{}

func (o *wrapperObserver) StartDataStoreConnection(request *proto.DataStoreConnectionTestRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartDataStoreConnection(request)
}

func (o *wrapperObserver) EndDataStoreConnection(request *proto.DataStoreConnectionTestRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndDataStoreConnection(request, err)
}

func (o *wrapperObserver) StartSpanReceive(spans []*v1.Span) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartSpanReceive(spans)
}

func (o *wrapperObserver) EndSpanReceive(spans []*v1.Span, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndSpanReceive(spans, err)
}

func (o *wrapperObserver) Error(err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.Error(err)
}

func (o *wrapperObserver) StartTracePoll(request *proto.PollingRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartTracePoll(request)
}

func (o *wrapperObserver) EndTracePoll(request *proto.PollingRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndTracePoll(request, err)
}

func (o *wrapperObserver) StartTriggerExecution(request *proto.TriggerRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartTriggerExecution(request)
}

func (o *wrapperObserver) EndTriggerExecution(request *proto.TriggerRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndTriggerExecution(request, err)
}
29 changes: 25 additions & 4 deletions agent/runner/runner.go
Expand Up @@ -2,19 +2,24 @@ package runner

import (
"context"
"fmt"
"os"

agentConfig "github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/ui"

"github.com/kubeshop/tracetest/cli/config"
"github.com/kubeshop/tracetest/cli/pkg/resourcemanager"

"go.uber.org/zap"
)

type Runner struct {
configurator config.Configurator
resources *resourcemanager.Registry
ui ui.ConsoleUI
mode agentConfig.Mode
logger *zap.Logger
}

func NewRunner(configurator config.Configurator, resources *resourcemanager.Registry, ui ui.ConsoleUI) *Runner {
Expand All @@ -23,21 +28,34 @@ func NewRunner(configurator config.Configurator, resources *resourcemanager.Regi
resources: resources,
ui: ui,
mode: agentConfig.Mode_Desktop,
logger: nil,
}
}

func (s *Runner) Run(ctx context.Context, cfg config.Config, flags agentConfig.Flags) error {
s.ui.Banner(config.Version)
s.ui.Println(`Tracetest start launches a lightweight agent. It enables you to run tests and collect traces with Tracetest.
Once started, Tracetest Agent exposes OTLP ports 4317 and 4318 to ingest traces via gRCP and HTTP.`)
s.ui.Println("") // print empty line

if flags.Token == "" || flags.AgentApiKey != "" {
s.configurator = s.configurator.WithOnFinish(s.onStartAgent)
}

s.mode = flags.Mode
s.ui.Infof("Running in %s mode...", s.mode)

s.mode = flags.Mode
logger := zap.NewNop()

if enableLogging(flags.LogLevel) {
var err error
logger, err = zap.NewDevelopment()
if err != nil {
return fmt.Errorf("could not create logger: %w", err)
}
}

s.logger = logger

return s.configurator.Start(ctx, cfg, flags)
}
Expand Down Expand Up @@ -78,9 +96,12 @@ func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoi
}

if s.mode == agentConfig.Mode_Desktop {
return RunDesktopStrategy(ctx, cfg, s.ui, uiEndpoint)
return s.RunDesktopStrategy(ctx, cfg, uiEndpoint)
}

// TODO: Add verbose strategy
return nil
return s.RunVerboseStrategy(ctx, cfg)
}

func enableLogging(logLevel string) bool {
return os.Getenv("TRACETEST_DEV") == "true" && logLevel == "debug"
}
20 changes: 10 additions & 10 deletions agent/runner/runstrategy_desktop.go
Expand Up @@ -11,22 +11,22 @@ import (
consoleUI "github.com/kubeshop/tracetest/agent/ui"
)

func RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, ui consoleUI.ConsoleUI, uiEndpoint string) error {
ui.Infof("Starting Agent with name %s...", cfg.Name)
func (s *Runner) RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, uiEndpoint string) error {
s.ui.Infof("Starting Agent with name %s...", cfg.Name)

isStarted := false
session := &Session{}

var err error

for !isStarted {
session, err = StartSession(ctx, cfg)
session, err = StartSession(ctx, cfg, nil, s.logger)
if err != nil && errors.Is(err, ErrOtlpServerStart) {
ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.")
shouldRetry := ui.Enter("Please stop the process currently listening on these ports and press enter to try again.")
s.ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.")
shouldRetry := s.ui.Enter("Please stop the process currently listening on these ports and press enter to try again.")

if !shouldRetry {
ui.Finish()
s.ui.Finish()
return err
}

Expand All @@ -51,20 +51,20 @@ You can`
options := []consoleUI.Option{{
Text: "Open Tracetest in a browser to this environment",
Fn: func(_ consoleUI.ConsoleUI) {
ui.OpenBrowser(fmt.Sprintf("%sorganizations/%s/environments/%s/dashboard", uiEndpoint, claims["organization_id"], claims["environment_id"]))
s.ui.OpenBrowser(fmt.Sprintf("%sorganizations/%s/environments/%s/dashboard", uiEndpoint, claims["organization_id"], claims["environment_id"]))
},
}, {
Text: "Stop this agent",
Fn: func(_ consoleUI.ConsoleUI) {
isOpen = false
session.Close()
ui.Finish()
s.ui.Finish()
},
}}

for isOpen {
selected := ui.Select(message, options, 0)
selected.Fn(ui)
selected := s.ui.Select(message, options, 0)
selected.Fn(s.ui)
}
return nil
}

0 comments on commit 483826e

Please sign in to comment.