Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add uncompressed bytes to receiver and exporter span attributes #149

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
ctx, span := s.tracer.Start(wri.parent, "otel_arrow_stream_send")
defer span.End()

var err error
jmacd marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
// Due to potential double compression the netstats code knows uncompressed bytes
// value can be unreliable. Add span attributes for uncompressed size and set
// span Status if an error is returned.
s.netReporter.SetSpanSizeAttributes(ctx, sized)
s.netReporter.SetSpanError(ctx, err)
}
jmacd marked this conversation as resolved.
Show resolved Hide resolved
// Get the global propagator, to inject context. When there
// are no fields, it's a no-op propagator implementation and
// we can skip the allocations inside this block.
Expand Down Expand Up @@ -392,8 +400,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
// unreliable for arrow transport, so we instrument it
// directly here. Only the primary direction of transport
// is instrumented this way.
var sized netstats.SizesStruct
if wri.uncompSize != 0 {
var sized netstats.SizesStruct
sized.Method = s.method
sized.Length = int64(wri.uncompSize)
s.netReporter.CountSend(ctx, sized)
Expand Down
2 changes: 2 additions & 0 deletions collector/netstats/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
ss.WireLength = int64(s.WireLength)
h.rep.CountReceive(ctx, ss)
h.rep.SetSpanSizeAttributes(ctx, ss)

case *stats.OutPayload:
var ss SizesStruct
Expand All @@ -69,6 +70,7 @@ func (h statsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
}
ss.WireLength = int64(s.WireLength)
h.rep.CountSend(ctx, ss)
h.rep.SetSpanSizeAttributes(ctx, ss)
}
}

Expand Down
46 changes: 46 additions & 0 deletions collector/netstats/netstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"

"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.uber.org/multierr"

Expand Down Expand Up @@ -77,6 +79,12 @@ type Interface interface {

// CountSend reports inbound bytes.
CountReceive(ctx context.Context, ss SizesStruct)

// SetSpanAttributes takes a context and adds attributes to the associated span.
SetSpanSizeAttributes(ctx context.Context, ss SizesStruct)

// SetSpanError set span status explicitly, if there is an non-nil error provided.
SetSpanError(ctx context.Context, err error)
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}

// Noop is a no-op implementation of Interface.
Expand All @@ -86,6 +94,8 @@ var _ Interface = Noop{}

func (Noop) CountSend(ctx context.Context, ss SizesStruct) {}
func (Noop) CountReceive(ctx context.Context, ss SizesStruct) {}
func (Noop) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {}
func (Noop) SetSpanError(ctx context.Context, err error) {}

const (
bytesUnit = "bytes"
Expand Down Expand Up @@ -236,3 +246,39 @@ func (rep *NetworkReporter) CountReceive(ctx context.Context, ss SizesStruct) {
rep.recvWireBytes.Add(ctx, ss.WireLength, attrs)
}
}

func (rep *NetworkReporter) SetSpanSizeAttributes(ctx context.Context, ss SizesStruct) {
if rep == nil {
return
}

span := trace.SpanFromContext(ctx)

var compressedName string
var uncompressedName string
// set attribute name based on exporter vs receiver
if rep.isExporter {
compressedName = "stream_client_compressed_bytes_sent"
uncompressedName = "stream_client_uncompressed_bytes_sent"
} else { // receiver attributes
compressedName = "stream_server_compressed_bytes_recv"
uncompressedName = "stream_server_uncompressed_bytes_recv"
}

if ss.Length > 0 {
span.SetAttributes(attribute.Int(uncompressedName, int(ss.Length)))
}

if ss.WireLength > 0 {
span.SetAttributes(attribute.Int(compressedName, int(ss.WireLength)))
}
}

func (rep *NetworkReporter) SetSpanError(ctx context.Context, err error) {
if err == nil {
return
}

span := trace.SpanFromContext(ctx)
span.SetStatus(otelcodes.Error, err.Error())
}
34 changes: 34 additions & 0 deletions collector/netstats/netstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ package netstats

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc/stats"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -134,6 +137,37 @@ func testNetStatsExporter(t *testing.T, level configtelemetry.Level, expect map[
}
}

func TestNetStatsSetSpanAttrs(t *testing.T) {
testErr := fmt.Errorf("test error")

enr := &NetworkReporter{
isExporter: true,
}

expectedAttrs := []attribute.KeyValue{
attribute.Int("stream_client_uncompressed_bytes_sent", 1234567),
}
expectedStatus := sdktrace.Status{
Code: otelcodes.Error,
Description: "test error",
}

tp := sdktrace.NewTracerProvider()
ctx, sp := tp.Tracer("test/span").Start(context.Background(), "test-op")

var sized SizesStruct
sized.Method = "test"
sized.Length = 1234567
enr.SetSpanSizeAttributes(ctx, sized)
enr.SetSpanError(ctx, testErr)

actualAttrs := sp.(sdktrace.ReadOnlySpan).Attributes()
actualStatus := sp.(sdktrace.ReadOnlySpan).Status()

require.Equal(t, expectedAttrs, actualAttrs)
require.Equal(t, expectedStatus, actualStatus)
}

func TestNetStatsReceiverNone(t *testing.T) {
testNetStatsReceiver(t, configtelemetry.LevelNone, map[string]interface{}{})
}
Expand Down
31 changes: 22 additions & 9 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,16 +374,29 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr

func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) error {
var err error
var bytesProcessed int64

ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv")
defer span.End()

var err error
defer func() {
// Due to potential double compression the netstats code knows uncompressed bytes
// value can be unreliable. Add span attributes for uncompressed size and set
// span Status if an error is returned.
var sized netstats.SizesStruct
sized.Method = method
sized.Length = bytesProcessed
s.netReporter.SetSpanSizeAttributes(ctx, sized)
s.netReporter.SetSpanError(ctx, err)
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}

// Process records: an error in this code path does
// not necessarily break the stream.
if authErr != nil {
err = authErr
} else {
err = r.processRecords(ctx, method, arrowConsumer, req)
bytesProcessed, err = r.processRecords(ctx, method, arrowConsumer, req)
}

// Note: Statuses can be batched, but we do not take
Expand Down Expand Up @@ -422,7 +435,7 @@ func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowCo
func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) error {
payloads := records.GetArrowPayloads()
if len(payloads) == 0 {
return nil
return int64(0), nil
}
var uncompSize int64
if r.telemetry.MetricsLevel > configtelemetry.LevelNormal {
Expand All @@ -440,7 +453,7 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
switch payloads[0].Type {
case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS:
if r.Metrics() == nil {
return status.Error(codes.Unimplemented, "metrics service not available")
return int64(0), status.Error(codes.Unimplemented, "metrics service not available")
}
var sizer pmetric.ProtoMarshaler
var numPts int
Expand All @@ -462,11 +475,11 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
}
}
r.obsrecv.EndMetricsOp(ctx, streamFormat, numPts, err)
return err
return uncompSize, err

case arrowpb.ArrowPayloadType_LOGS:
if r.Logs() == nil {
return status.Error(codes.Unimplemented, "logs service not available")
return int64(0), status.Error(codes.Unimplemented, "logs service not available")
}
var sizer plog.ProtoMarshaler
var numLogs int
Expand All @@ -487,11 +500,11 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
}
}
r.obsrecv.EndLogsOp(ctx, streamFormat, numLogs, err)
return err
return uncompSize, err

case arrowpb.ArrowPayloadType_SPANS:
if r.Traces() == nil {
return status.Error(codes.Unimplemented, "traces service not available")
return int64(0), status.Error(codes.Unimplemented, "traces service not available")
}
var sizer ptrace.ProtoMarshaler
var numSpans int
Expand All @@ -512,9 +525,9 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
}
}
r.obsrecv.EndTracesOp(ctx, streamFormat, numSpans, err)
return err
return uncompSize, err

default:
return ErrUnrecognizedPayload
return int64(0), ErrUnrecognizedPayload
}
}
Loading