forked from DataDog/dd-trace-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
writer.go
339 lines (306 loc) · 9.81 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package tracer
import (
"bytes"
"encoding/json"
"errors"
"io"
"math"
"os"
"strconv"
"sync"
"time"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
)
type traceWriter interface {
// add adds traces to be sent by the writer.
add([]*span)
// flush causes the writer to send any buffered traces.
flush()
// stop gracefully shuts down the writer.
stop()
}
type agentTraceWriter struct {
// config holds the tracer configuration
config *config
// payload encodes and buffers traces in msgpack format
payload *payload
// climit limits the number of concurrent outgoing connections
climit chan struct{}
// wg waits for all uploads to finish
wg sync.WaitGroup
// prioritySampling is the prioritySampler into which agentTraceWriter will
// read sampling rates sent by the agent
prioritySampling *prioritySampler
// statsd is used to send metrics
statsd statsdClient
}
func newAgentTraceWriter(c *config, s *prioritySampler, statsdClient statsdClient) *agentTraceWriter {
return &agentTraceWriter{
config: c,
payload: newPayload(),
climit: make(chan struct{}, concurrentConnectionLimit),
prioritySampling: s,
statsd: statsdClient,
}
}
func (h *agentTraceWriter) add(trace []*span) {
if err := h.payload.push(trace); err != nil {
h.statsd.Incr("datadog.tracer.traces_dropped", []string{"reason:encoding_error"}, 1)
log.Error("Error encoding msgpack: %v", err)
}
if h.payload.size() > payloadSizeLimit {
h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:size"}, 1)
h.flush()
}
}
func (h *agentTraceWriter) stop() {
h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1)
h.flush()
h.wg.Wait()
}
// flush will push any currently buffered traces to the server.
func (h *agentTraceWriter) flush() {
if h.payload.itemCount() == 0 {
return
}
h.wg.Add(1)
h.climit <- struct{}{}
oldp := h.payload
h.payload = newPayload()
go func(p *payload) {
defer func(start time.Time) {
// Once the payload has been used, clear the buffer for garbage
// collection to avoid a memory leak when references to this object
// may still be kept by faulty transport implementations or the
// standard library. See dd-trace-go#976
p.clear()
<-h.climit
h.wg.Done()
h.statsd.Timing("datadog.tracer.flush_duration", time.Since(start), nil, 1)
}(time.Now())
var count, size int
var err error
for attempt := 0; attempt <= h.config.sendRetries; attempt++ {
size, count = p.size(), p.itemCount()
log.Debug("Sending payload: size: %d traces: %d\n", size, count)
rc, err := h.config.transport.send(p)
if err == nil {
log.Debug("sent traces after %d attempts", attempt+1)
h.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1)
h.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1)
if err := h.prioritySampling.readRatesJSON(rc); err != nil {
h.statsd.Incr("datadog.tracer.decode_error", nil, 1)
}
return
}
log.Error("failure sending traces (attempt %d), will retry: %v", attempt+1, err)
p.reset()
time.Sleep(time.Millisecond)
}
h.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1)
log.Error("lost %d traces: %v", count, err)
}(oldp)
}
// logWriter specifies the output target of the logTraceWriter; replaced in tests.
var logWriter io.Writer = os.Stdout
// logTraceWriter encodes traces into a format understood by the Datadog Forwarder
// (https://github.com/DataDog/datadog-serverless-functions/tree/master/aws/logs_monitoring)
// and writes them to os.Stdout. This is used to send traces from an AWS Lambda environment.
type logTraceWriter struct {
config *config
buf bytes.Buffer
hasTraces bool
w io.Writer
statsd statsdClient
}
func newLogTraceWriter(c *config, statsdClient statsdClient) *logTraceWriter {
w := &logTraceWriter{
config: c,
w: logWriter,
statsd: statsdClient,
}
w.resetBuffer()
return w
}
const (
// maxFloatLength is the maximum length that a string encoded by encodeFloat will be.
maxFloatLength = 24
// logBufferSuffix is the final string that the trace writer has to append to a buffer to close
// the JSON.
logBufferSuffix = "]}\n"
// logBufferLimit is the maximum size log line allowed by cloudwatch
logBufferLimit = 256 * 1024
)
func (h *logTraceWriter) resetBuffer() {
h.buf.Reset()
h.buf.WriteString(`{"traces": [`)
h.hasTraces = false
}
// encodeFloat correctly encodes float64 into the JSON format followed by ES6.
// This code is reworked from Go's encoding/json package
// (https://github.com/golang/go/blob/go1.15/src/encoding/json/encode.go#L573)
//
// One important departure from encoding/json is that infinities and nans are encoded
// as null rather than signalling an error.
func encodeFloat(p []byte, f float64) []byte {
if math.IsInf(f, 0) || math.IsNaN(f) {
return append(p, "null"...)
}
abs := math.Abs(f)
if abs != 0 && (abs < 1e-6 || abs >= 1e21) {
p = strconv.AppendFloat(p, f, 'e', -1, 64)
// clean up e-09 to e-9
n := len(p)
if n >= 4 && p[n-4] == 'e' && p[n-3] == '-' && p[n-2] == '0' {
p[n-2] = p[n-1]
p = p[:n-1]
}
} else {
p = strconv.AppendFloat(p, f, 'f', -1, 64)
}
return p
}
func (h *logTraceWriter) encodeSpan(s *span) {
var scratch [maxFloatLength]byte
h.buf.WriteString(`{"trace_id":"`)
h.buf.Write(strconv.AppendUint(scratch[:0], uint64(s.TraceID), 16))
h.buf.WriteString(`","span_id":"`)
h.buf.Write(strconv.AppendUint(scratch[:0], uint64(s.SpanID), 16))
h.buf.WriteString(`","parent_id":"`)
h.buf.Write(strconv.AppendUint(scratch[:0], uint64(s.ParentID), 16))
h.buf.WriteString(`","name":`)
h.marshalString(s.Name)
h.buf.WriteString(`,"resource":`)
h.marshalString(s.Resource)
h.buf.WriteString(`,"error":`)
h.buf.Write(strconv.AppendInt(scratch[:0], int64(s.Error), 10))
h.buf.WriteString(`,"meta":{`)
first := true
for k, v := range s.Meta {
if first {
first = false
} else {
h.buf.WriteString(`,`)
}
h.marshalString(k)
h.buf.WriteString(":")
h.marshalString(v)
}
h.buf.WriteString(`},"metrics":{`)
first = true
for k, v := range s.Metrics {
if math.IsNaN(v) || math.IsInf(v, 0) {
// The trace forwarder does not support infinity or nan, so we do not send metrics with those values.
continue
}
if first {
first = false
} else {
h.buf.WriteString(`,`)
}
h.marshalString(k)
h.buf.WriteString(`:`)
h.buf.Write(encodeFloat(scratch[:0], v))
}
h.buf.WriteString(`},"start":`)
h.buf.Write(strconv.AppendInt(scratch[:0], s.Start, 10))
h.buf.WriteString(`,"duration":`)
h.buf.Write(strconv.AppendInt(scratch[:0], s.Duration, 10))
h.buf.WriteString(`,"service":`)
h.marshalString(s.Service)
h.buf.WriteString(`}`)
}
// marshalString marshals the string str as JSON into the writer's buffer.
// Should be used whenever writing non-constant string data to ensure correct sanitization.
func (h *logTraceWriter) marshalString(str string) {
m, err := json.Marshal(str)
if err != nil {
log.Error("Error marshaling value %q: %v", str, err)
} else {
h.buf.Write(m)
}
}
type encodingError struct {
cause error
dropReason string
}
// writeTrace makes an effort to write the trace into the current buffer. It returns
// the number of spans (n) that it wrote and an error (err), if one occurred.
// n may be less than len(trace), meaning that only the first n spans of the trace
// fit into the current buffer. Once the buffer is flushed, the remaining spans
// from the trace can be retried.
// An error, if one is returned, indicates that a span in the trace is too large
// to fit in one buffer, and the trace cannot be written.
func (h *logTraceWriter) writeTrace(trace []*span) (n int, err *encodingError) {
startn := h.buf.Len()
if !h.hasTraces {
h.buf.WriteByte('[')
} else {
h.buf.WriteString(", [")
}
written := 0
for i, s := range trace {
n := h.buf.Len()
if i > 0 {
h.buf.WriteByte(',')
}
h.encodeSpan(s)
if h.buf.Len() > logBufferLimit-len(logBufferSuffix) {
// This span is too big to fit in the current buffer.
if i == 0 {
// This was the first span in this trace. This means we should truncate
// everything we wrote in writeTrace
h.buf.Truncate(startn)
if !h.hasTraces {
// This is the first span of the first trace in the buffer and it's too big.
// We will never be able to send this trace, so we will drop it.
return 0, &encodingError{cause: errors.New("span too large for buffer"), dropReason: "trace_too_large"}
}
return 0, nil
}
// This span was too big, but it might fit in the next buffer.
// We can finish this trace and try again with an empty buffer (see *logTaceWriter.add)
h.buf.Truncate(n)
break
}
written++
}
h.buf.WriteByte(']')
h.hasTraces = true
return written, nil
}
// add adds a trace to the writer's buffer.
func (h *logTraceWriter) add(trace []*span) {
// Try adding traces to the buffer until we flush them all or encounter an error.
for len(trace) > 0 {
n, err := h.writeTrace(trace)
if err != nil {
log.Error("Lost a trace: %s", err.cause)
h.statsd.Count("datadog.tracer.traces_dropped", 1, []string{"reason:" + err.dropReason}, 1)
return
}
trace = trace[n:]
// If there are traces left that didn't fit into the buffer, flush the buffer and loop to
// write the remaining spans.
if len(trace) > 0 {
h.flush()
}
}
}
func (h *logTraceWriter) stop() {
h.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:shutdown"}, 1)
h.flush()
}
// flush will write any buffered traces to standard output.
func (h *logTraceWriter) flush() {
if !h.hasTraces {
return
}
h.buf.WriteString(logBufferSuffix)
h.w.Write(h.buf.Bytes())
h.resetBuffer()
}