diff --git a/collector/exporter/otelarrowexporter/internal/arrow/stream.go b/collector/exporter/otelarrowexporter/internal/arrow/stream.go index 9440734a..33372208 100644 --- a/collector/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/collector/exporter/otelarrowexporter/internal/arrow/stream.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -336,10 +337,17 @@ func (s *Stream) write(ctx context.Context) error { } } -func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) error { +func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) (retErr error) { ctx, span := s.tracer.Start(wri.parent, "otel_arrow_stream_send") defer span.End() + defer func() { + // Set span status if an error is returned. + if retErr != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(otelcodes.Error, retErr.Error()) + } + }() // Get the global propagator, to inject context. When there // are no fields, it's a no-op propagator implementation and // we can skip the allocations inside this block. @@ -397,6 +405,7 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp sized.Method = s.method sized.Length = int64(wri.uncompSize) s.netReporter.CountSend(ctx, sized) + s.netReporter.SetSpanSizeAttributes(ctx, sized) } if err := s.client.Send(batch); err != nil { diff --git a/collector/netstats/handler.go b/collector/netstats/handler.go index 7c8224ac..5716d2e4 100644 --- a/collector/netstats/handler.go +++ b/collector/netstats/handler.go @@ -60,6 +60,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { } ss.WireLength = int64(s.WireLength) h.rep.CountReceive(ctx, ss) + h.rep.SetSpanSizeAttributes(ctx, ss) case *stats.OutPayload: var ss SizesStruct @@ -69,6 +70,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { } ss.WireLength = int64(s.WireLength) h.rep.CountSend(ctx, ss) + h.rep.SetSpanSizeAttributes(ctx, ss) } } diff --git a/collector/netstats/netstats.go b/collector/netstats/netstats.go index 4165dbb6..a57a6d4e 100644 --- a/collector/netstats/netstats.go +++ b/collector/netstats/netstats.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.opentelemetry.io/collector/config/configtelemetry" @@ -77,6 +78,9 @@ type Interface interface { // CountSend reports inbound bytes. CountReceive(ctx context.Context, ss SizesStruct) + + // SetSpanAttributes takes a context and adds attributes to the associated span. + SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) } // Noop is a no-op implementation of Interface. @@ -84,8 +88,9 @@ type Noop struct{} var _ Interface = Noop{} -func (Noop) CountSend(ctx context.Context, ss SizesStruct) {} -func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {} +func (Noop) CountSend(ctx context.Context, ss SizesStruct) {} +func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {} +func (Noop) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {} const ( bytesUnit = "bytes" @@ -236,3 +241,30 @@ func (rep *NetworkReporter) CountReceive(ctx context.Context, ss SizesStruct) { rep.recvWireBytes.Add(ctx, ss.WireLength, attrs) } } + +func (rep *NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) { + if rep == nil { + return + } + + span := trace.SpanFromContext(ctx) + + var compressedName string + var uncompressedName string + // set attribute name based on exporter vs receiver + if rep.isExporter { + compressedName = "stream_client_compressed_bytes_sent" + uncompressedName = "stream_client_uncompressed_bytes_sent" + } else { // receiver attributes + compressedName = "stream_server_compressed_bytes_recv" + uncompressedName = "stream_server_uncompressed_bytes_recv" + } + + if ss.Length > 0 { + span.SetAttributes(attribute.Int(uncompressedName, int(ss.Length))) + } + + if ss.WireLength > 0 { + span.SetAttributes(attribute.Int(compressedName, int(ss.WireLength))) + } +} diff --git a/collector/netstats/netstats_test.go b/collector/netstats/netstats_test.go index ca7a9a9c..9e71f7d7 100644 --- a/collector/netstats/netstats_test.go +++ b/collector/netstats/netstats_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "google.golang.org/grpc/stats" "go.opentelemetry.io/collector/component" @@ -134,6 +135,57 @@ func testNetStatsExporter(t *testing.T, level configtelemetry.Level, expect map[ } } +func TestNetStatsSetSpanAttrs(t *testing.T) { + tests := []struct { + name string + attrs []attribute.KeyValue + isExporter bool + length int + wireLength int + }{ + { + name: "set exporter attributes", + isExporter: true, + length: 1234567, + wireLength: 123, + attrs: []attribute.KeyValue{ + attribute.Int("stream_client_uncompressed_bytes_sent", 1234567), + attribute.Int("stream_client_compressed_bytes_sent", 123), + }, + }, + { + name: "set receiver attributes", + isExporter: false, + length: 8901234, + wireLength: 890, + attrs: []attribute.KeyValue{ + attribute.Int("stream_server_uncompressed_bytes_recv", 8901234), + attribute.Int("stream_server_compressed_bytes_recv", 890), + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + enr := &NetworkReporter{ + isExporter: tc.isExporter, + } + + tp := sdktrace.NewTracerProvider() + ctx, sp := tp.Tracer("test/span").Start(context.Background(), "test-op") + + var sized SizesStruct + sized.Method = "test" + sized.Length = int64(tc.length) + sized.WireLength = int64(tc.wireLength) + enr.SetSpanSizeAttributes(ctx, sized) + + actualAttrs := sp.(sdktrace.ReadOnlySpan).Attributes() + + require.Equal(t, tc.attrs, actualAttrs) + }) + } +} + func TestNetStatsReceiverNone(t *testing.T) { testNetStatsReceiver(t, configtelemetry.LevelNone, map[string]interface{}{}) } diff --git a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go index 516ed295..87b782f2 100644 --- a/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/collector/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -35,6 +35,7 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" ) @@ -372,12 +373,20 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr } } -func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) error { +func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) (retErr error) { var err error ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv") defer span.End() + defer func() { + // Set span status if an error is returned. + if retErr != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(otelcodes.Error, retErr.Error()) + } + }() + // Process records: an error in this code path does // not necessarily break the stream. if authErr != nil { @@ -435,6 +444,7 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu sized.Method = method sized.Length = uncompSize r.netReporter.CountReceive(ctx, sized) + r.netReporter.SetSpanSizeAttributes(ctx, sized) }() } switch payloads[0].Type {