Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add logging #3368

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

retry "github.com/avast/retry-go"
"github.com/kubeshop/tracetest/agent/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -41,6 +42,8 @@ type Client struct {
sessionConfig *SessionConfig
done chan bool

logger *zap.Logger

triggerListener func(context.Context, *proto.TriggerRequest) error
pollListener func(context.Context, *proto.PollingRequest) error
shutdownListener func(context.Context, *proto.ShutdownRequest) error
Expand Down
2 changes: 2 additions & 0 deletions agent/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -24,6 +25,7 @@ func Connect(ctx context.Context, endpoint string, opts ...Option) (*Client, err
pollListener: pollListener,
shutdownListener: shutdownListener,
dataStoreConnectionListener: dataStoreConnectionListener,
logger: zap.NewNop(),
}

for _, opt := range opts {
Expand Down
12 changes: 11 additions & 1 deletion agent/client/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package client

import "time"
import (
"time"

"go.uber.org/zap"
)

type Option func(*Client)

Expand All @@ -21,3 +25,9 @@ func WithPingPeriod(period time.Duration) Option {
c.config.PingPeriod = period
}
}

func WithLogger(logger *zap.Logger) Option {
return func(c *Client) {
c.logger = logger
}
}
6 changes: 3 additions & 3 deletions agent/collector/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type TraceCache interface {
Get(string) ([]*v1.Span, bool)
Set(string, []*v1.Span)
Append(string, []*v1.Span)
}

type traceCache struct {
Expand All @@ -25,8 +25,8 @@ func (c *traceCache) Get(traceID string) ([]*v1.Span, bool) {
return c.internalCache.Get(traceID)
}

// Set implements TraceCache.
func (c *traceCache) Set(traceID string, spans []*v1.Span) {
// Append implements TraceCache.
func (c *traceCache) Append(traceID string, spans []*v1.Span) {
c.mutex.Lock()
defer c.mutex.Unlock()

Expand Down
15 changes: 13 additions & 2 deletions agent/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/kubeshop/tracetest/server/otlp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

type Config struct {
Expand All @@ -34,6 +35,12 @@ func WithStartRemoteServer(startRemoteServer bool) CollectorOption {
}
}

func WithLogger(logger *zap.Logger) CollectorOption {
return func(ric *remoteIngesterConfig) {
ric.logger = logger
}
}

type collector struct {
grpcServer stoppable
httpServer stoppable
Expand All @@ -47,8 +54,9 @@ func (c *collector) Stop() {

func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...CollectorOption) (stoppable, error) {
ingesterConfig := remoteIngesterConfig{
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
URL: config.RemoteServerURL,
Token: config.RemoteServerToken,
logger: zap.NewNop(),
}

for _, opt := range opts {
Expand All @@ -61,9 +69,12 @@ func Start(ctx context.Context, config Config, tracer trace.Tracer, opts ...Coll
}

grpcServer := otlp.NewGrpcServer(fmt.Sprintf("0.0.0.0:%d", config.GRPCPort), ingester, tracer)
grpcServer.SetLogger(ingesterConfig.logger)
httpServer := otlp.NewHttpServer(fmt.Sprintf("0.0.0.0:%d", config.HTTPPort), ingester)
httpServer.SetLogger(ingesterConfig.logger)

onProcessTermination(func() {
ingesterConfig.logger.Debug("Stopping collector")
grpcServer.Stop()
httpServer.Stop()
if stoppableIngester, ok := ingester.(stoppable); ok {
Expand Down
2 changes: 1 addition & 1 deletion agent/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestCollectorWatchingSpansFromTest(t *testing.T) {

ctx := trace.ContextWithSpanContext(context.Background(), spanContext)

cache.Set(watchedTraceID.String(), []*v1.Span{})
cache.Append(watchedTraceID.String(), []*v1.Span{})

// 10 spans will be watched and stored in the cache
func(ctx context.Context) {
Expand Down
30 changes: 26 additions & 4 deletions agent/collector/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -19,13 +20,14 @@ type stoppable interface {
Stop()
}

func newForwardIngester(ctx context.Context, batchTimeout time.Duration, remoteIngesterConfig remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) {
func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) {
ingester := &forwardIngester{
BatchTimeout: batchTimeout,
RemoteIngester: remoteIngesterConfig,
RemoteIngester: cfg,
buffer: &buffer{},
done: make(chan bool),
traceCache: remoteIngesterConfig.traceCache,
traceCache: cfg.traceCache,
logger: cfg.logger,
}

if startRemoteServer {
Expand All @@ -49,13 +51,15 @@ type forwardIngester struct {
buffer *buffer
done chan bool
traceCache TraceCache
logger *zap.Logger
}

type remoteIngesterConfig struct {
URL string
Token string
traceCache TraceCache
startRemoteServer bool
logger *zap.Logger
}

type buffer struct {
Expand All @@ -67,8 +71,10 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
i.buffer.mutex.Lock()
i.buffer.spans = append(i.buffer.spans, request.ResourceSpans...)
i.buffer.mutex.Unlock()
i.logger.Debug("received spans", zap.Int("count", len(request.ResourceSpans)))

if i.traceCache != nil {
i.logger.Debug("caching test spans")
// In case of OTLP datastore, those spans will be polled from this cache instead
// of a real datastore
i.cacheTestSpans(request.ResourceSpans)
Expand All @@ -84,6 +90,7 @@ func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceSer
func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error {
conn, err := grpc.DialContext(ctx, i.RemoteIngester.URL, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
i.logger.Error("could not connect to remote server", zap.Error(err))
return fmt.Errorf("could not connect to remote server: %w", err)
}

Expand All @@ -92,15 +99,19 @@ func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error {
}

func (i *forwardIngester) startBatchWorker() {
i.logger.Debug("starting batch worker", zap.Duration("batch_timeout", i.BatchTimeout))
ticker := time.NewTicker(i.BatchTimeout)
done := make(chan bool)
for {
select {
case <-done:
i.logger.Debug("stopping batch worker")
return
case <-ticker.C:
i.logger.Debug("executing batch")
err := i.executeBatch(context.Background())
if err != nil {
i.logger.Error("could not execute batch", zap.Error(err))
log.Println(err)
}
}
Expand All @@ -114,14 +125,18 @@ func (i *forwardIngester) executeBatch(ctx context.Context) error {
i.buffer.mutex.Unlock()

if len(newSpans) == 0 {
i.logger.Debug("no spans to forward")
return nil
}

err := i.forwardSpans(ctx, newSpans)
if err != nil {
i.logger.Error("could not forward spans", zap.Error(err))
return err
}

i.logger.Debug("successfully forwarded spans", zap.Int("count", len(newSpans)))

return nil
}

Expand All @@ -131,13 +146,15 @@ func (i *forwardIngester) forwardSpans(ctx context.Context, spans []*v1.Resource
})

if err != nil {
i.logger.Error("could not forward spans to remote server", zap.Error(err))
return fmt.Errorf("could not forward spans to remote server: %w", err)
}

return nil
}

func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
i.logger.Debug("caching test spans")
spans := make(map[string][]*v1.Span)
for _, resourceSpan := range resourceSpans {
for _, scopedSpan := range resourceSpan.ScopeSpans {
Expand All @@ -148,16 +165,21 @@ func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
}
}

i.logger.Debug("caching test spans", zap.Int("count", len(spans)))

for traceID, spans := range spans {
if _, ok := i.traceCache.Get(traceID); !ok {
i.logger.Debug("traceID is not part of a test", zap.String("traceID", traceID))
// traceID is not part of a test
continue
}

i.traceCache.Set(traceID, spans)
i.traceCache.Append(traceID, spans)
i.logger.Debug("caching test spans", zap.String("traceID", traceID), zap.Int("count", len(spans)))
}
}

func (i *forwardIngester) Stop() {
i.logger.Debug("stopping forward ingester")
i.done <- true
}
39 changes: 38 additions & 1 deletion agent/initialization/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"

"github.com/kubeshop/tracetest/agent/client"
"github.com/kubeshop/tracetest/agent/collector"
Expand All @@ -12,14 +13,26 @@ import (
"github.com/kubeshop/tracetest/agent/workers"
"github.com/kubeshop/tracetest/agent/workers/poller"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var ErrOtlpServerStart = errors.New("OTLP server start error")

var logger *zap.Logger

func NewClient(ctx context.Context, config config.Config, traceCache collector.TraceCache) (*client.Client, error) {
if enableLogging() {
var err error
logger, err = zap.NewDevelopment()
if err != nil {
return nil, fmt.Errorf("could not create logger: %w", err)
}
}

controlPlaneClient, err := client.Connect(ctx, config.ServerURL,
client.WithAPIKey(config.APIKey),
client.WithAgentName(config.Name),
client.WithLogger(logger),
)
if err != nil {
return nil, err
Expand All @@ -31,6 +44,12 @@ func NewClient(ctx context.Context, config config.Config, traceCache collector.T
))
dataStoreTestConnectionWorker := workers.NewTestConnectionWorker(controlPlaneClient)

if enableLogging() {
triggerWorker.SetLogger(logger)
pollingWorker.SetLogger(logger)
dataStoreTestConnectionWorker.SetLogger(logger)
}

controlPlaneClient.OnDataStoreTestConnectionRequest(dataStoreTestConnectionWorker.Test)
controlPlaneClient.OnTriggerRequest(triggerWorker.Trigger)
controlPlaneClient.OnPollingRequest(pollingWorker.Poll)
Expand Down Expand Up @@ -73,10 +92,28 @@ func StartCollector(ctx context.Context, config config.Config, traceCache collec
GRPCPort: config.OTLPServer.GRPCPort,
}

_, err := collector.Start(ctx, collectorConfig, noopTracer, collector.WithTraceCache(traceCache), collector.WithStartRemoteServer(false))
opts := []collector.CollectorOption{
collector.WithTraceCache(traceCache),
collector.WithStartRemoteServer(false),
}

if enableLogging() {
opts = append(opts, collector.WithLogger(logger))
}

_, err := collector.Start(
ctx,
collectorConfig,
noopTracer,
opts...,
)
if err != nil {
return ErrOtlpServerStart
}

return nil
}

func enableLogging() bool {
return os.Getenv("TRACETEST_DEV") == "true"
}