From eacf0acbcc6d54859ec3e2bd2330e29831492dbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=C3=B1igo=20Horcajo?= Date: Mon, 12 Feb 2024 12:11:06 +0100 Subject: [PATCH] otelgrpc: add custom attributes to the stats handler Fixes https://github.com/open-telemetry/opentelemetry-go-contrib/issues/3894 --- .../grpc/otelgrpc/grpccontext.go | 76 ++++++++++ .../grpc/otelgrpc/stats_handler.go | 133 +++++++----------- 2 files changed, 130 insertions(+), 79 deletions(-) create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go b/instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go new file mode 100644 index 00000000000..4dcdb806a2f --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/grpccontext.go @@ -0,0 +1,76 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// metricsInfo contains metrics information for an RPC. +type metricsInfo struct { + msgReceived int64 + msgSent int64 +} + +// traceInfo contains tracing information for an RPC. +type traceInfo struct { + name string + kind trace.SpanKind +} + +// gRPCContext contains all the information needed to record metrics and traces. +type gRPCContext struct { + metricsInfo *metricsInfo + traceInfo *traceInfo + attrs []attribute.KeyValue + record bool +} + +// AddAttrs adds attributes to the given context. +func AddAttrs(ctx context.Context, attrs ...attribute.KeyValue) context.Context { + gctx, _ := gRPCContextFromContext(ctx) + gctx.addAttrs(attrs...) + return contextWithGRPCContext(ctx, gctx) +} + +// add attributes to a gRPCContext. +func (g *gRPCContext) addAttrs(attrs ...attribute.KeyValue) { + g.attrs = append(g.attrs, attrs...) +} + +type gRPCContextKey struct{} + +// contextWithGRPCContext returns a new context with the provided gRPCContext attached. +func contextWithGRPCContext(ctx context.Context, gctx *gRPCContext) context.Context { + return context.WithValue(ctx, gRPCContextKey{}, gctx) +} + +// gRPCContextFromContext retrieves a GRPCContext instance from the provided context if +// one is available. If no GRPCContext was found in the provided context a new, empty +// GRPCContext is returned and the second return value is false. In this case it is +// safe to use the GRPCContext but any attributes added to it will not be used. +func gRPCContextFromContext(ctx context.Context) (*gRPCContext, bool) { // nolint: revive + l, ok := ctx.Value(gRPCContextKey{}).(*gRPCContext) + if !ok { + l = &gRPCContext{ + metricsInfo: &metricsInfo{}, + traceInfo: &traceInfo{}, + } + } + return l, ok +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index fad58733fec..c1dcf828b0a 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -14,22 +14,12 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" ) -type gRPCContextKey struct{} - -type gRPCContext struct { - messagesReceived int64 - messagesSent int64 - metricAttrs []attribute.KeyValue - record bool -} - type serverHandler struct { *config } @@ -54,31 +44,17 @@ func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { // TagRPC can attach some information to the given context. func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { - ctx = extract(ctx, h.config.Propagators) + ctx = extract(ctx, h.Propagators) - name, attrs := internal.ParseFullMethod(info.FullMethodName) - attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.tracer.Start( - trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), - name, - trace.WithSpanKind(trace.SpanKindServer), - trace.WithAttributes(attrs...), - ) + gctx, _ := gRPCContextFromContext(ctx) + gctx.traceInfo.kind = trace.SpanKindServer - gctx := gRPCContext{ - metricAttrs: attrs, - record: true, - } - if h.config.Filter != nil { - gctx.record = h.config.Filter(info) - } - return context.WithValue(ctx, gRPCContextKey{}, &gctx) + return h.tagRPC(ctx, info) } // HandleRPC processes the RPC stats. func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - isServer := true - h.handleRPC(ctx, rs, isServer) + h.handleRPC(ctx, rs) } type clientHandler struct { @@ -96,30 +72,17 @@ func NewClientHandler(opts ...Option) stats.Handler { // TagRPC can attach some information to the given context. func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { - name, attrs := internal.ParseFullMethod(info.FullMethodName) - attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.tracer.Start( - ctx, - name, - trace.WithSpanKind(trace.SpanKindClient), - trace.WithAttributes(attrs...), - ) + gctx, _ := gRPCContextFromContext(ctx) + gctx.traceInfo.kind = trace.SpanKindClient - gctx := gRPCContext{ - metricAttrs: attrs, - record: true, - } - if h.config.Filter != nil { - gctx.record = h.config.Filter(info) - } + ctx = h.tagRPC(contextWithGRPCContext(ctx, gctx), info) - return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators) + return inject(ctx, h.Propagators) } // HandleRPC processes the RPC stats. func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - isServer := false - h.handleRPC(ctx, rs, isServer) + h.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. @@ -132,28 +95,48 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { // no-op } -func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) { // nolint: revive // isServer is not a control flag. +func (c *config) tagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + name, attrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, RPCSystemGRPC) + + gctx, _ := gRPCContextFromContext(ctx) + gctx.traceInfo.name = name + gctx.addAttrs(attrs...) + gctx.record = true + + if c.Filter != nil { + gctx.record = c.Filter(info) + } + + if gctx.traceInfo.kind == trace.SpanKindServer { + ctx = trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)) + } + + ctx, _ = c.tracer.Start( + ctx, + gctx.traceInfo.name, + trace.WithSpanKind(gctx.traceInfo.kind), + trace.WithAttributes(gctx.attrs...), + ) + + return contextWithGRPCContext(ctx, gctx) +} + +func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { + gctx, _ := gRPCContextFromContext(ctx) span := trace.SpanFromContext(ctx) - var metricAttrs []attribute.KeyValue + var messageId int64 - gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) - if gctx != nil { - if !gctx.record { - return - } - metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) - metricAttrs = append(metricAttrs, gctx.metricAttrs...) + if gctx != nil && !gctx.record { + return } switch rs := rs.(type) { case *stats.Begin: case *stats.InPayload: - if gctx != nil { - messageId = atomic.AddInt64(&gctx.messagesReceived, 1) - c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) - } - + messageId = atomic.AddInt64(&gctx.metricsInfo.msgReceived, 1) + c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(gctx.attrs...)) if c.ReceivedEvent { span.AddEvent("message", trace.WithAttributes( @@ -165,11 +148,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool ) } case *stats.OutPayload: - if gctx != nil { - messageId = atomic.AddInt64(&gctx.messagesSent, 1) - c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) - } - + messageId = atomic.AddInt64(&gctx.metricsInfo.msgSent, 1) + c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(gctx.attrs...)) if c.SentEvent { span.AddEvent("message", trace.WithAttributes( @@ -186,33 +166,28 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool span.SetAttributes(peerAttr(p.Addr.String())...) } case *stats.End: - var rpcStatusAttr attribute.KeyValue - if rs.Error != nil { s, _ := status.FromError(rs.Error) - if isServer { + if gctx.traceInfo.kind == trace.SpanKindServer { statusCode, msg := serverStatus(s) span.SetStatus(statusCode, msg) } else { span.SetStatus(codes.Error, s.Message()) } - rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code())) + gctx.addAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))) } else { - rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK)) + gctx.addAttrs(semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))) } - span.SetAttributes(rpcStatusAttr) - span.End() - metricAttrs = append(metricAttrs, rpcStatusAttr) + span.SetAttributes(gctx.attrs...) + span.End() // Use floating point division here for higher precision (instead of Millisecond method). elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond) - c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...)) - if gctx != nil { - c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...)) - c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...)) - } + c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(gctx.attrs...)) + c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.metricsInfo.msgReceived), metric.WithAttributes(gctx.attrs...)) + c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.metricsInfo.msgSent), metric.WithAttributes(gctx.attrs...)) default: return }