-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
opencensus.go
135 lines (112 loc) · 4.1 KB
/
opencensus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package octrace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver/internal/octrace"
import (
"context"
"errors"
"io"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
internaldata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus"
)
const (
	// receiverTransport is the transport name reported to the receiver's
	// observability helper.
	// TODO: transport is being hard coded for now, investigate if info is available on context.
	receiverTransport = "grpc"

	// receiverDataFormat is the data format name reported when a traces
	// operation ends.
	receiverDataFormat = "protobuf"
)
// Receiver is the type used to handle spans from OpenCensus exporters.
// It is asserted elsewhere in this file to satisfy
// agenttracepb.TraceServiceServer.
type Receiver struct {
// Embedded generated type; presumably supplies default implementations of
// the trace service methods — confirm against the opencensus-proto package.
agenttracepb.UnimplementedTraceServiceServer
// nextConsumer is the consumer that translated traces are handed to.
nextConsumer consumer.Traces
// obsrecv brackets each hand-off to nextConsumer with start/end reporting.
obsrecv *receiverhelper.ObsReport
}
// New creates a new opencensus.Receiver reference.
//
// The returned Receiver forwards traces to nextConsumer and reports each
// operation through an ObsReport built from set. An error from
// receiverhelper.NewObsReport is returned unchanged, with a nil Receiver.
func New(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) {
	reportSettings := receiverhelper.ObsReportSettings{
		ReceiverID: set.ID,
		Transport:  receiverTransport,
		// NOTE(review): presumably set because Export runs on a streaming RPC
		// whose context outlives any single batch — confirm against receiverhelper.
		LongLivedCtx:           true,
		ReceiverCreateSettings: set,
	}

	report, err := receiverhelper.NewObsReport(reportSettings)
	if err != nil {
		return nil, err
	}

	rcv := &Receiver{
		nextConsumer: nextConsumer,
		obsrecv:      report,
	}
	return rcv, nil
}
var (
	// Compile-time check that *Receiver satisfies the trace service interface.
	_ agenttracepb.TraceServiceServer = (*Receiver)(nil)

	// errUnimplemented is returned by service methods that are not implemented yet.
	errUnimplemented = errors.New("unimplemented")
)
// Config handles configuration messages.
// It is not implemented yet: the stream argument is ignored and
// errUnimplemented is always returned.
func (ocr *Receiver) Config(agenttracepb.TraceService_ConfigServer) error {
// TODO: Implement when we define the config receiver/sender.
return errUnimplemented
}
// errTraceExportProtocolViolation is returned by Export when the first
// message on the stream does not carry a Node.
var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")

// Export is the gRPC method that receives streamed traces from
// OpenCensus-traceproto compatible libraries/applications.
//
// The first message on the stream must carry a non-nil Node; otherwise
// errTraceExportProtocolViolation is returned. Every message is then passed
// to processReceivedMsg, carrying forward the most recent Node and Resource.
// The method returns nil when the stream ends with io.EOF after the first
// message, and returns any other receive or processing error as-is.
func (ocr *Receiver) Export(tes agenttracepb.TraceService_ExportServer) error {
	streamCtx := tes.Context()

	// Read the opening message and enforce the Node requirement on it.
	msg, err := tes.Recv()
	if err != nil {
		return err
	}
	if msg.Node == nil {
		return errTraceExportProtocolViolation
	}

	// Node and Resource carried over between messages; both start unset and
	// are filled in by processReceivedMsg.
	var (
		node *commonpb.Node
		res  *resourcepb.Resource
	)

	for {
		node, res, err = ocr.processReceivedMsg(streamCtx, node, res, msg)
		if err != nil {
			return err
		}

		msg, err = tes.Recv()
		switch {
		case err == nil:
			// Another message arrived; process it on the next iteration.
			continue
		case errors.Is(err, io.EOF):
			// Do not return EOF as an error so that grpc-gateway calls get an empty
			// response with HTTP status code 200 rather than a 500 error with EOF.
			return nil
		default:
			return err
		}
	}
}
// processReceivedMsg converts one export request into traces and forwards
// them to the next consumer.
//
// lastNonNilNode and resource are the values carried over from earlier
// messages on the same stream; each is replaced only when recv supplies a
// non-nil value. The (possibly updated) Node and Resource are returned so the
// caller can pass them back in with the next message, together with any error
// from sendToNextConsumer.
func (ocr *Receiver) processReceivedMsg(
	longLivedRPCCtx context.Context,
	lastNonNilNode *commonpb.Node,
	resource *resourcepb.Resource,
	recv *agenttracepb.ExportTraceServiceRequest,
) (*commonpb.Node, *resourcepb.Resource, error) {
	// Prefer a freshly sent Node over the remembered one.
	if node := recv.Node; node != nil {
		lastNonNilNode = node
	}

	// TODO(songya): differentiate between unset and nil resource. See
	// https://github.com/census-instrumentation/opencensus-proto/issues/146.
	if res := recv.Resource; res != nil {
		resource = res
	}

	traces := internaldata.OCToTraces(lastNonNilNode, resource, recv.Spans)
	sendErr := ocr.sendToNextConsumer(longLivedRPCCtx, traces)

	return lastNonNilNode, resource, sendErr
}
// sendToNextConsumer hands td to the next consumer, bracketing the call with
// the receiver's start/end traces-operation reporting. It returns whatever
// error ConsumeTraces returns.
func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, td ptrace.Traces) error {
	opCtx := ocr.obsrecv.StartTracesOp(longLivedRPCCtx)

	// Count spans before the hand-off; presumably the consumer may alter td
	// once it has it — confirm against the consumer contract.
	spanCount := td.SpanCount()

	consumeErr := ocr.nextConsumer.ConsumeTraces(opCtx, td)
	ocr.obsrecv.EndTracesOp(opCtx, receiverDataFormat, spanCount, consumeErr)

	return consumeErr
}