/
http.go
266 lines (205 loc) · 7.54 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
package tracing
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"strings"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/trace"
"github.com/infobloxopen/atlas-app-toolkit/v2/auth"
)
const (
	// RequestHeaderAnnotationPrefix is prepended to the key of every request
	// header attribute.
	RequestHeaderAnnotationPrefix = "request.header."

	// RequestTrailerAnnotationPrefix is prepended to the key of every request
	// trailer attribute.
	RequestTrailerAnnotationPrefix = "request.trailer."

	// ResponseHeaderAnnotationPrefix is prepended to the key of every response
	// header attribute.
	ResponseHeaderAnnotationPrefix = "response.header."

	// ResponseTrailerAnnotationPrefix is prepended to the key of every
	// response trailer attribute.
	ResponseTrailerAnnotationPrefix = "response.trailer."

	// RequestPayloadAnnotationKey is the key under which the request payload
	// is stored in the span.
	RequestPayloadAnnotationKey = "request.payload"

	// ResponsePayloadAnnotationKey is the key under which the response
	// payload is stored in the span.
	ResponsePayloadAnnotationKey = "response.payload"

	// ResponseErrorKey is the key under which a response error is stored in
	// the span.
	ResponseErrorKey = "response.error"

	// DefaultMaxPayloadSize is the default upper bound, in bytes, for a
	// payload recorded in the span (1 MiB).
	DefaultMaxPayloadSize = 1 << 20

	// TruncatedMarkerKey is the key of the attribute added to the span when a
	// payload had to be truncated.
	TruncatedMarkerKey = "payload.truncated"

	// TruncatedMarkerValue is the value of the attribute added to the span
	// when a payload had to be truncated.
	TruncatedMarkerValue = "true"

	// ObfuscationFactor is the fraction of a value that is cut off when the
	// value is obfuscated.
	ObfuscationFactor = 0.80

	// CostOfGoodsAccountID is the key of the span attribute holding the
	// account ID of the request.
	CostOfGoodsAccountID = "cogs.accountID"
)
// sensitiveHeaders is the set of header names whose values are obfuscated
// before they are written to a span.
var sensitiveHeaders = map[string]struct{}{
	auth.AuthorizationHeader: {},
}
// headerMatcher maps a header name to the key used for its span attribute.
// The boolean result reports whether the header should be recorded at all.
type headerMatcher func(string) (string, bool)

// httpOptions holds the configuration of the tracing handler.
type httpOptions struct {
	// spanWithHeaders decides per request whether headers are recorded;
	// nil disables header recording.
	spanWithHeaders func(*http.Request) bool

	// headerMatcher filters and renames headers before they are recorded.
	headerMatcher headerMatcher

	// spanWithPayload decides per request whether payloads are recorded;
	// nil disables payload recording.
	spanWithPayload func(*http.Request) bool

	// maxPayloadSize is the limit passed to payload truncation.
	maxPayloadSize int
}

// HTTPOption customizes the tracing handler by mutating its options.
type HTTPOption func(*httpOptions)
// defaultHTTPOptions returns the options used when the caller supplies none:
// every header is accepted by the matcher and payloads are limited to
// DefaultMaxPayloadSize.
func defaultHTTPOptions() *httpOptions {
	opts := new(httpOptions)
	opts.headerMatcher = defaultHeaderMatcher
	opts.maxPayloadSize = DefaultMaxPayloadSize
	// spanWithHeaders and spanWithPayload intentionally stay nil rather than
	// being set to always-false functions, so the handler can skip header and
	// payload processing without calling anything.
	return opts
}
// WithHeadersAnnotation returns an option that records request and response
// headers in the span for every request for which f returns true.
func WithHeadersAnnotation(f func(*http.Request) bool) HTTPOption {
	return func(o *httpOptions) { o.spanWithHeaders = f }
}
// WithHeaderMatcher returns an option that installs f as the header matcher.
// f receives a header name and returns the key to record it under and whether
// the header should be recorded at all.
func WithHeaderMatcher(f func(string) (string, bool)) HTTPOption {
	return func(o *httpOptions) { o.headerMatcher = f }
}
// WithPayloadAnnotation returns an option that records the request and
// response bodies in the span for every request for which f returns true.
func WithPayloadAnnotation(f func(*http.Request) bool) HTTPOption {
	return func(o *httpOptions) { o.spanWithPayload = f }
}
// WithHTTPPayloadSize returns an option that limits the size of a payload
// recorded in the span to maxSize. A payload exceeding the limit is truncated
// and the attribute payload.truncated=true is added to the span.
func WithHTTPPayloadSize(maxSize int) HTTPOption {
	return func(o *httpOptions) { o.maxPayloadSize = maxSize }
}
// NewMiddleware builds a middleware that wraps a handler first in the tracing
// Handler of this package and then in an ochttp.Handler. The options are
// applied once, in order, on top of the defaults and are shared by every
// handler the middleware wraps.
func NewMiddleware(ops ...HTTPOption) func(http.Handler) http.Handler {
	cfg := defaultHTTPOptions()
	for i := range ops {
		ops[i](cfg)
	}

	return func(next http.Handler) http.Handler {
		traced := &Handler{child: next, options: cfg}
		return &ochttp.Handler{Handler: traced}
	}
}
// Compile-time assertion that *Handler implements http.Handler.
var _ http.Handler = (*Handler)(nil)

// Handler wraps a child http.Handler and enriches the span of each request
// according to its options before delegating to the child.
type Handler struct {
	child   http.Handler
	options *httpOptions
}
// partialBody replays the part of a request body that was buffered before a
// read error occurred and then reports that error, so a downstream reader
// observes the same failure instead of a silently shortened body.
type partialBody struct {
	data *bytes.Reader
	err  error
}

// Read drains the buffered bytes first and returns the stored error afterwards.
func (b *partialBody) Read(p []byte) (int, error) {
	if b.data.Len() == 0 && b.err != nil {
		return 0, b.err
	}
	return b.data.Read(p)
}

// Close is a no-op: the original body has already been closed.
func (b *partialBody) Close() error { return nil }

// ServeHTTP enriches the span taken from the request context and then
// delegates to the child handler.
//
// The account ID is always added as an attribute. If spanWithHeaders accepts
// the request, request and response headers are added as attributes. If
// spanWithPayload accepts it, request and response bodies are added as
// annotations, truncated to maxPayloadSize.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	span := trace.FromContext(r.Context())

	// Add accountID to span attributes. The lookup error is intentionally
	// dropped: whatever accountID was returned is recorded as is, and tracing
	// never fails the request.
	accountID, _ := auth.GetAccountID(r.Context(), nil)
	span.AddAttributes(trace.StringAttribute(CostOfGoodsAccountID, accountID))

	// A nil predicate means the feature is switched off.
	withHeaders := h.options.spanWithHeaders != nil && h.options.spanWithHeaders(r)
	withPayload := h.options.spanWithPayload != nil && h.options.spanWithPayload(r)

	if withHeaders {
		// Annotate span with request headers.
		attrs := headersToAttributes(r.Header, RequestHeaderAnnotationPrefix, h.options.headerMatcher)
		span.AddAttributes(attrs...)

		// Annotate span with response headers. Deferred to capture the final
		// header state after all handlers in the chain have run.
		defer func() {
			attrs := headersToAttributes(w.Header(), ResponseHeaderAnnotationPrefix, h.options.headerMatcher)
			span.AddAttributes(attrs...)
		}()
	}

	if withPayload {
		// Buffer the request body so it can be recorded in the span and still
		// be read by the child handlers.
		requestPayload, readErr := ioutil.ReadAll(r.Body)
		// The original body is fully consumed or broken at this point; its
		// Close error carries nothing the child could act on.
		_ = r.Body.Close()
		if readErr != nil {
			// Hand the child what was read followed by the very same error,
			// instead of presenting a shortened body as if it were complete.
			r.Body = &partialBody{data: bytes.NewReader(requestPayload), err: readErr}
		} else {
			r.Body = ioutil.NopCloser(bytes.NewBuffer(requestPayload))
		}

		spanPayload, truncated := truncatePayload(requestPayload, h.options.maxPayloadSize)
		if truncated {
			markSpanTruncated(span)
		}
		attrs := []trace.Attribute{trace.StringAttribute(RequestPayloadAnnotationKey, string(spanPayload))}
		span.Annotate(attrs, "Request payload")

		// Wrap the http.ResponseWriter to capture the response body once all
		// handlers in the chain have written to it.
		wrapper := newResponseWrapper(w)
		w = wrapper

		defer func() {
			responsePayload, truncated := truncatePayload(wrapper.buffer.Bytes(), h.options.maxPayloadSize)
			if truncated {
				markSpanTruncated(span)
			}
			attrs := []trace.Attribute{trace.StringAttribute(ResponsePayloadAnnotationKey, string(responsePayload))}
			span.Annotate(attrs, "Response payload")
		}()
	}

	h.child.ServeHTTP(w, r)
}
// headersToAttributes converts headers into span string attributes.
//
// Each header name is passed through matcher; headers it rejects are skipped.
// The attribute key is prefix followed by the name returned by matcher, and
// the value is all header values joined with ", ". Values of headers listed
// in sensitiveHeaders are obfuscated.
func headersToAttributes(headers http.Header, prefix string, matcher headerMatcher) []trace.Attribute {
	result := make([]trace.Attribute, 0, len(headers))
	for name, values := range headers {
		mapped, keep := matcher(name)
		if !keep {
			continue
		}

		value := strings.Join(values, ", ")
		// Sensitivity is decided on the original header name, not on the
		// name produced by the matcher.
		if _, sensitive := sensitiveHeaders[name]; sensitive {
			value = obfuscate(value)
		}

		result = append(result, trace.StringAttribute(fmt.Sprint(prefix, mapped), value))
	}
	return result
}
// responseBodyWrapper is an http.ResponseWriter that keeps a copy of every
// byte written through it in buffer.
type responseBodyWrapper struct {
	http.ResponseWriter
	buffer *bytes.Buffer
}

// newResponseWrapper wraps w with an empty capture buffer.
func newResponseWrapper(w http.ResponseWriter) *responseBodyWrapper {
	wrapped := &responseBodyWrapper{ResponseWriter: w}
	wrapped.buffer = new(bytes.Buffer)
	return wrapped
}

// Write copies p into the capture buffer and then forwards it to the wrapped
// ResponseWriter, whose result is returned.
func (rw *responseBodyWrapper) Write(p []byte) (int, error) {
	// A failure to buffer is ignored on purpose: recording the payload in the
	// span matters less than delivering the response.
	_, _ = rw.buffer.Write(p)
	return rw.ResponseWriter.Write(p)
}
// markSpanTruncated adds the payload.truncated=true attribute to s to signal
// that a recorded payload was cut short.
func markSpanTruncated(s *trace.Span) {
	marker := trace.StringAttribute(TruncatedMarkerKey, TruncatedMarkerValue)
	s.AddAttributes(marker)
}
// truncatePayload limits payload to at most payloadLimit bytes.
//
// If payload fits within the limit it is returned unchanged together with
// false. Otherwise a newly allocated slice is returned together with true:
// the first payloadLimit-3 bytes of payload followed by "...", or, when the
// limit leaves no room for the marker (payloadLimit < 3), just the first
// payloadLimit bytes. A negative payloadLimit is treated as 0.
//
// The input slice is never modified. Callers keep using it after truncation
// (for example as the request body replayed to downstream handlers), so the
// marker must not be written into its backing array.
func truncatePayload(payload []byte, payloadLimit int) ([]byte, bool) {
	if payloadLimit < 0 {
		payloadLimit = 0
	}
	if len(payload) <= payloadLimit {
		return payload, false
	}

	const marker = "..."

	keep := payloadLimit
	if payloadLimit >= len(marker) {
		keep = payloadLimit - len(marker)
	}

	// Build the result in fresh storage; appending to a sub-slice of payload
	// would overwrite the caller's data in place.
	truncated := make([]byte, 0, payloadLimit)
	truncated = append(truncated, payload[:keep]...)
	if payloadLimit >= len(marker) {
		truncated = append(truncated, marker...)
	}
	return truncated, true
}
// obfuscate hides most of x: it keeps the leading (1 - ObfuscationFactor)
// share of its bytes, rounded down, and replaces the rest with "...".
func obfuscate(x string) string {
	const visibleShare = 1.0 - ObfuscationFactor

	visible := int(float64(len(x)) * visibleShare)
	return x[:visible] + "..."
}
// defaultHeaderMatcher accepts every header and keeps its name unchanged.
func defaultHeaderMatcher(name string) (string, bool) { return name, true }
// AlwaysHTTP is a predicate that returns true for every request. It can be
// passed to WithHeadersAnnotation or WithPayloadAnnotation to enable the
// feature unconditionally.
func AlwaysHTTP(*http.Request) bool { return true }