Skip to content

Commit

Permalink
fix(otel): fix tracing initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
bradub committed Jun 14, 2024
1 parent b866677 commit 7021efb
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 64 deletions.
4 changes: 4 additions & 0 deletions otel/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
.PHONY: test

test:
go test -race ./...
lint:
golangci-lint run --fix -c=../.golangci.yml
start-collector:
docker-compose -f ./test/docker-compose.yaml up
validate-config:
docker run --volume=./test/collector-config.yaml:/etc/otel/config.yaml otel/opentelemetry-collector-contrib validate --config=/etc/otel/config.yaml
create-sample-traces:
Expand Down
61 changes: 30 additions & 31 deletions otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,8 @@ var _ trace.TracerProvider = (*TracerProvider)(nil)
type TracerProvider struct {
embedded.TracerProvider

conn *grpc.ClientConn
tp *sdktrace.TracerProvider
}

// Tracer returns a named tracer.
func (t *TracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
return t.tp.Tracer(name, options...)
}

// Close closes the TracerProvider and the GRPC Conn.
func (t *TracerProvider) Close() error {
var errs error

const timeout = 10 * time.Second

timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := t.tp.Shutdown(timeoutCtx); err != nil {
errs = errors.Join(errs, fmt.Errorf("shutdown: %w", err))
}

if err := t.conn.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("close connection: %w", err))
}

return errs
conn *grpc.ClientConn
tracerProvider *sdktrace.TracerProvider
}

// Init initializes the OpenTelemetry SDK with the OTLP exporter.
Expand Down Expand Up @@ -87,11 +62,36 @@ func Init(
)

return &TracerProvider{
tp: tracerProvider,
conn: oltpCollectorConn,
tracerProvider: tracerProvider,
conn: oltpCollectorConn,
}, nil
}

// Tracer returns a named tracer.
func (t *TracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
return t.tracerProvider.Tracer(name, options...)
}

// Close closes the TracerProvider and the GRPC Conn.
func (t *TracerProvider) Close() error {
var errs error

const timeout = 10 * time.Second

timeoutCtx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := t.tracerProvider.Shutdown(timeoutCtx); err != nil {
errs = errors.Join(errs, fmt.Errorf("shutdown tracer provider: %w", err))
}

if err := t.conn.Close(); err != nil {
errs = errors.Join(errs, fmt.Errorf("close connection: %w", err))
}

return errs
}

func newOTLPCollectorConn(
otlpCollectorEndpoint string,
) (*grpc.ClientConn, error) {
Expand Down Expand Up @@ -121,7 +121,6 @@ func newTracerProvider(
eg.Go(func() error {
e, err := otlptracegrpc.New(
egCtx,
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithGRPCConn(otlpCollectorConn),
)
if err != nil {
Expand Down Expand Up @@ -164,8 +163,8 @@ func newTracerProvider(
sdktrace.TraceIDRatioBased(alwaysSampleRatio),
),
),
sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(exporter)),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(sdktrace.NewBatchSpanProcessor(exporter)),
)

return traceProvider, nil
Expand Down
150 changes: 117 additions & 33 deletions otel/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,70 @@ import (
"context"
"net"
"testing"
"time"

commonsgrpc "github.com/purposeinplay/go-commons/grpc"
"github.com/purposeinplay/go-commons/grpc/grpcclient"
"github.com/purposeinplay/go-commons/grpc/test_data/greetpb"
"github.com/purposeinplay/go-commons/otel"
"github.com/stretchr/testify/require"
ootel "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)

func TestTracer(t *testing.T) {
req := require.New(t)

ctx := context.Background()

conn, err := grpc.NewClient(
"localhost:4317",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
req.NoError(err)

exp, err := otlptracegrpc.New(
ctx,
otlptracegrpc.WithGRPCConn(conn),
)
req.NoError(err)

t.Cleanup(func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := exp.Shutdown(shutdownCtx); err != nil {
t.Logf("shutdown exporter: %s", err)
}
})

ssp := sdktrace.NewBatchSpanProcessor(
exp,
)

tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL)),
sdktrace.WithSpanProcessor(ssp),
)

ootel.SetTracerProvider(tracerProvider)

tracer := ootel.Tracer("test")

_, sp := tracer.Start(ctx, "test")

sp.SetAttributes(attribute.String("test", "test"))
sp.End()
}

func TestIntegration(t *testing.T) {
req := require.New(t)

Expand All @@ -33,52 +84,99 @@ func TestIntegration(t *testing.T) {

const bufSize = 1024 * 1024

lis := bufconn.Listen(bufSize)
bufDialer := func(context.Context, string) (net.Conn, error) {
return lis.Dial()
lis1 := bufconn.Listen(bufSize)
bufDialer1 := func(context.Context, string) (net.Conn, error) {
return lis1.Dial()
}

lis2 := bufconn.Listen(bufSize)
bufDialer2 := func(context.Context, string) (net.Conn, error) {
return lis2.Dial()
}

grpcServer, err := commonsgrpc.NewServer(
commonsgrpc.WithGRPCListener(lis),
grpcServer1, err := commonsgrpc.NewServer(
commonsgrpc.WithGRPCListener(lis1),
commonsgrpc.WithDebug(zap.NewExample(), true),
commonsgrpc.WithOTEL(),
commonsgrpc.WithRegisterServerFunc(func(server *grpc.Server) {
greetpb.RegisterGreetServiceServer(server, &greeterService{})
}),
)
req.NoError(err)

go func() {
if err := grpcServer1.ListenAndServe(); err != nil {
t.Logf("listen and serve error: %s", err)
}
}()

t.Cleanup(func() {
if err := grpcServer1.Close(); err != nil {
t.Logf("failed to close grpc server1: %v", err)
}
})

conn1, err := grpcclient.NewConn(
"bufnet",
grpcclient.WithContextDialer(bufDialer1),
grpcclient.WithNoTLS(),
grpcclient.WithOTEL(),
)
req.NoError(err)

t.Cleanup(func() {
if err := conn1.Close(); err != nil {
t.Logf("close client conn1: %s", err)
}
})

greeterClient1 := greetpb.NewGreetServiceClient(conn1)

grpcServer2, err := commonsgrpc.NewServer(
commonsgrpc.WithGRPCListener(lis2),
commonsgrpc.WithDebug(zap.NewExample(), true),
commonsgrpc.WithOTEL(),
commonsgrpc.WithRegisterServerFunc(func(server *grpc.Server) {
greetpb.RegisterGreetServiceServer(server, &greeterService{
greetFunc: func() error { return nil },
greetFunc: func(req *greetpb.GreetRequest) (*greetpb.GreetResponse, error) {
t.Log("greet func server 2")

return greeterClient1.Greet(ctx, req)
},
})
}),
)
req.NoError(err)

go func() {
if err := grpcServer.ListenAndServe(); err != nil {
if err := grpcServer2.ListenAndServe(); err != nil {
t.Logf("listen and serve error: %s", err)
}
}()

t.Cleanup(func() {
if err := grpcServer.Close(); err != nil {
t.Logf("failed to close grpc server: %v", err)
if err := grpcServer2.Close(); err != nil {
t.Logf("failed to close grpc server1: %v", err)
}
})

conn, err := grpcclient.NewConn(
conn2, err := grpcclient.NewConn(
"bufnet",
grpcclient.WithContextDialer(bufDialer),
grpcclient.WithContextDialer(bufDialer2),
grpcclient.WithNoTLS(),
grpcclient.WithOTEL(),
)
req.NoError(err)

t.Cleanup(func() {
if err := conn.Close(); err != nil {
t.Logf("close client conn: %s", err)
if err := conn2.Close(); err != nil {
t.Logf("close client conn1: %s", err)
}
})

greeterClient := greetpb.NewGreetServiceClient(conn)
greeterClient2 := greetpb.NewGreetServiceClient(conn2)

resp, err := greeterClient.Greet(ctx, &greetpb.GreetRequest{
resp, err := greeterClient2.Greet(ctx, &greetpb.GreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "test",
LastName: "otel",
Expand All @@ -93,33 +191,19 @@ var _ greetpb.GreetServiceServer = (*greeterService)(nil)

type greeterService struct {
greetpb.UnimplementedGreetServiceServer
greetFunc func() error
greetFunc func(*greetpb.GreetRequest) (*greetpb.GreetResponse, error)
}

func (s *greeterService) Greet(
ctx context.Context,
_ context.Context,
req *greetpb.GreetRequest,
) (*greetpb.GreetResponse, error) {
tracer := ootel.Tracer("test")

spanCtx, span := tracer.Start(ctx, "greet")
defer span.End()

if s.greetFunc != nil {
err := s.greetFunc()
if err != nil {
return nil, err
}
return s.greetFunc(req)
}

res := req.Greeting.FirstName + req.Greeting.LastName

if md, ok := metadata.FromIncomingContext(spanCtx); ok {
if len(md["custom"]) > 0 {
res += md["custom"][0]
}
}

return &greetpb.GreetResponse{
Result: res,
}, nil
Expand Down

0 comments on commit 7021efb

Please sign in to comment.