forked from DataDog/dd-trace-go
/
transport.go
182 lines (157 loc) · 6.26 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package tracer
import (
"errors"
"fmt"
"log"
"net/http"
"strconv"
"time"
)
const (
defaultHostname = "localhost"
defaultPort = "8126"
defaultEncoder = MSGPACK_ENCODER // defines the default encoder used when the Transport is initialized
legacyEncoder = JSON_ENCODER // defines the legacy encoder used with earlier agent versions
defaultHTTPTimeout = time.Second // defines the current timeout before giving up with the send process
encoderPoolSize = 5 // how many encoders are available
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
)
// Transport is an interface for span submission to the agent.
type Transport interface {
SendTraces(spans [][]*Span) (*http.Response, error)
SendServices(services map[string]Service) (*http.Response, error)
SetHeader(key, value string)
}
// NewTransport returns a new Transport implementation that sends traces to a
// trace agent running on the given hostname and port. If the zero values for
// hostname and port are provided, the default values will be used ("localhost"
// for hostname, and "8126" for port).
//
// In general, using this method is only necessary if you have a trace agent
// running on a non-default port or if it's located on another machine.
func NewTransport(hostname, port string) Transport {
if hostname == "" {
hostname = defaultHostname
}
if port == "" {
port = defaultPort
}
return newHTTPTransport(hostname, port)
}
// newDefaultTransport return a default transport for this tracing client
func newDefaultTransport() Transport {
return newHTTPTransport(defaultHostname, defaultPort)
}
type httpTransport struct {
traceURL string // the delivery URL for traces
legacyTraceURL string // the legacy delivery URL for traces
serviceURL string // the delivery URL for services
legacyServiceURL string // the legacy delivery URL for services
pool *encoderPool // encoding allocates lot of buffers (which might then be resized) so we use a pool so they can be re-used
client *http.Client // the HTTP client used in the POST
headers map[string]string // the Transport headers
compatibilityMode bool // the Agent targets a legacy API for compatibility reasons
}
// newHTTPTransport returns an httpTransport for the given endpoint
func newHTTPTransport(hostname, port string) *httpTransport {
// initialize the default EncoderPool with Encoder headers
pool, contentType := newEncoderPool(defaultEncoder, encoderPoolSize)
defaultHeaders := make(map[string]string)
defaultHeaders["Content-Type"] = contentType
return &httpTransport{
traceURL: fmt.Sprintf("http://%s:%s/v0.3/traces", hostname, port),
legacyTraceURL: fmt.Sprintf("http://%s:%s/v0.2/traces", hostname, port),
serviceURL: fmt.Sprintf("http://%s:%s/v0.3/services", hostname, port),
legacyServiceURL: fmt.Sprintf("http://%s:%s/v0.2/services", hostname, port),
pool: pool,
client: &http.Client{
Timeout: defaultHTTPTimeout,
},
headers: defaultHeaders,
compatibilityMode: false,
}
}
func (t *httpTransport) SendTraces(traces [][]*Span) (*http.Response, error) {
if t.traceURL == "" {
return nil, errors.New("provided an empty URL, giving up")
}
// borrow an encoder
encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
// encode the spans and return the error if any
err := encoder.EncodeTraces(traces)
if err != nil {
return nil, err
}
// prepare the client and send the payload
req, _ := http.NewRequest("POST", t.traceURL, encoder)
for header, value := range t.headers {
req.Header.Set(header, value)
}
req.Header.Set(traceCountHeader, strconv.Itoa(len(traces)))
response, err := t.client.Do(req)
// if we have an error, return an empty Response to protect against nil pointer dereference
if err != nil {
return &http.Response{StatusCode: 0}, err
}
// if we got a 404 we should downgrade the API to a stable version (at most once)
if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode {
log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.traceURL, response.StatusCode)
t.apiDowngrade()
return t.SendTraces(traces)
}
response.Body.Close()
return response, err
}
func (t *httpTransport) SendServices(services map[string]Service) (*http.Response, error) {
if t.serviceURL == "" {
return nil, errors.New("provided an empty URL, giving up")
}
// Encode the service table
encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
if err := encoder.EncodeServices(services); err != nil {
return nil, err
}
// Send it
req, err := http.NewRequest("POST", t.serviceURL, encoder)
if err != nil {
return nil, fmt.Errorf("cannot create http request: %v", err)
}
for header, value := range t.headers {
req.Header.Set(header, value)
}
response, err := t.client.Do(req)
if err != nil {
return &http.Response{StatusCode: 0}, err
}
// Downgrade if necessary
if (response.StatusCode == 404 || response.StatusCode == 415) && !t.compatibilityMode {
log.Printf("calling the endpoint '%s' but received %d; downgrading the API\n", t.traceURL, response.StatusCode)
t.apiDowngrade()
return t.SendServices(services)
}
response.Body.Close()
return response, err
}
// SetHeader sets the internal header for the httpTransport
func (t *httpTransport) SetHeader(key, value string) {
t.headers[key] = value
}
// changeEncoder switches the internal encoders pool so that a different API with different
// format can be targeted, preventing failures because of outdated agents
func (t *httpTransport) changeEncoder(encoderType int) {
pool, contentType := newEncoderPool(encoderType, encoderPoolSize)
t.pool = pool
t.headers["Content-Type"] = contentType
}
// apiDowngrade downgrades the used encoder and API level. This method must fallback to a safe
// encoder and API, so that it will success despite users' configurations. This action
// ensures that the compatibility mode is activated so that the downgrade will be
// executed only once.
func (t *httpTransport) apiDowngrade() {
t.compatibilityMode = true
t.traceURL = t.legacyTraceURL
t.serviceURL = t.legacyServiceURL
t.changeEncoder(legacyEncoder)
}