Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add verbose mode for Tracetest agent #3385

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 11 additions & 3 deletions agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"syscall"
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/server/otlp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -41,6 +42,12 @@ func WithLogger(logger *zap.Logger) CollectorOption {
}
}

func WithObserver(observer event.Observer) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.observer = observer
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -54,9 +61,10 @@ func (c *collector) Stop() {

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
observer: event.NewNopObserver(),
}

for _, opt := range opts {
Expand Down
7 changes: 6 additions & 1 deletion agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/server/otlp"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
Expand Down Expand Up @@ -60,6 +61,7 @@ type remoteIngesterConfig struct {
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
observer event.Observer
}

type buffer struct {
Expand Down Expand Up @@ -136,7 +138,6 @@ func (i *forwardIngester) executeBatch(ctx context.Context) error {
}

i.logger.Debug("successfully forwarded spans", zap.Int("count", len(newSpans)))

return nil
}

Expand Down Expand Up @@ -174,8 +175,12 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
continue
}

i.RemoteIngester.observer.StartSpanReceive(spans)

i.traceCache.Append(traceID, spans)
i.logger.Debug("caching test spans", zap.String("traceID", traceID), zap.Int("count", len(spans)))

i.RemoteIngester.observer.EndSpanReceive(spans, nil)
}
}

Expand Down
1 change: 1 addition & 0 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ type Flags struct {
AgentApiKey string
Token string
Mode Mode
LogLevel string
}
112 changes: 112 additions & 0 deletions agent/event/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package event

import (
"github.com/kubeshop/tracetest/agent/proto"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

type Observer interface {
StartTriggerExecution(*proto.TriggerRequest)
EndTriggerExecution(*proto.TriggerRequest, error)

StartTracePoll(*proto.PollingRequest)
EndTracePoll(*proto.PollingRequest, error)

StartSpanReceive([]*v1.Span)
EndSpanReceive([]*v1.Span, error)

StartDataStoreConnection(*proto.DataStoreConnectionTestRequest)
EndDataStoreConnection(*proto.DataStoreConnectionTestRequest, error)

Error(error)
}

type wrapperObserver struct {
wrappedObserver Observer
}

func NewNopObserver() Observer {
return &wrapperObserver{
wrappedObserver: nil,
}
}

func WrapObserver(observer Observer) Observer {
return &wrapperObserver{
wrappedObserver: observer,
}
}

var _ Observer = &wrapperObserver{}

func (o *wrapperObserver) StartDataStoreConnection(request *proto.DataStoreConnectionTestRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartDataStoreConnection(request)
}

func (o *wrapperObserver) EndDataStoreConnection(request *proto.DataStoreConnectionTestRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndDataStoreConnection(request, err)
}

func (o *wrapperObserver) StartSpanReceive(spans []*v1.Span) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartSpanReceive(spans)
}

func (o *wrapperObserver) EndSpanReceive(spans []*v1.Span, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndSpanReceive(spans, err)
}

func (o *wrapperObserver) Error(err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.Error(err)
}

func (o *wrapperObserver) StartTracePoll(request *proto.PollingRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartTracePoll(request)
}

func (o *wrapperObserver) EndTracePoll(request *proto.PollingRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndTracePoll(request, err)
}

func (o *wrapperObserver) StartTriggerExecution(request *proto.TriggerRequest) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.StartTriggerExecution(request)
}

func (o *wrapperObserver) EndTriggerExecution(request *proto.TriggerRequest, err error) {
if o.wrappedObserver == nil {
return
}

o.wrappedObserver.EndTriggerExecution(request, err)
}
29 changes: 25 additions & 4 deletions agent/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package runner

import (
"context"
"fmt"
"os"

agentConfig "github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/ui"

"github.com/kubeshop/tracetest/cli/config"
"github.com/kubeshop/tracetest/cli/pkg/resourcemanager"

"go.uber.org/zap"
)

type Runner struct {
configurator config.Configurator
resources *resourcemanager.Registry
ui ui.ConsoleUI
mode agentConfig.Mode
logger *zap.Logger
}

func NewRunner(configurator config.Configurator, resources *resourcemanager.Registry, ui ui.ConsoleUI) *Runner {
Expand All @@ -23,21 +28,34 @@ func NewRunner(configurator config.Configurator, resources *resourcemanager.Regi
resources: resources,
ui: ui,
mode: agentConfig.Mode_Desktop,
logger: nil,
}
}

func (s *Runner) Run(ctx context.Context, cfg config.Config, flags agentConfig.Flags) error {
s.ui.Banner(config.Version)
s.ui.Println(`Tracetest start launches a lightweight agent. It enables you to run tests and collect traces with Tracetest.
Once started, Tracetest Agent exposes OTLP ports 4317 and 4318 to ingest traces via gRCP and HTTP.`)
s.ui.Println("") // print empty line

if flags.Token == "" || flags.AgentApiKey != "" {
s.configurator = s.configurator.WithOnFinish(s.onStartAgent)
}

s.mode = flags.Mode
s.ui.Infof("Running in %s mode...", s.mode)

s.mode = flags.Mode
logger := zap.NewNop()

if enableLogging(flags.LogLevel) {
var err error
logger, err = zap.NewDevelopment()
if err != nil {
return fmt.Errorf("could not create logger: %w", err)
}
}

s.logger = logger

return s.configurator.Start(ctx, cfg, flags)
}
Expand Down Expand Up @@ -78,9 +96,12 @@ func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoi
}

if s.mode == agentConfig.Mode_Desktop {
return RunDesktopStrategy(ctx, cfg, s.ui, uiEndpoint)
return s.RunDesktopStrategy(ctx, cfg, uiEndpoint)
}

// TODO: Add verbose strategy
return nil
return s.RunVerboseStrategy(ctx, cfg)
}

func enableLogging(logLevel string) bool {
return os.Getenv("TRACETEST_DEV") == "true" && logLevel == "debug"
}
20 changes: 10 additions & 10 deletions agent/runner/runstrategy_desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import (
consoleUI "github.com/kubeshop/tracetest/agent/ui"
)

func RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, ui consoleUI.ConsoleUI, uiEndpoint string) error {
ui.Infof("Starting Agent with name %s...", cfg.Name)
func (s *Runner) RunDesktopStrategy(ctx context.Context, cfg agentConfig.Config, uiEndpoint string) error {
s.ui.Infof("Starting Agent with name %s...", cfg.Name)

isStarted := false
session := &Session{}

var err error

for !isStarted {
session, err = StartSession(ctx, cfg)
session, err = StartSession(ctx, cfg, nil, s.logger)
if err != nil && errors.Is(err, ErrOtlpServerStart) {
ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.")
shouldRetry := ui.Enter("Please stop the process currently listening on these ports and press enter to try again.")
s.ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.")
shouldRetry := s.ui.Enter("Please stop the process currently listening on these ports and press enter to try again.")

if !shouldRetry {
ui.Finish()
s.ui.Finish()
return err
}

Expand All @@ -51,20 +51,20 @@ You can`
options := []consoleUI.Option{{
Text: "Open Tracetest in a browser to this environment",
Fn: func(_ consoleUI.ConsoleUI) {
ui.OpenBrowser(fmt.Sprintf("%sorganizations/%s/environments/%s/dashboard", uiEndpoint, claims["organization_id"], claims["environment_id"]))
s.ui.OpenBrowser(fmt.Sprintf("%sorganizations/%s/environments/%s/dashboard", uiEndpoint, claims["organization_id"], claims["environment_id"]))
},
}, {
Text: "Stop this agent",
Fn: func(_ consoleUI.ConsoleUI) {
isOpen = false
session.Close()
ui.Finish()
s.ui.Finish()
},
}}

for isOpen {
selected := ui.Select(message, options, 0)
selected.Fn(ui)
selected := s.ui.Select(message, options, 0)
selected.Fn(s.ui)
}
return nil
}