From cfd4ede9d8633c97d7d0a02ee7af940111b4b750 Mon Sep 17 00:00:00 2001 From: Moh Osman <59479562+moh-osman3@users.noreply.github.com> Date: Thu, 8 Feb 2024 17:34:31 -0500 Subject: [PATCH] add uncompressed bytes to receiver and exporter span attributes (#149) This PR adds a new method to the netstats reporter, called `SetSpanAttributes`. This method adds uncompressed request size as an attribute to the provided span and explicitly sets the span status in the case of an error. This method is used to track errors for both `server.Send` and `client.Send` grpc operations. --- .../internal/arrow/stream.go | 11 +++- collector/netstats/handler.go | 2 + collector/netstats/netstats.go | 36 ++++++++++++- collector/netstats/netstats_test.go | 52 +++++++++++++++++++ .../otelarrowreceiver/internal/arrow/arrow.go | 12 ++++- 5 files changed, 109 insertions(+), 4 deletions(-) diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index 9440734a..33372208 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -336,10 +337,17 @@ func (s *Stream) write(ctx context.Context) error { } } -func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) error { +func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) (retErr error) { ctx, span := s.tracer.Start(wri.parent, "otel_arrow_stream_send") defer span.End() + defer func() { + // Set span status if an error is returned. + if retErr != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(otelcodes.Error, retErr.Error()) + } + }() // Get the global propagator, to inject context. When there // are no fields, it's a no-op propagator implementation and // we can skip the allocations inside this block. @@ -397,6 +405,7 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp sized.Method = s.method sized.Length = int64(wri.uncompSize) s.netReporter.CountSend(ctx, sized) + s.netReporter.SetSpanSizeAttributes(ctx, sized) } if err := s.client.Send(batch); err != nil { diff --git a/collector/netstats/handler.go b/collector/netstats/handler.go index 7c8224ac..5716d2e4 100644 --- a/collector/netstats/handler.go +++ b/collector/netstats/handler.go @@ -60,6 +60,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { } ss.WireLength = int64(s.WireLength) h.rep.CountReceive(ctx, ss) + h.rep.SetSpanSizeAttributes(ctx, ss) case *stats.OutPayload: var ss SizesStruct @@ -69,6 +70,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { } ss.WireLength = int64(s.WireLength) h.rep.CountSend(ctx, ss) + h.rep.SetSpanSizeAttributes(ctx, ss) } } diff --git a/collector/netstats/netstats.go b/collector/netstats/netstats.go index 4165dbb6..a57a6d4e 100644 --- a/collector/netstats/netstats.go +++ b/collector/netstats/netstats.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.opentelemetry.io/collector/config/configtelemetry" @@ -77,6 +78,9 @@ type Interface interface { // CountSend reports inbound bytes. CountReceive(ctx context.Context, ss SizesStruct) + + // SetSpanAttributes takes a context and adds attributes to the associated span. + SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) } // Noop is a no-op implementation of Interface. @@ -84,8 +88,9 @@ type Noop struct{} var _ Interface = Noop{} -func (Noop) CountSend(ctx context.Context, ss SizesStruct) {} -func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {} +func (Noop) CountSend(ctx context.Context, ss SizesStruct) {} +func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {} +func (Noop) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {} const ( bytesUnit = "bytes" @@ -236,3 +241,30 @@ func (rep *NetworkReporter) CountReceive(ctx context.Context, ss SizesStruct) { rep.recvWireBytes.Add(ctx, ss.WireLength, attrs) } } + +func (rep *NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) { + if rep == nil { + return + } + + span := trace.SpanFromContext(ctx) + + var compressedName string + var uncompressedName string + // set attribute name based on exporter vs receiver + if rep.isExporter { + compressedName = "stream_client_compressed_bytes_sent" + uncompressedName = "stream_client_uncompressed_bytes_sent" + } else { // receiver attributes + compressedName = "stream_server_compressed_bytes_recv" + uncompressedName = "stream_server_uncompressed_bytes_recv" + } + + if ss.Length > 0 { + span.SetAttributes(attribute.Int(uncompressedName, int(ss.Length))) + } + + if ss.WireLength > 0 { + span.SetAttributes(attribute.Int(compressedName, int(ss.WireLength))) + } +} diff --git a/collector/netstats/netstats_test.go b/collector/netstats/netstats_test.go index ca7a9a9c..9e71f7d7 100644 --- a/collector/netstats/netstats_test.go +++ b/collector/netstats/netstats_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/grpc/stats" "go.opentelemetry.io/collector/component" @@ -134,6 +135,57 @@ func testNetStatsExporter(t *testing.T, level configtelemetry.Level, expect map[ } } +func TestNetStatsSetSpanAttrs(t *testing.T) { + tests := []struct { + name string + attrs []attribute.KeyValue + isExporter bool + length int + wireLength int + }{ + { + name: "set exporter attributes", + isExporter: true, + length: 1234567, + wireLength: 123, + attrs: []attribute.KeyValue{ + attribute.Int("stream_client_uncompressed_bytes_sent", 1234567), + attribute.Int("stream_client_compressed_bytes_sent", 123), + }, + }, + { + name: "set receiver attributes", + isExporter: false, + length: 8901234, + wireLength: 890, + attrs: []attribute.KeyValue{ + attribute.Int("stream_server_uncompressed_bytes_recv", 8901234), + attribute.Int("stream_server_compressed_bytes_recv", 890), + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + enr := &NetworkReporter{ + isExporter: tc.isExporter, + } + + tp := sdktrace.NewTracerProvider() + ctx, sp := tp.Tracer("test/span").Start(context.Background(), "test-op") + + var sized SizesStruct + sized.Method = "test" + sized.Length = int64(tc.length) + sized.WireLength = int64(tc.wireLength) + enr.SetSpanSizeAttributes(ctx, sized) + + actualAttrs := sp.(sdktrace.ReadOnlySpan).Attributes() + + require.Equal(t, tc.attrs, actualAttrs) + }) + } +} + func TestNetStatsReceiverNone(t *testing.T) { testNetStatsReceiver(t, configtelemetry.LevelNone, map[string]interface{}{}) } diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index 516ed295..87b782f2 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -35,6 +35,7 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -372,12 +373,20 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr } } -func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) error { +func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) (retErr error) { var err error ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv") defer span.End() + defer func() { + // Set span status if an error is returned. + if retErr != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(otelcodes.Error, retErr.Error()) + } + }() + // Process records: an error in this code path does // not necessarily break the stream. if authErr != nil { @@ -435,6 +444,7 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu sized.Method = method sized.Length = uncompSize r.netReporter.CountReceive(ctx, sized) + r.netReporter.SetSpanSizeAttributes(ctx, sized) }() } switch payloads[0].Type {