Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add open telemetry support for Tracetest agent #3662

Merged
merged 6 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/kubeshop/tracetest/agent/proto"
"go.uber.org/zap"
"google.golang.org/grpc"

"go.opentelemetry.io/otel/trace"
)

const (
Expand Down Expand Up @@ -43,6 +45,7 @@ type Client struct {
done chan bool

logger *zap.Logger
tracer trace.Tracer

stopListener func(context.Context, *proto.StopRequest) error
triggerListener func(context.Context, *proto.TriggerRequest) error
Expand Down Expand Up @@ -98,7 +101,7 @@ func (c *Client) Start(ctx context.Context) error {
return err
}

err = c.startHearthBeat(ctx)
err = c.startHeartBeat(ctx)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions agent/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"time"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -31,3 +32,9 @@ func WithLogger(logger *zap.Logger) Option {
c.logger = logger
}
}

func WithTracer(tracer trace.Tracer) Option {
return func(c *Client) {
c.tracer = tracer
}
}
2 changes: 1 addition & 1 deletion agent/client/workflow_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/kubeshop/tracetest/agent/proto"
)

func (c *Client) startHearthBeat(ctx context.Context) error {
func (c *Client) startHeartBeat(ctx context.Context) error {
client := proto.NewOrchestratorClient(c.conn)
ticker := time.NewTicker(c.config.PingPeriod)

Expand Down
10 changes: 7 additions & 3 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
)

type Config struct {
APIKey string `mapstructure:"api_key"`
Name string `mapstructure:"agent_name"`
ServerURL string `mapstructure:"server_url"`
APIKey string `mapstructure:"api_key"`
Name string `mapstructure:"agent_name"`
ServerURL string `mapstructure:"server_url"`
CollectorEndpoint string `mapstructure:"collector_endpoint"`
Mode string `mapstructure:"mode"`

OTLPServer OtlpServer `mapstructure:"otlp_server"`
}
Expand Down Expand Up @@ -45,6 +47,8 @@ func LoadConfig() (Config, error) {
vp.SetDefault("AGENT_NAME", getHostname())
vp.SetDefault("API_KEY", "")
vp.SetDefault("SERVER_URL", "https://app.tracetest.io")
vp.SetDefault("COLLECTOR_ENDPOINT", "")
vp.SetDefault("MODE", "")
vp.SetDefault("OTLP_SERVER.GRPC_PORT", 4317)
vp.SetDefault("OTLP_SERVER.HTTP_PORT", 4318)

Expand Down
17 changes: 9 additions & 8 deletions agent/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ const (
)

type Flags struct {
ServerURL string
OrganizationID string
EnvironmentID string
CI bool
AgentApiKey string
Token string
Mode Mode
LogLevel string
ServerURL string
OrganizationID string
EnvironmentID string
CI bool
AgentApiKey string
Token string
Mode Mode
LogLevel string
CollectorEndpoint string
}
39 changes: 26 additions & 13 deletions agent/runner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kubeshop/tracetest/agent/config"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"

Expand All @@ -36,8 +37,14 @@ func (s *Session) WaitUntilDisconnected() {
func StartSession(ctx context.Context, cfg config.Config, observer event.Observer, logger *zap.Logger) (*Session, error) {
observer = event.WrapObserver(observer)

tracer, err := telemetry.GetTracer(ctx, cfg.CollectorEndpoint, cfg.Name)
if err != nil {
observer.Error(err)
return nil, err
}

traceCache := collector.NewTraceCache()
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger)
controlPlaneClient, err := newControlPlaneClient(ctx, cfg, traceCache, observer, logger, tracer)
if err != nil {
return nil, err
}
Expand All @@ -47,7 +54,7 @@ func StartSession(ctx context.Context, cfg config.Config, observer event.Observe
return nil, err
}

agentCollector, err := StartCollector(ctx, cfg, traceCache, observer, logger)
agentCollector, err := StartCollector(ctx, cfg, traceCache, observer, logger, tracer)
if err != nil {
return nil, err
}
Expand All @@ -73,8 +80,8 @@ func StartSession(ctx context.Context, cfg config.Config, observer event.Observe
}, nil
}

func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger) (collector.Collector, error) {
noopTracer := trace.NewNoopTracerProvider().Tracer("noop")
func StartCollector(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer) (collector.Collector, error) {

collectorConfig := collector.Config{
HTTPPort: config.OTLPServer.HTTPPort,
GRPCPort: config.OTLPServer.GRPCPort,
Expand All @@ -90,7 +97,7 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
collector, err := collector.Start(
ctx,
collectorConfig,
noopTracer,
tracer,
opts...,
)
if err != nil {
Expand All @@ -100,7 +107,7 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
return collector, nil
}

func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger) (*client.Client, error) {
func newControlPlaneClient(ctx context.Context, config config.Config, traceCache collector.TraceCache, observer event.Observer, logger *zap.Logger, tracer trace.Tracer) (*client.Client, error) {
controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
Expand All @@ -116,34 +123,40 @@ func newControlPlaneClient(ctx context.Context, config config.Config, traceCache
stopWorker := workers.NewStopperWorker(
workers.WithStopperObserver(observer),
workers.WithStopperCancelFuncList(processStopper.CancelMap()),
workers.WithStopperTracer(tracer),
)

triggerWorker := workers.NewTriggerWorker(
controlPlaneClient,
workers.WithTraceCache(traceCache),
workers.WithTriggerObserver(observer),
workers.WithTriggerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithTriggerLogger(logger),
workers.WithTriggerTracer(tracer),
)

pollingWorker := workers.NewPollerWorker(
controlPlaneClient,
workers.WithInMemoryDatastore(poller.NewInMemoryDatastore(traceCache)),
workers.WithObserver(observer),
workers.WithPollerObserver(observer),
workers.WithPollerStoppableProcessRunner(processStopper.RunStoppableProcess),
workers.WithPollerLogger(logger),
workers.WithPollerTracer(tracer),
)

dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient, observer)

triggerWorker.SetLogger(logger)
pollingWorker.SetLogger(logger)
dataStoreTestConnectionWorker.SetLogger(logger)
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(
controlPlaneClient,
workers.WithTestConnectionLogger(logger),
workers.WithTestConnectionObserver(observer),
workers.WithTestConnectionTracer(tracer),
)

controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
controlPlaneClient.OnStopRequest(stopWorker.Stop)
controlPlaneClient.OnTriggerRequest(triggerWorker.Trigger)
controlPlaneClient.OnPollingRequest(pollingWorker.Poll)
controlPlaneClient.OnConnectionClosed(func(ctx context.Context, sr *proto.ShutdownRequest) error {
fmt.Printf("Server terminated the connection with the agent. Reason: %s\n", sr.Reason)
logger.Info(fmt.Sprintf("Server terminated the connection with the agent. Reason: %s\n", sr.Reason))
return controlPlaneClient.Close()
})

Expand Down
88 changes: 88 additions & 0 deletions agent/telemetry/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package telemetry

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const spanExporterTimeout = 1 * time.Minute

func GetTracer(ctx context.Context, otelExporterEndpoint, serviceName string) (trace.Tracer, error) {
if otelExporterEndpoint == "" {
// fallback, return noop
return trace.NewNoopTracerProvider().Tracer("noop"), nil
}

realServiceName := fmt.Sprintf("tracetestAgent_%s", serviceName)

spanExporter, err := newSpanExporter(ctx, otelExporterEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to setup span exporter: %w", err)
}

traceProvider, err := newTraceProvider(ctx, spanExporter, realServiceName)
if err != nil {
return nil, fmt.Errorf("failed to setup trace provider: %w", err)
}

return traceProvider.Tracer(realServiceName), nil
}

func newSpanExporter(ctx context.Context, otelExporterEndpoint string) (sdkTrace.SpanExporter, error) {
ctx, cancel := context.WithTimeout(ctx, spanExporterTimeout)
defer cancel()

conn, err := grpc.DialContext(ctx, otelExporterEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}

traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}

return traceExporter, nil
}

func newTraceProvider(ctx context.Context, spanExporter sdkTrace.SpanExporter, serviceName string) (*sdkTrace.TracerProvider, error) {
defaultResource := resource.Default()

mergedResource, err := resource.Merge(
defaultResource,
resource.NewWithAttributes(
defaultResource.SchemaURL(),
semconv.ServiceNameKey.String(serviceName),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create otel resource: %w", err)
}

tp := sdkTrace.NewTracerProvider(
sdkTrace.WithBatcher(spanExporter),
sdkTrace.WithResource(mergedResource),
)

otel.SetTracerProvider(tp)

otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
)

return tp, nil
}
33 changes: 22 additions & 11 deletions agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (

type PollerWorker struct {
client *client.Client
tracer trace.Tracer
sentSpanIDs *gocache.Cache[string, bool]
inmemoryDatastore tracedb.TraceDB
logger *zap.Logger
observer event.Observer
stoppableProcessRunner StoppableProcessRunner
tracer trace.Tracer
}

type PollerOption func(*PollerWorker)
Expand All @@ -39,7 +39,7 @@ func WithInMemoryDatastore(datastore tracedb.TraceDB) PollerOption {
}
}

func WithObserver(observer event.Observer) PollerOption {
func WithPollerObserver(observer event.Observer) PollerOption {
return func(pw *PollerWorker) {
pw.observer = observer
}
Expand All @@ -51,16 +51,25 @@ func WithPollerStoppableProcessRunner(stoppableProcessRunner StoppableProcessRun
}
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
// TODO: use a real tracer
tracer := trace.NewNoopTracerProvider().Tracer("noop")
func WithPollerLogger(logger *zap.Logger) PollerOption {
return func(pw *PollerWorker) {
pw.logger = logger
}
}

func WithPollerTracer(tracer trace.Tracer) PollerOption {
return func(pw *PollerWorker) {
pw.tracer = tracer
}
}

func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker {
pollerWorker := &PollerWorker{
client: client,
tracer: tracer,
sentSpanIDs: gocache.New[string, bool](),
logger: zap.NewNop(),
observer: event.NewNopObserver(),
tracer: trace.NewNoopTracerProvider().Tracer("noop"),
}

for _, opt := range opts {
Expand All @@ -70,11 +79,10 @@ func NewPollerWorker(client *client.Client, opts ...PollerOption) *PollerWorker
return pollerWorker
}

func (w *PollerWorker) SetLogger(logger *zap.Logger) {
w.logger = logger
}

func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest) error {
ctx, span := w.tracer.Start(ctx, "PollingRequest Worker operation")
defer span.End()

w.logger.Debug("Received polling request", zap.Any("request", request))
w.observer.StartTracePoll(request)

Expand Down Expand Up @@ -110,7 +118,10 @@ func (w *PollerWorker) Poll(ctx context.Context, request *proto.PollingRequest)
w.logger.Error("Error sending polling error", zap.Error(sendErr))
w.observer.Error(sendErr)

return fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error())
formattedErr := fmt.Errorf("could not report polling error back to the server: %w. Original error: %s", sendErr, err.Error())
span.RecordError(formattedErr)

return formattedErr
}
}

Expand Down