diff --git a/CHANGELOG.md b/CHANGELOG.md index 37161577e10..9c0eb929792 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108) +### Fixed + +- Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537) + ## [1.21.0/0.46.0/0.15.0/0.1.0] - 2023-11-10 ### Added diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go index fa015e9ac88..72cfae1d4e9 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go @@ -125,27 +125,13 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor { } } -type streamEventType int - -type streamEvent struct { - Type streamEventType - Err error -} - -const ( - receiveEndEvent streamEventType = iota - errorEvent -) - // clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and // SendMsg method call. type clientStream struct { grpc.ClientStream + desc *grpc.StreamDesc - desc *grpc.StreamDesc - events chan streamEvent - eventsDone chan struct{} - finished chan error + span trace.Span receivedEvent bool sentEvent bool @@ -160,11 +146,11 @@ func (w *clientStream) RecvMsg(m interface{}) error { err := w.ClientStream.RecvMsg(m) if err == nil && !w.desc.ServerStreams { - w.sendStreamEvent(receiveEndEvent, nil) + w.endSpan(nil) } else if err == io.EOF { - w.sendStreamEvent(receiveEndEvent, nil) + w.endSpan(nil) } else if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } else { w.receivedMessageID++ @@ -186,7 +172,7 @@ func (w *clientStream) SendMsg(m interface{}) error { } if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } return err @@ -195,7 +181,7 @@ func (w *clientStream) SendMsg(m interface{}) error { func (w *clientStream) Header() (metadata.MD, error) { md, err := w.ClientStream.Header() if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } return md, err @@ -204,54 +190,32 @@ func (w *clientStream) Header() (metadata.MD, error) { func (w *clientStream) CloseSend() error { err := w.ClientStream.CloseSend() if err != nil { - w.sendStreamEvent(errorEvent, err) + w.endSpan(err) } return err } -func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream { - events := make(chan streamEvent) - eventsDone := make(chan struct{}) - finished := make(chan error) - - go func() { - defer close(eventsDone) - - for { - select { - case event := <-events: - switch event.Type { - case receiveEndEvent: - finished <- nil - return - case errorEvent: - finished <- event.Err - return - } - case <-ctx.Done(): - finished <- ctx.Err() - return - } - } - }() - +func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream { return &clientStream{ ClientStream: s, + span: span, desc: desc, - events: events, - eventsDone: eventsDone, - finished: finished, receivedEvent: cfg.ReceivedEvent, sentEvent: cfg.SentEvent, } } -func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { - select { - case <-w.eventsDone: - case w.events <- streamEvent{Type: eventType, Err: err}: +func (w *clientStream) endSpan(err error) { + if err != nil { + s, _ := status.FromError(err) + w.span.SetStatus(codes.Error, s.Message()) + w.span.SetAttributes(statusCodeAttr(s.Code())) + } else { + w.span.SetAttributes(statusCodeAttr(grpc_codes.OK)) } + + w.span.End() } // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable @@ -306,22 +270,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { span.End() return s, err } - stream := wrapClientStream(ctx, s, desc, cfg) - - go func() { - err := <-stream.finished - - if err != nil { - s, _ := status.FromError(err) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(statusCodeAttr(s.Code())) - } else { - span.SetAttributes(statusCodeAttr(grpc_codes.OK)) - } - - span.End() - }() - + stream := wrapClientStream(ctx, s, desc, span, cfg) return stream, nil } } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 6768dfcb4d6..e6fd212f904 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -49,8 +49,7 @@ func TestStatsHandler(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to open port") - err = newGrpcTest( - listener, + client := newGrpcTest(t, listener, []grpc.DialOption{ grpc.WithStatsHandler(otelgrpc.NewClientHandler( otelgrpc.WithTracerProvider(clientTP), @@ -66,7 +65,7 @@ func TestStatsHandler(t *testing.T) { ), }, ) - require.NoError(t, err) + doCalls(client) t.Run("ClientSpans", func(t *testing.T) { checkClientSpans(t, clientSR.Ended()) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index 1c8cc790a2f..faec8107016 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -19,7 +19,6 @@ import ( "net" "strconv" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,14 +45,18 @@ var wantInstrumentationScope = instrumentation.Scope{ Version: otelgrpc.Version(), } -// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down. -func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error { +// newGrpcTest creats a grpc server, starts it, and returns the client, closes everything down during test cleanup. +func newGrpcTest(t testing.TB, listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) pb.TestServiceClient { grpcServer := grpc.NewServer(sOpt...) pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer()) errCh := make(chan error) go func() { errCh <- grpcServer.Serve(listener) }() + t.Cleanup(func() { + grpcServer.Stop() + assert.NoError(t, <-errCh) + }) ctx := context.Background() cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -68,17 +71,12 @@ func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.Serv listener.Addr().String(), cOpt..., ) - if err != nil { - return err - } - client := pb.NewTestServiceClient(conn) - - doCalls(client) - - conn.Close() - grpcServer.Stop() + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, conn.Close()) + }) - return <-errCh + return pb.NewTestServiceClient(conn) } func doCalls(client pb.TestServiceClient) { @@ -106,7 +104,7 @@ func TestInterceptors(t *testing.T) { listener, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err, "failed to open port") - err = newGrpcTest(listener, + client := newGrpcTest(t, listener, []grpc.DialOption{ //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release. grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor( @@ -133,19 +131,13 @@ func TestInterceptors(t *testing.T) { )), }, ) - require.NoError(t, err) + doCalls(client) t.Run("UnaryClientSpans", func(t *testing.T) { checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String()) }) t.Run("StreamClientSpans", func(t *testing.T) { - // StreamClientInterceptor ends the spans asynchronously. - // We need to wait for all spans before asserting them. - require.EventuallyWithT(t, func(c *assert.CollectT) { - assert.Len(c, clientStreamSR.Ended(), 3) - }, 5*time.Second, 100*time.Millisecond) - checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String()) }) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go index 400f970dba0..e4714211caa 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc" grpc_codes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/interop" "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -1128,3 +1129,20 @@ func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name s require.Len(t, rm.ScopeMetrics, 1) metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) } + +func BenchmarkStreamClientInterceptor(b *testing.B) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(b, err, "failed to open port") + client := newGrpcTest(b, listener, + []grpc.DialOption{ + //nolint:staticcheck // Interceptors are deprecated and will be removed in the next release. + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), + }, + []grpc.ServerOption{}, + ) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + interop.DoClientStreaming(client) + } +}