From 7021efb4e9e4d6bc309a551a07bb662c617680f6 Mon Sep 17 00:00:00 2001 From: bradub Date: Fri, 14 Jun 2024 14:09:12 +0300 Subject: [PATCH] fix(otel): fix tracing initialization --- otel/Makefile | 4 + otel/otel.go | 61 +++++++------- otel/test/integration_test.go | 150 ++++++++++++++++++++++++++-------- 3 files changed, 151 insertions(+), 64 deletions(-) diff --git a/otel/Makefile b/otel/Makefile index 6f5c6ec..adb4f05 100644 --- a/otel/Makefile +++ b/otel/Makefile @@ -1,7 +1,11 @@ +.PHONY: test + test: go test -race ./... lint: golangci-lint run --fix -c=../.golangci.yml +start-collector: + docker-compose -f ./test/docker-compose.yaml up validate-config: docker run --volume=./test/collector-config.yaml:/etc/otel/config.yaml otel/opentelemetry-collector-contrib validate --config=/etc/otel/config.yaml create-sample-traces: diff --git a/otel/otel.go b/otel/otel.go index 07a7ad6..76ca8fc 100644 --- a/otel/otel.go +++ b/otel/otel.go @@ -27,33 +27,8 @@ var _ trace.TracerProvider = (*TracerProvider)(nil) type TracerProvider struct { embedded.TracerProvider - conn *grpc.ClientConn - tp *sdktrace.TracerProvider -} - -// Tracer returns a named tracer. -func (t *TracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer { - return t.tp.Tracer(name, options...) -} - -// Close closes the TracerProvider and the GRPC Conn. -func (t *TracerProvider) Close() error { - var errs error - - const timeout = 10 * time.Second - - timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - if err := t.tp.Shutdown(timeoutCtx); err != nil { - errs = errors.Join(errs, fmt.Errorf("shutdown: %w", err)) - } - - if err := t.conn.Close(); err != nil { - errs = errors.Join(errs, fmt.Errorf("close connection: %w", err)) - } - - return errs + conn *grpc.ClientConn + tracerProvider *sdktrace.TracerProvider } // Init initializes the OpenTelemetry SDK with the OTLP exporter. @@ -87,11 +62,36 @@ func Init( ) return &TracerProvider{ - tp: tracerProvider, - conn: oltpCollectorConn, + tracerProvider: tracerProvider, + conn: oltpCollectorConn, }, nil } +// Tracer returns a named tracer. +func (t *TracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer { + return t.tracerProvider.Tracer(name, options...) +} + +// Close closes the TracerProvider and the GRPC Conn. +func (t *TracerProvider) Close() error { + var errs error + + const timeout = 10 * time.Second + + timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + if err := t.tracerProvider.Shutdown(timeoutCtx); err != nil { + errs = errors.Join(errs, fmt.Errorf("shutdown tracer provider: %w", err)) + } + + if err := t.conn.Close(); err != nil { + errs = errors.Join(errs, fmt.Errorf("close connection: %w", err)) + } + + return errs +} + func newOTLPCollectorConn( otlpCollectorEndpoint string, ) (*grpc.ClientConn, error) { @@ -121,7 +121,6 @@ func newTracerProvider( eg.Go(func() error { e, err := otlptracegrpc.New( egCtx, - otlptracegrpc.WithInsecure(), otlptracegrpc.WithGRPCConn(otlpCollectorConn), ) if err != nil { @@ -164,8 +163,8 @@ func newTracerProvider( sdktrace.TraceIDRatioBased(alwaysSampleRatio), ), ), - sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(exporter)), sdktrace.WithResource(res), + sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(exporter)), ) return traceProvider, nil diff --git a/otel/test/integration_test.go b/otel/test/integration_test.go index f6a9019..a010601 100644 --- a/otel/test/integration_test.go +++ b/otel/test/integration_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "testing" + "time" commonsgrpc "github.com/purposeinplay/go-commons/grpc" "github.com/purposeinplay/go-commons/grpc/grpcclient" @@ -11,12 +12,62 @@ import ( "github.com/purposeinplay/go-commons/otel" "github.com/stretchr/testify/require" ootel "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" ) +func TestTracer(t *testing.T) { + req := require.New(t) + + ctx := context.Background() + + conn, err := grpc.NewClient( + "localhost:4317", + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + req.NoError(err) + + exp, err := otlptracegrpc.New( + ctx, + otlptracegrpc.WithGRPCConn(conn), + ) + req.NoError(err) + + t.Cleanup(func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := exp.Shutdown(shutdownCtx); err != nil { + t.Logf("shutdown exporter: %s", err) + } + }) + + ssp := sdktrace.NewBatchSpanProcessor( + exp, + ) + + tracerProvider := sdktrace.NewTracerProvider( + sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL)), + sdktrace.WithSpanProcessor(ssp), + ) + + ootel.SetTracerProvider(tracerProvider) + + tracer := ootel.Tracer("test") + + _, sp := tracer.Start(ctx, "test") + + sp.SetAttributes(attribute.String("test", "test")) + sp.End() +} + func TestIntegration(t *testing.T) { req := require.New(t) @@ -33,52 +84,99 @@ func TestIntegration(t *testing.T) { const bufSize = 1024 * 1024 - lis := bufconn.Listen(bufSize) - bufDialer := func(context.Context, string) (net.Conn, error) { - return lis.Dial() + lis1 := bufconn.Listen(bufSize) + bufDialer1 := func(context.Context, string) (net.Conn, error) { + return lis1.Dial() + } + + lis2 := bufconn.Listen(bufSize) + bufDialer2 := func(context.Context, string) (net.Conn, error) { + return lis2.Dial() } - grpcServer, err := commonsgrpc.NewServer( - commonsgrpc.WithGRPCListener(lis), + grpcServer1, err := commonsgrpc.NewServer( + commonsgrpc.WithGRPCListener(lis1), + commonsgrpc.WithDebug(zap.NewExample(), true), + commonsgrpc.WithOTEL(), + commonsgrpc.WithRegisterServerFunc(func(server *grpc.Server) { + greetpb.RegisterGreetServiceServer(server, &greeterService{}) + }), + ) + req.NoError(err) + + go func() { + if err := grpcServer1.ListenAndServe(); err != nil { + t.Logf("listen and serve error: %s", err) + } + }() + + t.Cleanup(func() { + if err := grpcServer1.Close(); err != nil { + t.Logf("failed to close grpc server1: %v", err) + } + }) + + conn1, err := grpcclient.NewConn( + "bufnet", + grpcclient.WithContextDialer(bufDialer1), + grpcclient.WithNoTLS(), + grpcclient.WithOTEL(), + ) + req.NoError(err) + + t.Cleanup(func() { + if err := conn1.Close(); err != nil { + t.Logf("close client conn1: %s", err) + } + }) + + greeterClient1 := greetpb.NewGreetServiceClient(conn1) + + grpcServer2, err := commonsgrpc.NewServer( + commonsgrpc.WithGRPCListener(lis2), commonsgrpc.WithDebug(zap.NewExample(), true), commonsgrpc.WithOTEL(), commonsgrpc.WithRegisterServerFunc(func(server *grpc.Server) { greetpb.RegisterGreetServiceServer(server, &greeterService{ - greetFunc: func() error { return nil }, + greetFunc: func(req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) { + t.Log("greet func server 2") + + return greeterClient1.Greet(ctx, req) + }, }) }), ) req.NoError(err) go func() { - if err := grpcServer.ListenAndServe(); err != nil { + if err := grpcServer2.ListenAndServe(); err != nil { t.Logf("listen and serve error: %s", err) } }() t.Cleanup(func() { - if err := grpcServer.Close(); err != nil { - t.Logf("failed to close grpc server: %v", err) + if err := grpcServer2.Close(); err != nil { + t.Logf("failed to close grpc server1: %v", err) } }) - conn, err := grpcclient.NewConn( + conn2, err := grpcclient.NewConn( "bufnet", - grpcclient.WithContextDialer(bufDialer), + grpcclient.WithContextDialer(bufDialer2), grpcclient.WithNoTLS(), grpcclient.WithOTEL(), ) req.NoError(err) t.Cleanup(func() { - if err := conn.Close(); err != nil { - t.Logf("close client conn: %s", err) + if err := conn2.Close(); err != nil { + t.Logf("close client conn1: %s", err) } }) - greeterClient := greetpb.NewGreetServiceClient(conn) + greeterClient2 := greetpb.NewGreetServiceClient(conn2) - resp, err := greeterClient.Greet(ctx, &greetpb.GreetRequest{ + resp, err := greeterClient2.Greet(ctx, &greetpb.GreetRequest{ Greeting: &greetpb.Greeting{ FirstName: "test", LastName: "otel", @@ -93,33 +191,19 @@ var _ greetpb.GreetServiceServer = (*greeterService)(nil) type greeterService struct { greetpb.UnimplementedGreetServiceServer - greetFunc func() error + greetFunc func(*greetpb.GreetRequest) (*greetpb.GreetResponse, error) } func (s *greeterService) Greet( - ctx context.Context, + _ context.Context, req *greetpb.GreetRequest, ) (*greetpb.GreetResponse, error) { - tracer := ootel.Tracer("test") - - spanCtx, span := tracer.Start(ctx, "greet") - defer span.End() - if s.greetFunc != nil { - err := s.greetFunc() - if err != nil { - return nil, err - } + return s.greetFunc(req) } res := req.Greeting.FirstName + req.Greeting.LastName - if md, ok := metadata.FromIncomingContext(spanCtx); ok { - if len(md["custom"]) > 0 { - res += md["custom"][0] - } - } - return &greetpb.GreetResponse{ Result: res, }, nil