Skip to content

Commit

Permalink
feat: agent dashboard (#3635)
Browse files Browse the repository at this point in the history
* feat: agent dashboard header layout (#3592)

* add initial dashboard code

* implement banner for messages in header

* commit rest of files

* fix build and set logo static

* feat: agent UI test run list (#3619)

* adapt layout to match @olha23's design

* add test run list

* docs: comment some layout explanation

* feat: agent dashboard start (#3631)

* start dashboard if agent mode is set as dashboard

* update environment and collector statistics

* feat: update list of tests based on test runs received by the agent

* update test run counter

* make dashboard as part of the desktop option list

* open test run in browser

* fix panic

* make it possible to exit and go back to dashboard

* remove dashboard mode from CLI

* Chore performance improvements (#3616)

* chore(frontend): indentifying performance improvements

* chore(frontend): identify performance improvements

* feat(frontend): hide dag view for big traces (#3606)

* Chore performance improvements tech debt 1 (#3607)

* adding async functionality

* chore(frontend): DAG improvements

* chore(frontend): DAG improvements

* chore(frontend): fixing spinner styles

* chore(frontend): fixing spinner styles

* feat(frontend): fixing tests

* chore(frontend): fixing spinner styles

* chore(frontend): FE Improvements for the Test View (#3613)

* adding async functionality

* chore(frontend): DAG improvements

* chore(frontend): DAG improvements

* chore(frontend): fixing spinner styles

* chore(frontend): fixing spinner styles

* feat(frontend): fixing tests

* chore(frontend): fixing spinner styles

* chore(frontend): FE Improvements for the Test View

* chore(frontend): reverting editor changes

* chore(frontend): Adding memoization for useComplete hook

* chore(frontend): Adding Search Service usage

* chore(frontend): cleanup

* feat(frontend): implement virtual list for timeline view (#3617)

* feat(frontend): implement virtual list for timeline view

* remove prop

* add header and collapse

* feat: add timeline view connectors (#3623)

* feat(frontend): Implemeting new Timeline for the Test Page (#3627)

* feat(frontend): Implemeting new Timeline for the Test Page

* feat(frontend): Fixing tests

* feat(frontend): Fixing tests

* Chore performance improvements span search (#3629)

* feat(frontend): Implemeting new Timeline for the Test Page

* feat(frontend): Implementing span search for analyzer and test views

* feat(frontend): read improvements

* feat(frontend): adding single line input component

* feat(frontend): updating analyzer styles

* feat(frontend): Fixing tests

* feat(frontend): Fixing tests

---------

Co-authored-by: Jorge Padilla <jorge.esteban.padilla@gmail.com>

* disable logger in dashboard mode

* keep old log level

* add comment about why we are disabling logs in dashboard mode

---------

Co-authored-by: Oscar Reyes <oscar-rreyes1@hotmail.com>
Co-authored-by: Jorge Padilla <jorge.esteban.padilla@gmail.com>

---------

Co-authored-by: Oscar Reyes <oscar-rreyes1@hotmail.com>
Co-authored-by: Jorge Padilla <jorge.esteban.padilla@gmail.com>
  • Loading branch information
3 people committed Feb 13, 2024
1 parent d9b8d5a commit 7892f7b
Show file tree
Hide file tree
Showing 30 changed files with 1,358 additions and 43 deletions.
23 changes: 22 additions & 1 deletion agent/collector/collector.go
Expand Up @@ -9,11 +9,14 @@ import (
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var activeCollector Collector

type Config struct {
HTTPPort int
GRPCPort int
Expand Down Expand Up @@ -48,6 +51,12 @@ func WithObserver(observer event.Observer) CollectorOption {
}
}

func WithSensor(sensor sensors.Sensor) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.sensor = sensor
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -60,6 +69,8 @@ type Collector interface {

Statistics() Statistics
ResetStatistics()

SetSensor(sensors.Sensor)
}

// Stop implements stoppable.
Expand All @@ -76,12 +87,21 @@ func (c *collector) ResetStatistics() {
c.ingester.ResetStatistics()
}

func (c *collector) SetSensor(sensor sensors.Sensor) {
c.ingester.SetSensor(sensor)
}

func GetActiveCollector() Collector {
return activeCollector
}

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (Collector, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
observer: event.NewNopObserver(),
sensor: sensors.NewSensor(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -115,7 +135,8 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
return nil, fmt.Errorf("could not start HTTP OTLP listener: %w", err)
}

return &collector{grpcServer: grpcServer, httpServer: httpServer, ingester: ingester}, nil
activeCollector = &collector{grpcServer: grpcServer, httpServer: httpServer, ingester: ingester}
return activeCollector, nil
}

func onProcessTermination(callback func()) {
Expand Down
17 changes: 17 additions & 0 deletions agent/collector/ingester.go
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/ui/dashboard/events"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
Expand All @@ -27,16 +29,20 @@ type ingester interface {

Statistics() Statistics
ResetStatistics()

SetSensor(sensors.Sensor)
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
buffer: &buffer{},
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
}

if startRemoteServer {
Expand All @@ -63,9 +69,11 @@ type forwardIngester struct {
RemoteIngester remoteIngesterConfig
client pb.TraceServiceClient
buffer *buffer
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor

statistics Statistics
}
Expand All @@ -77,6 +85,7 @@ type remoteIngesterConfig struct {
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
}

type buffer struct {
Expand All @@ -92,6 +101,10 @@ func (i *forwardIngester) ResetStatistics() {
i.statistics = Statistics{}
}

func (i *forwardIngester) SetSensor(sensor sensors.Sensor) {
i.sensor = sensor
}

func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType otlp.RequestType) (*pb.ExportTraceServiceResponse, error) {
spanCount := countSpans(request)
i.buffer.mutex.Lock()
Expand All @@ -100,6 +113,8 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
i.statistics.SpanCount += int64(spanCount)
i.statistics.LastSpanTimestamp = time.Now()

i.sensor.Emit(events.SpanCountUpdated, i.statistics.SpanCount)

i.buffer.mutex.Unlock()
i.logger.Debug("received spans", zap.Int("count", spanCount))

Expand All @@ -108,6 +123,7 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
// In case of OTLP datastore, those spans will be polled from this cache instead
// of a real datastore
i.cacheTestSpans(request.ResourceSpans)
i.sensor.Emit(events.TraceCountUpdated, len(i.traceIDs))
}

return &pb.ExportTraceServiceResponse{
Expand Down Expand Up @@ -208,6 +224,7 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
i.logger.Debug("caching test spans", zap.Int("count", len(spans)))

for traceID, spans := range spans {
i.traceIDs[traceID] = true
if _, ok := i.traceCache.Get(traceID); !ok {
i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID))
// traceID is not part of a test
Expand Down
6 changes: 6 additions & 0 deletions agent/config/config.go
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path"
"regexp"
"strings"

"github.com/spf13/viper"
Expand All @@ -17,6 +18,11 @@ type Config struct {
OTLPServer OtlpServer `mapstructure:"otlp_server"`
}

func (c Config) APIEndpoint() string {
regex := regexp.MustCompile(":[0-9]+$")
return string(regex.ReplaceAll([]byte(c.ServerURL), []byte("")))
}

type OtlpServer struct {
GRPCPort int `mapstructure:"grpc_port"`
HTTPPort int `mapstructure:"http_port"`
Expand Down
56 changes: 56 additions & 0 deletions agent/runner/runner.go
Expand Up @@ -2,16 +2,20 @@ package runner

import (
"context"
"errors"
"fmt"
"os"

"github.com/golang-jwt/jwt"
agentConfig "github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/ui"

"github.com/kubeshop/tracetest/cli/config"
"github.com/kubeshop/tracetest/cli/pkg/resourcemanager"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Runner struct {
Expand All @@ -20,6 +24,8 @@ type Runner struct {
ui ui.ConsoleUI
mode agentConfig.Mode
logger *zap.Logger
loggerLevel *zap.AtomicLevel
claims jwt.MapClaims
}

func NewRunner(configurator config.Configurator, resources *resourcemanager.Registry, ui ui.ConsoleUI) *Runner {
Expand Down Expand Up @@ -49,10 +55,21 @@ Once started, Tracetest Agent exposes OTLP ports 4317 and 4318 to ingest traces

if enableLogging(flags.LogLevel) {
var err error
atom := zap.NewAtomicLevel()
logger, err = zap.NewDevelopment()
if err != nil {
return fmt.Errorf("could not create logger: %w", err)
}

logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.Lock(os.Stdout),
atom,
)
}))

s.loggerLevel = &atom
}

s.logger = logger
Expand Down Expand Up @@ -110,3 +127,42 @@ func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoi
func enableLogging(logLevel string) bool {
return os.Getenv("TRACETEST_DEV") == "true" && logLevel == "debug"
}

func (s *Runner) authenticate(ctx context.Context, cfg agentConfig.Config, observer event.Observer) (*Session, jwt.MapClaims, error) {
isStarted := false
session := &Session{}

var err error

for !isStarted {
session, err = StartSession(ctx, cfg, observer, s.logger)
if err != nil && errors.Is(err, ErrOtlpServerStart) {
s.ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.")
shouldRetry := s.ui.Enter("Please stop the process currently listening on these ports and press enter to try again.")

if !shouldRetry {
s.ui.Finish()
return nil, nil, err
}

continue
}

if err != nil {
return nil, nil, err
}

isStarted = true
}

claims, err := config.GetTokenClaims(session.Token)
if err != nil {
return nil, nil, err
}
s.claims = claims
return session, claims, nil
}

func (s *Runner) getCurrentSessionClaims() jwt.MapClaims {
return s.claims
}

0 comments on commit 7892f7b

Please sign in to comment.