/
opencensus.go
158 lines (133 loc) · 5.31 KB
/
opencensus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ocmetrics
import (
"context"
"errors"
"io"
"time"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opencensus.io/trace"
"google.golang.org/api/support/bundler"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/observability"
"github.com/open-telemetry/opentelemetry-collector/oterr"
)
// Receiver implements the OpenCensus agent MetricsService gRPC server. It
// accepts metrics streamed by OpenCensus exporters and forwards them, in
// batches, to the next metrics consumer in the pipeline.
type Receiver struct {
	// nextConsumer is handed every batch of received metrics data.
	nextConsumer consumer.MetricsConsumer

	// metricBufferPeriod is used as the bundler's delay threshold. Export
	// falls back to 2 seconds when the value is not positive.
	metricBufferPeriod time.Duration

	// metricBufferCount is used as the bundler's bundle count threshold.
	// Export falls back to 50 when the value is not positive.
	metricBufferCount int
}
// New returns a Receiver that forwards received metrics to nextConsumer.
// Every option in opts is applied to the receiver, in order, before it is
// returned. oterr.ErrNilNextConsumer is returned when nextConsumer is nil.
func New(nextConsumer consumer.MetricsConsumer, opts ...Option) (*Receiver, error) {
	if nextConsumer == nil {
		return nil, oterr.ErrNilNextConsumer
	}

	receiver := new(Receiver)
	receiver.nextConsumer = nextConsumer

	for i := range opts {
		opts[i].WithReceiver(receiver)
	}

	return receiver, nil
}
var (
	// Compile-time assertion that *Receiver satisfies the generated
	// MetricsService gRPC server interface.
	_ agentmetricspb.MetricsServiceServer = (*Receiver)(nil)

	// errMetricsExportProtocolViolation is returned by Export when the first
	// message received on the stream does not carry a Node.
	errMetricsExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")
)

// receiverTagValue is the receiver name attached to the context used while
// exporting batches of metrics.
const receiverTagValue = "oc_metrics"
// Export is the gRPC method that receives streamed metrics from
// OpenCensus-metricproto compatible libraries/applications.
//
// The first message on the stream must carry a non-nil Node, otherwise
// errMetricsExportProtocolViolation is returned. Subsequent messages may omit
// the Node and/or the Resource, in which case the most recently received
// non-nil value is reused. Received metrics are buffered in a bundler and
// handed to the next consumer in batches; anything still buffered when the
// stream ends is flushed before Export returns.
//
// A clean end of stream (io.EOF) is reported as success. Any other receive
// error is returned unchanged.
func (ocr *Receiver) Export(mes agentmetricspb.MetricsService_ExportServer) error {
	// The bundler will receive batches of metrics data. We need to ensure
	// that it propagates the receiver name as a tag.
	ctxWithReceiverName := observability.ContextWithReceiverName(mes.Context(), receiverTagValue)
	metricsBundler := bundler.NewBundler((*consumerdata.MetricsData)(nil), func(payload interface{}) {
		ocr.batchMetricExporting(ctxWithReceiverName, payload)
	})

	metricBufferPeriod := ocr.metricBufferPeriod
	if metricBufferPeriod <= 0 {
		metricBufferPeriod = 2 * time.Second // Arbitrary value
	}
	metricBufferCount := ocr.metricBufferCount
	if metricBufferCount <= 0 {
		// TODO: (@odeke-em) provide an option to disable any buffering
		metricBufferCount = 50 // Arbitrary value
	}

	// Thresholds must be configured before the first item is added.
	metricsBundler.DelayThreshold = metricBufferPeriod
	metricsBundler.BundleCountThreshold = metricBufferCount

	// Without an explicit flush, metrics buffered at the end of the stream
	// would only be delivered once the delay threshold fires, i.e. after this
	// RPC has already returned, and would be dropped if the process stopped
	// in the meantime.
	defer metricsBundler.Flush()

	// Retrieve the first message. It MUST have a non-nil Node.
	recv, err := mes.Recv()
	if err != nil {
		return err
	}

	// Check the condition that the first message has a non-nil Node.
	if recv.Node == nil {
		return errMetricsExportProtocolViolation
	}

	var lastNonNilNode *commonpb.Node
	var resource *resourcepb.Resource

	// Now that we've got the first message with a Node, we can start to
	// receive streamed up metrics.
	for {
		// If a Node has been sent from downstream, save and use it.
		if recv.Node != nil {
			lastNonNilNode = recv.Node
		}

		// TODO(songya): differentiate between unset and nil resource. See
		// https://github.com/census-instrumentation/opencensus-proto/issues/146.
		if recv.Resource != nil {
			resource = recv.Resource
		}

		processReceivedMetrics(lastNonNilNode, resource, recv.Metrics, metricsBundler)

		recv, err = mes.Recv()
		if err != nil {
			if err == io.EOF {
				// Do not return EOF as an error so that grpc-gateway calls get an empty
				// response with HTTP status code 200 rather than a 500 error with EOF.
				return nil
			}
			return err
		}
	}
}
// processReceivedMetrics wraps the metrics of one received message, together
// with the node and resource they belong to, into a consumerdata.MetricsData
// and adds it to the bundler. Messages that carry no metrics are ignored.
func processReceivedMetrics(ni *commonpb.Node, resource *resourcepb.Resource, metrics []*metricspb.Metric, bundler *bundler.Bundler) {
	// Nothing to enqueue for a message without metrics.
	if len(metrics) == 0 {
		return
	}

	payload := new(consumerdata.MetricsData)
	payload.Node = ni
	payload.Resource = resource
	payload.Metrics = metrics

	// The number of metrics is passed as the size of the bundled item.
	bundler.Add(payload, len(metrics))
}
// batchMetricExporting is the bundler handler: it forwards every MetricsData
// in payload to the next consumer, inside a span named
// "OpenCensusMetricsReceiver.Export".
//
// longLivedRPCCtx is the context of the streaming RPC the metrics arrived on;
// it is only used to link the RPC's span, if any, as a parent of the new span.
// payload is expected to be a []*consumerdata.MetricsData; any other type, or
// an empty slice, is ignored.
//
// The handler cannot return an error, so the first error reported by the next
// consumer is recorded as the span status instead of being dropped.
func (ocr *Receiver) batchMetricExporting(longLivedRPCCtx context.Context, payload interface{}) {
	// Use the comma-ok form so an unexpected payload type cannot panic in the
	// goroutine that runs this handler.
	mds, ok := payload.([]*consumerdata.MetricsData)
	if !ok || len(mds) == 0 {
		return
	}

	// Trace this method. The span is rooted in a background context; the RPC
	// context is attached below as a link.
	ctx, span := trace.StartSpan(context.Background(), "OpenCensusMetricsReceiver.Export")
	defer span.End()

	// TODO: (@odeke-em) investigate if it is necessary
	// to group nodes with their respective metrics during
	// bundledMetrics list unfurling then send metrics grouped per node

	// If the starting RPC has a parent span, then add it as a parent link.
	observability.SetParentLink(longLivedRPCCtx, span)

	var nMetrics int64
	var firstErr error
	for _, md := range mds {
		if md == nil {
			// Guard the dereference below.
			continue
		}
		if err := ocr.nextConsumer.ConsumeMetricsData(ctx, *md); err != nil && firstErr == nil {
			firstErr = err
		}
		nMetrics += int64(len(md.Metrics))
	}

	span.Annotate([]trace.Attribute{
		trace.Int64Attribute("num_metrics", nMetrics),
	}, "")

	if firstErr != nil {
		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: firstErr.Error()})
	}
}