Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: agent dashboard #3635

Merged
merged 5 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 22 additions & 1 deletion agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var activeCollector Collector

type Config struct {
HTTPPort int
GRPCPort int
Expand Down Expand Up @@ -48,6 +51,12 @@ func WithObserver(observer event.Observer) CollectorOption {
}
}

func WithSensor(sensor sensors.Sensor) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.sensor = sensor
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -60,6 +69,8 @@ type Collector interface {

Statistics() Statistics
ResetStatistics()

SetSensor(sensors.Sensor)
}

// Stop implements stoppable.
Expand All @@ -76,12 +87,21 @@ func (c *collector) ResetStatistics() {
c.ingester.ResetStatistics()
}

func (c *collector) SetSensor(sensor sensors.Sensor) {
c.ingester.SetSensor(sensor)
}

func GetActiveCollector() Collector {
return activeCollector
}

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (Collector, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
observer: event.NewNopObserver(),
sensor: sensors.NewSensor(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -115,7 +135,8 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
return nil, fmt.Errorf("could not start HTTP OTLP listener: %w", err)
}

return &collector{grpcServer: grpcServer, httpServer: httpServer, ingester: ingester}, nil
activeCollector = &collector{grpcServer: grpcServer, httpServer: httpServer, ingester: ingester}
return activeCollector, nil
}

func onProcessTermination(callback func()) {
Expand Down
17 changes: 17 additions & 0 deletions agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/ui/dashboard/events"
"github.com/kubeshop/tracetest/agent/ui/dashboard/sensors"
"github.com/kubeshop/tracetest/server/otlp"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
Expand All @@ -27,16 +29,20 @@ type ingester interface {

Statistics() Statistics
ResetStatistics()

SetSensor(sensors.Sensor)
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: cfg,
buffer: &buffer{},
traceIDs: make(map[string]bool, 0),
done: make(chan bool),
traceCache: cfg.traceCache,
logger: cfg.logger,
sensor: cfg.sensor,
}

if startRemoteServer {
Expand All @@ -63,9 +69,11 @@ type forwardIngester struct {
RemoteIngester remoteIngesterConfig
client pb.TraceServiceClient
buffer *buffer
traceIDs map[string]bool
done chan bool
traceCache TraceCache
logger *zap.Logger
sensor sensors.Sensor

statistics Statistics
}
Expand All @@ -77,6 +85,7 @@ type remoteIngesterConfig struct {
startRemoteServer bool
logger *zap.Logger
observer event.Observer
sensor sensors.Sensor
}

type buffer struct {
Expand All @@ -92,6 +101,10 @@ func (i *forwardIngester) ResetStatistics() {
i.statistics = Statistics{}
}

func (i *forwardIngester) SetSensor(sensor sensors.Sensor) {
i.sensor = sensor
}

func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType otlp.RequestType) (*pb.ExportTraceServiceResponse, error) {
spanCount := countSpans(request)
i.buffer.mutex.Lock()
Expand All @@ -100,6 +113,8 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
i.statistics.SpanCount += int64(spanCount)
i.statistics.LastSpanTimestamp = time.Now()

i.sensor.Emit(events.SpanCountUpdated, i.statistics.SpanCount)

i.buffer.mutex.Unlock()
i.logger.Debug("received spans", zap.Int("count", spanCount))

Expand All @@ -108,6 +123,7 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
// In case of OTLP datastore, those spans will be polled from this cache instead
// of a real datastore
i.cacheTestSpans(request.ResourceSpans)
i.sensor.Emit(events.TraceCountUpdated, len(i.traceIDs))
}

return &pb.ExportTraceServiceResponse{
Expand Down Expand Up @@ -208,6 +224,7 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
i.logger.Debug("caching test spans", zap.Int("count", len(spans)))

for traceID, spans := range spans {
i.traceIDs[traceID] = true
if _, ok := i.traceCache.Get(traceID); !ok {
i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID))
// traceID is not part of a test
Expand Down
6 changes: 6 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path"
"regexp"
"strings"

"github.com/spf13/viper"
Expand All @@ -17,6 +18,11 @@ type Config struct {
OTLPServer OtlpServer `mapstructure:"otlp_server"`
}

func (c Config) APIEndpoint() string {
regex := regexp.MustCompile(":[0-9]+$")
return string(regex.ReplaceAll([]byte(c.ServerURL), []byte("")))
}

type OtlpServer struct {
GRPCPort int `mapstructure:"grpc_port"`
HTTPPort int `mapstructure:"http_port"`
Expand Down
56 changes: 56 additions & 0 deletions agent/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package runner

import (
"context"
"errors"
"fmt"
"os"

"github.com/golang-jwt/jwt"
agentConfig "github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/ui"

"github.com/kubeshop/tracetest/cli/config"
"github.com/kubeshop/tracetest/cli/pkg/resourcemanager"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Runner struct {
Expand All @@ -20,6 +24,8 @@ type Runner struct {
ui ui.ConsoleUI
mode agentConfig.Mode
logger *zap.Logger
loggerLevel *zap.AtomicLevel
claims jwt.MapClaims
}

func NewRunner(configurator config.Configurator, resources *resourcemanager.Registry, ui ui.ConsoleUI) *Runner {
Expand Down Expand Up @@ -49,10 +55,21 @@ Once started, Tracetest Agent exposes OTLP ports 4317 and 4318 to ingest traces

if enableLogging(flags.LogLevel) {
var err error
atom := zap.NewAtomicLevel()
logger, err = zap.NewDevelopment()
if err != nil {
return fmt.Errorf("could not create logger: %w", err)
}

logger = logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
return zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.Lock(os.Stdout),
atom,
)
}))

s.loggerLevel = &atom
}

s.logger = logger
Expand Down Expand Up @@ -110,3 +127,42 @@ func (s *Runner) StartAgent(ctx context.Context, endpoint, agentApiKey, uiEndpoi
func enableLogging(logLevel string) bool {
return os.Getenv("TRACETEST_DEV") == "true" && logLevel == "debug"
}

func (s *Runner) authenticate(ctx context.Context, cfg agentConfig.Config, observer event.Observer) (*Session, jwt.MapClaims, error) {
isStarted := false
session := &Session{}

var err error

for !isStarted {
session, err = StartSession(ctx, cfg, observer, s.logger)
if err != nil && errors.Is(err, ErrOtlpServerStart) {
s.ui.Error("Tracetest Agent binds to the OpenTelemetry ports 4317 and 4318 which are used to receive trace information from your system. The agent tried to bind to these ports, but failed.")
shouldRetry := s.ui.Enter("Please stop the process currently listening on these ports and press enter to try again.")

if !shouldRetry {
s.ui.Finish()
return nil, nil, err
}

continue
}

if err != nil {
return nil, nil, err
}

isStarted = true
}

claims, err := config.GetTokenClaims(session.Token)
if err != nil {
return nil, nil, err
}
s.claims = claims
return session, claims, nil
}

func (s *Runner) getCurrentSessionClaims() jwt.MapClaims {
return s.claims
}