Skip to content

Commit

Permalink
add uncompressed bytes to receiver and exporter span attributes (#149)
Browse files Browse the repository at this point in the history
This PR adds a new method to the netstats reporter, called
`SetSpanAttributes`. This method adds uncompressed request size as an
attribute to the provided span and explicitly sets the span status in
the case of an error. This method is used to track errors for both
`server.Send` and `client.Send` grpc operations.
  • Loading branch information
moh-osman3 committed Feb 8, 2024
1 parent 39896a2 commit cfd4ede
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 4 deletions.
11 changes: 10 additions & 1 deletion collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -336,10 +337,17 @@ func (s *Stream) write(ctx context.Context) error {
}
}

func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) error {
func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) (retErr error) {
ctx, span := s.tracer.Start(wri.parent, "otel_arrow_stream_send")
defer span.End()

defer func() {
// Set span status if an error is returned.
if retErr != nil {
span := trace.SpanFromContext(ctx)
span.SetStatus(otelcodes.Error, retErr.Error())
}
}()
// Get the global propagator, to inject context. When there
// are no fields, it's a no-op propagator implementation and
// we can skip the allocations inside this block.
Expand Down Expand Up @@ -397,6 +405,7 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
sized.Method = s.method
sized.Length = int64(wri.uncompSize)
s.netReporter.CountSend(ctx, sized)
s.netReporter.SetSpanSizeAttributes(ctx, sized)
}

if err := s.client.Send(batch); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions collector/netstats/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
ss.WireLength = int64(s.WireLength)
h.rep.CountReceive(ctx, ss)
h.rep.SetSpanSizeAttributes(ctx, ss)

case *stats.OutPayload:
var ss SizesStruct
Expand All @@ -69,6 +70,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
ss.WireLength = int64(s.WireLength)
h.rep.CountSend(ctx, ss)
h.rep.SetSpanSizeAttributes(ctx, ss)
}
}

Expand Down
36 changes: 34 additions & 2 deletions collector/netstats/netstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"

"go.opentelemetry.io/collector/config/configtelemetry"
Expand Down Expand Up @@ -77,15 +78,19 @@ type Interface interface {

// CountSend reports inbound bytes.
CountReceive(ctx context.Context, ss SizesStruct)

// SetSpanAttributes takes a context and adds attributes to the associated span.
SetSpanSizeAttributes(ctx context.Context, ss SizesStruct)
}

// Noop is a no-op implementation of Interface.
type Noop struct{}

var _ Interface = Noop{}

func (Noop) CountSend(ctx context.Context, ss SizesStruct) {}
func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {}
func (Noop) CountSend(ctx context.Context, ss SizesStruct) {}
func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {}
func (Noop) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {}

const (
bytesUnit = "bytes"
Expand Down Expand Up @@ -236,3 +241,30 @@ func (rep *NetworkReporter) CountReceive(ctx context.Context, ss SizesStruct) {
rep.recvWireBytes.Add(ctx, ss.WireLength, attrs)
}
}

func (rep *NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {
if rep == nil {
return
}

span := trace.SpanFromContext(ctx)

var compressedName string
var uncompressedName string
// set attribute name based on exporter vs receiver
if rep.isExporter {
compressedName = "stream_client_compressed_bytes_sent"
uncompressedName = "stream_client_uncompressed_bytes_sent"
} else { // receiver attributes
compressedName = "stream_server_compressed_bytes_recv"
uncompressedName = "stream_server_uncompressed_bytes_recv"
}

if ss.Length > 0 {
span.SetAttributes(attribute.Int(uncompressedName, int(ss.Length)))
}

if ss.WireLength > 0 {
span.SetAttributes(attribute.Int(compressedName, int(ss.WireLength)))
}
}
52 changes: 52 additions & 0 deletions collector/netstats/netstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc/stats"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -134,6 +135,57 @@ func testNetStatsExporter(t *testing.T, level configtelemetry.Level, expect map[
}
}

func TestNetStatsSetSpanAttrs(t *testing.T) {
tests := []struct {
name string
attrs []attribute.KeyValue
isExporter bool
length int
wireLength int
}{
{
name: "set exporter attributes",
isExporter: true,
length: 1234567,
wireLength: 123,
attrs: []attribute.KeyValue{
attribute.Int("stream_client_uncompressed_bytes_sent", 1234567),
attribute.Int("stream_client_compressed_bytes_sent", 123),
},
},
{
name: "set receiver attributes",
isExporter: false,
length: 8901234,
wireLength: 890,
attrs: []attribute.KeyValue{
attribute.Int("stream_server_uncompressed_bytes_recv", 8901234),
attribute.Int("stream_server_compressed_bytes_recv", 890),
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
enr := &NetworkReporter{
isExporter: tc.isExporter,
}

tp := sdktrace.NewTracerProvider()
ctx, sp := tp.Tracer("test/span").Start(context.Background(), "test-op")

var sized SizesStruct
sized.Method = "test"
sized.Length = int64(tc.length)
sized.WireLength = int64(tc.wireLength)
enr.SetSpanSizeAttributes(ctx, sized)

actualAttrs := sp.(sdktrace.ReadOnlySpan).Attributes()

require.Equal(t, tc.attrs, actualAttrs)
})
}
}

func TestNetStatsReceiverNone(t *testing.T) {
testNetStatsReceiver(t, configtelemetry.LevelNone, map[string]interface{}{})
}
Expand Down
12 changes: 11 additions & 1 deletion collector/receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -372,12 +373,20 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr
}
}

func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) error {
func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) (retErr error) {
var err error

ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv")
defer span.End()

defer func() {
// Set span status if an error is returned.
if retErr != nil {
span := trace.SpanFromContext(ctx)
span.SetStatus(otelcodes.Error, retErr.Error())
}
}()

// Process records: an error in this code path does
// not necessarily break the stream.
if authErr != nil {
Expand Down Expand Up @@ -435,6 +444,7 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
sized.Method = method
sized.Length = uncompSize
r.netReporter.CountReceive(ctx, sized)
r.netReporter.SetSpanSizeAttributes(ctx, sized)
}()
}
switch payloads[0].Type {
Expand Down

0 comments on commit cfd4ede

Please sign in to comment.