Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared committed Nov 16, 2023
2 parents 445c84d + d9e86fb commit 3393623
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 102 deletions.
7 changes: 3 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Fixed

- Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537)
- Fix data race in stats handlers when processing messages received and sent metrics in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4577)
- The stats handlers `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now records RPC durations in `ms` instead of `ns`. (#4548)

## [1.21.0/0.46.0/0.15.0/0.1.0] - 2023-11-10
Expand Down Expand Up @@ -45,6 +47,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/contrib/samplers/jaegerremote` sampler does not panic when the default HTTP round-tripper (`http.DefaultTransport`) is not `*http.Transport`. (#4045)
- The `UnaryServerInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now sets gRPC status code correctly for the `rpc.server.duration` metric. (#4481)
- The `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now honor `otelgrpc.WithMessageEvents` options. (#4536)
- The `net.sock.peer.*` and `net.peer.*` high cardinality attributes are removed from the metrics generated by `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4322)

## [1.20.0/0.45.0/0.14.0] - 2023-09-28

Expand All @@ -59,10 +62,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Upgrade dependencies of OpenTelemetry Go to use the new [`v1.19.0`/`v0.42.0`/`v0.0.7` release](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.19.0).
- Use `grpc.StatsHandler` for gRPC instrumentation in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/example`. (#4325)

### Removed

- The `net.sock.peer.*` and `net.peer.*` high cardinality attributes are removed from the metrics generated by `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4322)

## [1.19.0/0.44.0/0.13.0] - 2023-09-12

### Added
Expand Down
91 changes: 20 additions & 71 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,13 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
}
}

type streamEventType int

type streamEvent struct {
Type streamEventType
Err error
}

const (
receiveEndEvent streamEventType = iota
errorEvent
)

// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
// SendMsg method call.
type clientStream struct {
grpc.ClientStream
desc *grpc.StreamDesc

desc *grpc.StreamDesc
events chan streamEvent
eventsDone chan struct{}
finished chan error
span trace.Span

receivedEvent bool
sentEvent bool
Expand All @@ -160,11 +146,11 @@ func (w *clientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)

if err == nil && !w.desc.ServerStreams {
w.sendStreamEvent(receiveEndEvent, nil)
w.endSpan(nil)
} else if err == io.EOF {
w.sendStreamEvent(receiveEndEvent, nil)
w.endSpan(nil)
} else if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
} else {
w.receivedMessageID++

Expand All @@ -186,7 +172,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
}

if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
}

return err
Expand All @@ -195,7 +181,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
func (w *clientStream) Header() (metadata.MD, error) {
md, err := w.ClientStream.Header()
if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
}

return md, err
Expand All @@ -204,54 +190,32 @@ func (w *clientStream) Header() (metadata.MD, error) {
func (w *clientStream) CloseSend() error {
err := w.ClientStream.CloseSend()
if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
}

return err
}

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)

go func() {
defer close(eventsDone)

for {
select {
case event := <-events:
switch event.Type {
case receiveEndEvent:
finished <- nil
return
case errorEvent:
finished <- event.Err
return
}
case <-ctx.Done():
finished <- ctx.Err()
return
}
}
}()

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
return &clientStream{
ClientStream: s,
span: span,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}

func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
select {
case <-w.eventsDone:
case w.events <- streamEvent{Type: eventType, Err: err}:
func (w *clientStream) endSpan(err error) {
if err != nil {
s, _ := status.FromError(err)
w.span.SetStatus(codes.Error, s.Message())
w.span.SetAttributes(statusCodeAttr(s.Code()))
} else {
w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

w.span.End()
}

// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
Expand Down Expand Up @@ -306,22 +270,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
span.End()
return s, err
}
stream := wrapClientStream(ctx, s, desc, cfg)

go func() {
err := <-stream.finished

if err != nil {
s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

span.End()
}()

stream := wrapClientStream(ctx, s, desc, span, cfg)
return stream, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)

c.rpcDuration.Record(wctx, elapsedTime, metric.WithAttributes(metricAttrs...))
c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...))

c.rpcRequestsPerRPC.Record(wctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(wctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...))
default:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package test

import (
"context"
"io"
"net"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -31,6 +35,8 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

testpb "google.golang.org/grpc/interop/grpc_testing"

"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
Expand All @@ -49,8 +55,7 @@ func TestStatsHandler(t *testing.T) {

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(
listener,
client := newGrpcTest(t, listener,
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(clientTP),
Expand All @@ -66,7 +71,7 @@ func TestStatsHandler(t *testing.T) {
),
},
)
require.NoError(t, err)
doCalls(client)

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
Expand Down Expand Up @@ -1317,3 +1322,72 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) {

metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}

// Ensure there is no data race for the following scenario:
// Bidirectional streaming + client cancels context in the middle of streaming.
func TestStatsHandlerConcurrentSafeContextCancellation(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
client := newGrpcTest(t, listener,
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
},
[]grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler()),
},
)

const n = 10
for i := 0; i < n; i++ {
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.FullDuplexCall(ctx)
require.NoError(t, err)

const messageCount = 10
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < messageCount; i++ {
const reqSize = 1
pl := interop.ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
respParam := []*testpb.ResponseParameters{
{
Size: reqSize,
},
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseParameters: respParam,
Payload: pl,
}
err := stream.Send(req)
if err == io.EOF { // possible due to context cancellation
require.ErrorIs(t, ctx.Err(), context.Canceled)
} else {
require.NoError(t, err)
}
}
require.NoError(t, stream.CloseSend())
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < messageCount; i++ {
_, err := stream.Recv()
if i > messageCount/2 {
cancel()
}
// must continue to receive messages until server acknowledges the cancellation, to ensure no data race happens there too
if status.Code(err) == codes.Canceled {
return
}
require.NoError(t, err)
}
}()

wg.Wait()
}
}
34 changes: 13 additions & 21 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -46,14 +45,18 @@ var wantInstrumentationScope = instrumentation.Scope{
Version: otelgrpc.Version(),
}

// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down.
func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error {
// newGrpcTest creats a grpc server, starts it, and returns the client, closes everything down during test cleanup.
func newGrpcTest(t testing.TB, listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) pb.TestServiceClient {
grpcServer := grpc.NewServer(sOpt...)
pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
errCh := make(chan error)
go func() {
errCh <- grpcServer.Serve(listener)
}()
t.Cleanup(func() {
grpcServer.Stop()
assert.NoError(t, <-errCh)
})
ctx := context.Background()

cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -68,17 +71,12 @@ func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.Serv
listener.Addr().String(),
cOpt...,
)
if err != nil {
return err
}
client := pb.NewTestServiceClient(conn)

doCalls(client)

conn.Close()
grpcServer.Stop()
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, conn.Close())
})

return <-errCh
return pb.NewTestServiceClient(conn)
}

func doCalls(client pb.TestServiceClient) {
Expand Down Expand Up @@ -106,7 +104,7 @@ func TestInterceptors(t *testing.T) {

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(listener,
client := newGrpcTest(t, listener,
[]grpc.DialOption{
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
Expand All @@ -133,19 +131,13 @@ func TestInterceptors(t *testing.T) {
)),
},
)
require.NoError(t, err)
doCalls(client)

t.Run("UnaryClientSpans", func(t *testing.T) {
checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String())
})

t.Run("StreamClientSpans", func(t *testing.T) {
// StreamClientInterceptor ends the spans asynchronously.
// We need to wait for all spans before asserting them.
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, clientStreamSR.Ended(), 3)
}, 5*time.Second, 100*time.Millisecond)

checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String())
})

Expand Down

0 comments on commit 3393623

Please sign in to comment.