Skip to content

Commit

Permalink
add logging (#3368)
Browse files Browse the repository at this point in the history
* add logging

* add logging
  • Loading branch information
schoren committed Nov 17, 2023
1 parent faf1159 commit 7ab73c3
Show file tree
Hide file tree
Showing 14 changed files with 247 additions and 40 deletions.
3 changes: 3 additions & 0 deletions agent/client/client.go
Expand Up @@ -13,6 +13,7 @@ import (

retry "github.com/avast/retry-go"
"github.com/kubeshop/tracetest/agent/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -41,6 +42,8 @@ type Client struct {
sessionConfig *SessionConfig
done chan bool

logger *zap.Logger

triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
Expand Down
2 changes: 2 additions & 0 deletions agent/client/connector.go
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -24,6 +25,7 @@ func Connect(ctx context.Context, endpoint string, opts ...Option) (*Client, err
pollListener: pollListener,
shutdownListener: shutdownListener,
dataStoreConnectionListener: dataStoreConnectionListener,
logger: zap.NewNop(),
}

for _, opt := range opts {
Expand Down
12 changes: 11 additions & 1 deletion agent/client/options.go
@@ -1,6 +1,10 @@
package client

import "time"
import (
"time"

"go.uber.org/zap"
)

// Option customizes a Client. Connect applies each supplied Option to the
// Client it builds, after the defaults have been filled in.
type Option func(*Client)

Expand All @@ -21,3 +25,9 @@ func WithPingPeriod(period time.Duration) Option {
c.config.PingPeriod = period
}
}

// WithLogger returns an Option that makes the Client use the given logger.
//
// A nil logger is ignored, so the Client keeps the logger it already has
// (Connect installs zap.NewNop() before applying options). Storing nil would
// leave the Client with a *zap.Logger that is not safe to call methods on.
func WithLogger(logger *zap.Logger) Option {
	return func(c *Client) {
		if logger == nil {
			// Keep the existing (no-op by default) logger.
			return
		}
		c.logger = logger
	}
}
6 changes: 3 additions & 3 deletions agent/collector/cache.go
Expand Up @@ -9,7 +9,7 @@ import (

type TraceCache interface {
Get(string) ([]*v1.Span, bool)
Set(string, []*v1.Span)
Append(string, []*v1.Span)
}

type traceCache struct {
Expand All @@ -25,8 +25,8 @@ func (c *traceCache) Get(traceID string) ([]*v1.Span, bool) {
return c.internalCache.Get(traceID)
}

// Set implements TraceCache.
func (c *traceCache) Set(traceID string, spans []*v1.Span) {
// Append implements TraceCache.
func (c *traceCache) Append(traceID string, spans []*v1.Span) {
c.mutex.Lock()
defer c.mutex.Unlock()

Expand Down
15 changes: 13 additions & 2 deletions agent/collector/collector.go
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/kubeshop/tracetest/server/otlp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

type Config struct {
Expand All @@ -34,6 +35,12 @@ func WithStartRemoteServer(startRemoteServer bool) CollectorOption {
}
}

// WithLogger returns a CollectorOption that sets the logger used by the
// collector's ingester configuration.
//
// A nil logger is ignored, so the configuration keeps the logger it already
// has (Start installs zap.NewNop() before applying options). Storing nil would
// hand a nil *zap.Logger to the OTLP servers and the ingester, which then call
// logging methods on it.
func WithLogger(logger *zap.Logger) CollectorOption {
	return func(ric *remoteIngesterConfig) {
		if logger == nil {
			// Keep the existing (no-op by default) logger.
			return
		}
		ric.logger = logger
	}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -47,8 +54,9 @@ func (c *collector) Stop() {

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
}

for _, opt := range opts {
Expand All @@ -61,9 +69,12 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
}

grpcServer := otlp.NewGrpcServer(fmt.Sprintf("0.0.0.0:%d", config.GRPCPort), ingester, tracer)
grpcServer.SetLogger(ingesterConfig.logger)
httpServer := otlp.NewHttpServer(fmt.Sprintf("0.0.0.0:%d", config.HTTPPort), ingester)
httpServer.SetLogger(ingesterConfig.logger)

onProcessTermination(func() {
ingesterConfig.logger.Debug("Stopping collector")
grpcServer.Stop()
httpServer.Stop()
if stoppableIngester, ok := ingester.(stoppable); ok {
Expand Down
2 changes: 1 addition & 1 deletion agent/collector/collector_test.go
Expand Up @@ -91,7 +91,7 @@ func TestCollectorWatchingSpansFromTest(t *testing.T) {

ctx := trace.ContextWithSpanContext(context.Background(), spanContext)

cache.Set(watchedTraceID.String(), []*v1.Span{})
cache.Append(watchedTraceID.String(), []*v1.Span{})

// 10 spans will be watched and stored in the cache
func(ctx context.Context) {
Expand Down
30 changes: 26 additions & 4 deletions agent/collector/ingester.go
Expand Up @@ -11,6 +11,7 @@ import (
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -19,13 +20,14 @@ type stoppable interface {
Stop()
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) {
func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: remoteIngesterConfig,
RemoteIngester: cfg,
buffer: &buffer{},
done: make(chan bool),
traceCache: remoteIngesterConfig.traceCache,
traceCache: cfg.traceCache,
logger: cfg.logger,
}

if startRemoteServer {
Expand All @@ -49,13 +51,15 @@ type forwardIngester struct {
buffer *buffer
done chan bool
traceCache TraceCache
logger *zap.Logger
}

// remoteIngesterConfig carries the settings used to build a forwardIngester.
// Start fills URL, Token and logger and then lets CollectorOptions adjust it.
type remoteIngesterConfig struct {
// URL is the address of the remote server that connectToRemoteServer dials.
URL string
// Token is populated from Config.RemoteServerToken.
// NOTE(review): how the token is used is not visible in this chunk.
Token string
// traceCache, when non-nil, receives spans of traces that are already
// present in the cache (see cacheTestSpans).
traceCache TraceCache
// startRemoteServer is presumably set by WithStartRemoteServer — TODO confirm.
startRemoteServer bool
// logger is the logger passed to the OTLP servers and the ingester; Start
// defaults it to zap.NewNop().
logger *zap.Logger
}

type buffer struct {
Expand All @@ -67,8 +71,10 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
i.buffer.mutex.Lock()
i.buffer.spans = append(i.buffer.spans, request.ResourceSpans...)
i.buffer.mutex.Unlock()
i.logger.Debug("received spans", zap.Int("count", len(request.ResourceSpans)))

if i.traceCache != nil {
i.logger.Debug("caching test spans")
// In case of OTLP datastore, those spans will be polled from this cache instead
// of a real datastore
i.cacheTestSpans(request.ResourceSpans)
Expand All @@ -84,6 +90,7 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error {
conn, err := grpc.DialContext(ctx, i.RemoteIngester.URL, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
i.logger.Error("could not connect to remote server", zap.Error(err))
return fmt.Errorf("could not connect to remote server: %w", err)
}

Expand All @@ -92,15 +99,19 @@ func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error {
}

func (i *forwardIngester) startBatchWorker() {
i.logger.Debug("starting batch worker", zap.Duration("batch_timeout", i.BatchTimeout))
ticker := time.NewTicker(i.BatchTimeout)
done := make(chan bool)
for {
select {
case <-done:
i.logger.Debug("stopping batch worker")
return
case <-ticker.C:
i.logger.Debug("executing batch")
err := i.executeBatch(context.Background())
if err != nil {
i.logger.Error("could not execute batch", zap.Error(err))
log.Println(err)
}
}
Expand All @@ -114,14 +125,18 @@ func (i *forwardIngester) executeBatch(ctx context.Context) error {
i.buffer.mutex.Unlock()

if len(newSpans) == 0 {
i.logger.Debug("no spans to forward")
return nil
}

err := i.forwardSpans(ctx, newSpans)
if err != nil {
i.logger.Error("could not forward spans", zap.Error(err))
return err
}

i.logger.Debug("successfully forwarded spans", zap.Int("count", len(newSpans)))

return nil
}

Expand All @@ -131,13 +146,15 @@ func (i *forwardIngester) forwardSpans(ctx context.Context, spans []*v1.Resource
})

if err != nil {
i.logger.Error("could not forward spans to remote server", zap.Error(err))
return fmt.Errorf("could not forward spans to remote server: %w", err)
}

return nil
}

func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
i.logger.Debug("caching test spans")
spans := make(map[string][]*v1.Span)
for _, resourceSpan := range resourceSpans {
for _, scopedSpan := range resourceSpan.ScopeSpans {
Expand All @@ -148,16 +165,21 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
}
}

i.logger.Debug("caching test spans", zap.Int("count", len(spans)))

for traceID, spans := range spans {
if _, ok := i.traceCache.Get(traceID); !ok {
i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID))
// traceID is not part of a test
continue
}

i.traceCache.Set(traceID, spans)
i.traceCache.Append(traceID, spans)
i.logger.Debug("caching test spans", zap.String("traceID", traceID), zap.Int("count", len(spans)))
}
}

// Stop logs that the ingester is stopping and then sends a value on i.done.
//
// i.done is created unbuffered by newForwardIngester, so the send blocks until
// something receives from it.
// NOTE(review): the visible startBatchWorker selects on a locally created
// `done` channel rather than i.done — confirm a receiver for i.done exists,
// otherwise this call never returns.
func (i *forwardIngester) Stop() {
i.logger.Debug("stopping forward ingester")
i.done <- true
}
39 changes: 38 additions & 1 deletion agent/initialization/start.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/collector"
Expand All @@ -12,14 +13,26 @@ import (
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var ErrOtlpServerStart = errors.New("OTLP server start error")

// logger is the package-level logger shared by NewClient and StartCollector.
// It is only assigned inside NewClient when enableLogging() reports true, so
// it is nil otherwise (and until NewClient has run).
var logger *zap.Logger

func NewClient(ctx context.Context, config config.Config, traceCache collector.TraceCache) (*client.Client, error) {
if enableLogging() {
var err error
logger, err = zap.NewDevelopment()
if err != nil {
return nil, fmt.Errorf("could not create logger: %w", err)
}
}

controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
client.WithLogger(logger),
)
if err != nil {
return nil, err
Expand All @@ -31,6 +44,12 @@ func NewClient(ctx context.Context, config config.Config, traceCache collector.T
))
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient)

if enableLogging() {
triggerWorker.SetLogger(logger)
pollingWorker.SetLogger(logger)
dataStoreTestConnectionWorker.SetLogger(logger)
}

controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
controlPlaneClient.OnTriggerRequest(triggerWorker.Trigger)
controlPlaneClient.OnPollingRequest(pollingWorker.Poll)
Expand Down Expand Up @@ -73,10 +92,28 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
GRPCPort: config.OTLPServer.GRPCPort,
}

_, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache), collector.WithStartRemoteServer(false))
opts := []collector.CollectorOption{
collector.WithTraceCache(traceCache),
collector.WithStartRemoteServer(false),
}

if enableLogging() {
opts = append(opts, collector.WithLogger(logger))
}

_, err := collector.Start(
ctx,
collectorConfig,
noopTracer,
opts...,
)
if err != nil {
return ErrOtlpServerStart
}

return nil
}

// enableLogging reports whether development logging is switched on, which is
// the case only when the TRACETEST_DEV environment variable is exactly "true".
func enableLogging() bool {
	const devModeEnvVar = "TRACETEST_DEV"

	devMode := os.Getenv(devModeEnvVar)
	return devMode == "true"
}

0 comments on commit 7ab73c3

Please sign in to comment.