-
Notifications
You must be signed in to change notification settings - Fork 2
/
centralized_strategy.go
550 lines (469 loc) · 15.2 KB
/
centralized_strategy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
package sampling
import (
	"bytes"
	"context"
	crand "crypto/rand"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"net/netip"
	"sort"
	"sync"
	"time"

	"github.com/shogo82148/aws-xray-yasdk-go/xray/xraylog"
)
// client is the package-wide HTTP client used for the GetSamplingRules and
// SamplingTargets requests.
//
// It has no overall Timeout; callers bound each request with a context.
// The deprecated net.Dialer.DualStack field is intentionally not set:
// fast fallback ("Happy Eyeballs") is enabled by default.
var client = &http.Client{
	Transport: &http.Transport{
		Proxy: nil, // ignore proxy configure from the environment values
		DialContext: (&net.Dialer{
			Timeout:   10 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		MaxIdleConns:          5,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: time.Second,
	},
}
// getSamplingRulesInput is the JSON request body sent to the GetSamplingRules endpoint.
//
// https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingRules.html#API_GetSamplingRules_RequestBody
type getSamplingRulesInput struct {
// NextToken is the pagination token. It is omitted from the JSON when empty,
// which is how the first page is requested.
NextToken string `json:"NextToken,omitempty"`
}
// getSamplingRulesOutput is the JSON response body of the GetSamplingRules endpoint.
//
// https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingRules.html#API_GetSamplingRules_ResponseSyntax
type getSamplingRulesOutput struct {
	// NextToken is the pagination token; an empty value marks the last page.
	NextToken string `json:"NextToken,omitempty"`

	// SamplingRuleRecords holds the rules contained in this page.
	// The tag spells out the wire name explicitly, like every other field
	// in this file; the encoded/decoded key is unchanged.
	SamplingRuleRecords []*samplingRuleRecord `json:"SamplingRuleRecords"`
}
// samplingRuleRecord is one element of getSamplingRulesOutput.SamplingRuleRecords:
// a sampling rule together with its creation and modification times.
type samplingRuleRecord struct {
// CreatedAt and ModifiedAt are decoded as JSON numbers.
// NOTE(review): presumably seconds since the Unix epoch — confirm against the API reference.
CreatedAt float64 `json:"CreatedAt"`
ModifiedAt float64 `json:"ModifiedAt"`
// SamplingRule is the rule definition itself.
SamplingRule samplingRule `json:"SamplingRule"`
}
// samplingRule is the wire representation of a single sampling rule as it
// appears inside a samplingRuleRecord.
//
// refreshRule copies RuleName, Priority, Host, URLPath, HTTPMethod,
// ServiceName and ServiceType into the in-memory rule and uses FixedRate
// to seed a new quota; the remaining fields are decoded but not read there.
type samplingRule struct {
// Matches attributes derived from the request.
Attributes map[string]string `json:"Attributes"`
// The percentage of matching requests to instrument, after the reservoir is
// exhausted.
FixedRate float64 `json:"FixedRate"`
// Matches the HTTP method of a request.
HTTPMethod string `json:"HTTPMethod"`
// Matches the hostname from a request URL.
Host string `json:"Host"`
// The priority of the sampling rule.
Priority int64 `json:"Priority"`
// A fixed number of matching requests to instrument per second, prior to applying
// the fixed rate. The reservoir is not used directly by services, but applies
// to all services using the rule collectively.
ReservoirSize int64 `json:"ReservoirSize"`
// Matches the ARN of the AWS resource on which the service runs.
ResourceARN string `json:"ResourceARN"`
// The ARN of the sampling rule. Specify a rule by either name or ARN, but not
// both.
RuleARN string `json:"RuleARN"`
// The name of the sampling rule. Specify a rule by either name or ARN, but
// not both.
RuleName string `json:"RuleName"`
// Matches the name that the service uses to identify itself in segments.
ServiceName string `json:"ServiceName"`
// Matches the origin that the service uses to identify its type in segments.
ServiceType string `json:"ServiceType"`
// Matches the path from a request URL.
URLPath string `json:"URLPath"`
// The version of the sampling rule format (1).
Version int64 `json:"Version"`
}
// getSamplingTargetsInput is the JSON request body sent to the SamplingTargets endpoint.
//
// https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingTargets.html#API_GetSamplingTargets_RequestBody
type getSamplingTargetsInput struct {
// SamplingStatisticsDocuments carries one statistics document per rule.
// refreshQuota sends at most 25 documents per request.
SamplingStatisticsDocuments []*samplingStatisticsDocument `json:"SamplingStatisticsDocuments"`
}
// samplingStatisticsDocument reports, for one sampling rule, how many requests
// this client matched, sampled and borrowed since the previous report.
type samplingStatisticsDocument struct {
// The number of requests recorded with borrowed reservoir quota.
BorrowCount int64 `json:"BorrowCount"`
// A unique identifier for the service in hexadecimal.
ClientID string `json:"ClientID"`
// The number of requests that matched the rule.
RequestCount int64 `json:"RequestCount"`
// The name of the sampling rule.
RuleName string `json:"RuleName"`
// The number of requests recorded.
SampledCount int64 `json:"SampledCount"`
// The current time, in ISO-8601 format (YYYY-MM-DDThh:mm:ss).
// refreshQuota fills this field with time.RFC3339 output, which also
// carries a zone offset.
Timestamp string `json:"Timestamp"`
}
// getSamplingTargetsOutput is the JSON response body of the SamplingTargets endpoint.
//
// https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingTargets.html#API_GetSamplingTargets_ResponseSyntax
type getSamplingTargetsOutput struct {
// The last time a user changed the sampling rule configuration. If the sampling
// rule configuration changed since the service last retrieved it, the service
// should call GetSamplingRules to get the latest version.
//
// refreshQuota interprets the value as whole seconds since the Unix epoch.
// NOTE(review): samplingRuleRecord decodes its timestamps as float64; confirm
// the service never sends a fractional value here, which would fail to decode
// into an int64.
LastRuleModification int64 `json:"LastRuleModification"`
// Updated rules that the service should use to sample requests.
SamplingTargetDocuments []*samplingTargetDocument `json:"SamplingTargetDocuments"`
// Information about SamplingStatisticsDocument that X-Ray could not process.
UnprocessedStatistics []*unprocessedStatistics `json:"UnprocessedStatistics"`
}
// samplingTargetDocument is the per-rule quota assignment returned in
// getSamplingTargetsOutput. refreshQuota passes it to the matching quota's
// update method, looked up by RuleName.
type samplingTargetDocument struct {
// The percentage of matching requests to instrument, after the reservoir is
// exhausted.
FixedRate float64 `json:"FixedRate"`
// The number of seconds for the service to wait before getting sampling targets
// again.
Interval int64 `json:"Interval"`
// The number of requests per second that X-Ray allocated this service.
ReservoirQuota int64 `json:"ReservoirQuota"`
// When the reservoir quota expires, in ISO-8601 format (YYYY-MM-DDThh:mm:ss).
ReservoirQuotaTTL string `json:"ReservoirQuotaTTL"`
// The name of the sampling rule.
RuleName string `json:"RuleName"`
}
// unprocessedStatistics describes a statistics document that the service
// rejected. It is decoded as part of getSamplingTargetsOutput; the visible
// code in this file does not read it afterwards.
type unprocessedStatistics struct {
// The error code.
ErrorCode string `json:"ErrorCode"`
// The error message.
Message string `json:"Message"`
// The name of the sampling rule.
RuleName string `json:"RuleName"`
}
// CentralizedStrategy is an implementation of SamplingStrategy.
//
// It samples requests according to rules fetched over HTTP from addr and
// keeps them fresh with two background pollers that are started lazily on
// the first call to ShouldTrace and stopped by Close.
// The struct contains mutexes and must not be copied.
type CentralizedStrategy struct {
// Sampling strategy used when the manifest is nil or none of its rules
// matches the request (see ShouldTrace).
fallback *LocalizedStrategy
// Address for X-Ray daemon
addr string
// Unique ID used by XRay service to identify this client
clientID string
// control poller
// pollerCtx is cancelled by pollerCancel (see Close); both pollers exit when it is done.
pollerCtx context.Context
pollerCancel context.CancelFunc
// startOnce guarantees that start runs at most once.
startOnce sync.Once
// muRefresh is held for the whole duration of refreshRule and refreshQuota,
// so the two never run concurrently.
muRefresh sync.Mutex
// mu guards manifest; use getManifest/setManifest to access it.
mu sync.RWMutex
manifest *centralizedManifest
}
// NewCentralizedStrategy returns new centralized sampling strategy with a fallback on
// the local rule. If local rule is nil, the DefaultSamplingRule is used.
func NewCentralizedStrategy(addr string, manifest *Manifest) (*CentralizedStrategy, error) {
local, err := NewLocalizedStrategy(manifest)
if err != nil {
return nil, err
}
// Generate clientID
var r [12]byte
if _, err := crand.Read(r[:]); err != nil {
return nil, err
}
pollerCtx, pollerCancel := context.WithCancel(context.Background())
return &CentralizedStrategy{
fallback: local,
addr: addr,
clientID: hex.EncodeToString(r[:]),
pollerCtx: pollerCtx,
pollerCancel: pollerCancel,
manifest: ¢ralizedManifest{
Rules: []*centralizedRule{},
Quotas: make(map[string]*centralizedQuota),
},
}, nil
}
// getSamplingRulesPages walks the paginated GetSamplingRules results,
// starting from input.NextToken, and hands every page to callback together
// with a flag telling whether it is the last one.
//
// Iteration ends after the last page, or earlier when callback returns false.
// The first request error aborts the walk and is returned as is.
func (s *CentralizedStrategy) getSamplingRulesPages(ctx context.Context, input *getSamplingRulesInput, callback func(*getSamplingRulesOutput, bool) bool) error {
	for next := input.NextToken; ; {
		page, err := s.getSamplingRules(ctx, &getSamplingRulesInput{NextToken: next})
		if err != nil {
			return err
		}

		// An empty token means there is nothing left to fetch.
		isLast := page.NextToken == ""
		if keepGoing := callback(page, isLast); !keepGoing || isLast {
			return nil
		}
		next = page.NextToken
	}
}
// getSamplingRules retrieves one page of sampling rules.
//
// input is sent as a JSON body in a POST to http://<addr>/GetSamplingRules,
// and ctx bounds the whole request. The decoded response is returned.
//
// An error is returned when the request cannot be built or sent, when the
// response status is not 200 OK, or when the body is not valid JSON.
// Rejecting non-200 responses matters: an error payload would otherwise
// decode into an empty rule list and be mistaken for "no rules".
//
// https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingRules.html
func (s *CentralizedStrategy) getSamplingRules(ctx context.Context, input *getSamplingRulesInput) (*getSamplingRulesOutput, error) {
	data, err := json.Marshal(input)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+s.addr+"/GetSamplingRules", bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("xray/sampling: GetSamplingRules returned unexpected status %q", resp.Status)
	}

	var output getSamplingRulesOutput
	if err := json.NewDecoder(resp.Body).Decode(&output); err != nil {
		return nil, err
	}
	return &output, nil
}
// getSamplingTargets requests a sampling quota for rules that the service is
// using to sample requests.
//
// input is sent as a JSON body in a POST to http://<addr>/SamplingTargets,
// and ctx bounds the whole request. The decoded response is returned.
//
// An error is returned when the request cannot be built or sent, when the
// response status is not 200 OK, or when the body is not valid JSON.
// Rejecting non-200 responses keeps an error payload from being decoded
// into an empty, seemingly successful result.
//
// https://docs.aws.amazon.com/xray/latest/api/API_GetSamplingTargets.html
func (s *CentralizedStrategy) getSamplingTargets(ctx context.Context, input *getSamplingTargetsInput) (*getSamplingTargetsOutput, error) {
	data, err := json.Marshal(input)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+s.addr+"/SamplingTargets", bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("xray/sampling: GetSamplingTargets returned unexpected status %q", resp.Status)
	}

	var output getSamplingTargetsOutput
	if err := json.NewDecoder(resp.Body).Decode(&output); err != nil {
		return nil, err
	}
	return &output, nil
}
// Close stops polling by cancelling the context shared by both pollers.
func (s *CentralizedStrategy) Close() {
	stop := s.pollerCancel
	stop()
}
// ShouldTrace implements Strategy.
//
// Requests addressed directly to an IP are never sampled. For everything
// else the pollers are started on first use, and the first manifest rule
// that matches the request makes the decision. The local fallback strategy
// decides when there is no manifest or no rule matches.
func (s *CentralizedStrategy) ShouldTrace(req *Request) *Decision {
	if isDirectIPAccess(req) {
		return &Decision{Sample: false}
	}

	s.startOnce.Do(s.start)

	if m := s.getManifest(); m != nil {
		for _, rule := range m.Rules {
			if !rule.Match(req) {
				continue
			}
			xraylog.Debugf(context.Background(), "ShouldTrace Match: rule %s", rule.ruleName)
			return rule.Sample()
		}
		// It should not reach here, because the Default Rule matches any requests.
		// The manifest is wrong, so fallback to local strategy.
	}
	return s.fallback.ShouldTrace(req)
}
// isDirectIPAccess reports whether req.Host is an IP address literal,
// with or without a port.
//
// Nowadays, access using virtual host functionality is mostly used,
// and there are few cases where access is made by directly specifying an IP address.
// Therefore, if access is made by directly specifying an IP address,
// we do not perform sampling.
//
// Recognized forms are "192.0.2.1", "192.0.2.1:80", "2001:db8::1",
// "[2001:db8::1]:80" and "[2001:db8::1]".
func isDirectIPAccess(req *Request) bool {
	host, _, err := net.SplitHostPort(req.Host)
	if err != nil {
		// No (valid) port part: treat the whole value as the host.
		host = req.Host
		// A bracketed IPv6 literal without a port, e.g. "[::1]", is rejected
		// by SplitHostPort and by ParseAddr alike, so unwrap it by hand.
		if n := len(host); n >= 2 && host[0] == '[' && host[n-1] == ']' {
			host = host[1 : n-1]
		}
	}
	_, err = netip.ParseAddr(host)
	return err == nil
}
// getManifest returns the current manifest pointer, read under the read lock.
func (s *CentralizedStrategy) getManifest() *centralizedManifest {
	s.mu.RLock()
	current := s.manifest
	s.mu.RUnlock()
	return current
}
// setManifest replaces the current manifest pointer under the write lock.
func (s *CentralizedStrategy) setManifest(manifest *centralizedManifest) {
	s.mu.Lock()
	s.manifest = manifest
	s.mu.Unlock()
}
// start launches the rule poller and the quota poller, each in its own
// goroutine. It should be called by `s.startOnce.Do(s.start)`.
func (s *CentralizedStrategy) start() {
	for _, poller := range []func(){s.rulePoller, s.quotaPoller} {
		go poller()
	}
}
// rulePoller refreshes the sampling rules immediately and then every
// 300 seconds plus a random jitter below one second, until the poller
// context is cancelled.
func (s *CentralizedStrategy) rulePoller() {
	const (
		interval = 300 * time.Second
		jitter   = int64(time.Second)
	)

	// Seed the jitter source from the system random source.
	var seed int64
	if binary.Read(crand.Reader, binary.BigEndian, &seed) != nil {
		// fallback to timestamp
		seed = time.Now().UnixNano()
	}
	jitterSource := rand.New(rand.NewSource(seed))

	for {
		s.refreshRule()

		wait := interval + time.Duration(jitterSource.Int63n(jitter))
		timer := time.NewTimer(wait)
		select {
		case <-timer.C:
			// time for the next refresh
		case <-s.pollerCtx.Done():
			timer.Stop()
			return
		}
	}
}
// quotaPoller refreshes the sampling quotas immediately and then every
// 10 seconds plus a random jitter below 100 milliseconds, until the poller
// context is cancelled.
func (s *CentralizedStrategy) quotaPoller() {
	const (
		interval = 10 * time.Second
		jitter   = int64(100 * time.Millisecond)
	)

	// Seed the jitter source from the system random source.
	var seed int64
	if binary.Read(crand.Reader, binary.BigEndian, &seed) != nil {
		// fallback to timestamp
		seed = time.Now().UnixNano()
	}
	jitterSource := rand.New(rand.NewSource(seed))

	for {
		s.refreshQuota()

		wait := interval + time.Duration(jitterSource.Int63n(jitter))
		timer := time.NewTimer(wait)
		select {
		case <-timer.C:
			// time for the next refresh
		case <-s.pollerCtx.Done():
			timer.Stop()
			return
		}
	}
}
func (s *CentralizedStrategy) refreshRule() {
ctx, cancel := context.WithTimeout(s.pollerCtx, time.Minute)
defer cancel()
s.muRefresh.Lock()
defer s.muRefresh.Unlock()
defer func() {
// avoid propagating panics to the application code.
if e := recover(); e != nil {
xraylog.Errorf(ctx, "panic: %v", e)
}
}()
xraylog.Debug(ctx, "start refreshing sampling rules")
manifest := s.getManifest()
rules := make([]*centralizedRule, 0, len(manifest.Rules))
quotas := make(map[string]*centralizedQuota, len(manifest.Rules))
err := s.getSamplingRulesPages(ctx, &getSamplingRulesInput{}, func(out *getSamplingRulesOutput, lastPage bool) bool {
for _, record := range out.SamplingRuleRecords {
r := record.SamplingRule
name := r.RuleName
quota, ok := manifest.Quotas[name]
if !ok {
// we don't have enough sampling statistics,
// so borrow the reservoir quota.
quota = ¢ralizedQuota{
fixedRate: r.FixedRate,
}
}
rule := ¢ralizedRule{
quota: quota,
ruleName: name,
priority: r.Priority,
host: r.Host,
urlPath: r.URLPath,
httpMethod: r.HTTPMethod,
serviceName: r.ServiceName,
serviceType: r.ServiceType,
}
rules = append(rules, rule)
quotas[name] = quota
xraylog.Debugf(
ctx,
"Refresh Sampling Rule: Priority: %d, ServiceName: %s, ServiceType: %s, Name: %s, Host: %s, URL: %s, Method: %s, Quota: %d, FixedRate: %f",
r.Priority, r.ServiceName, r.ServiceType,
name, r.Host, r.HTTPMethod, r.URLPath, quota.quota, r.FixedRate,
)
}
return true
})
if err != nil {
xraylog.Errorf(ctx, "xray/sampling: failed to get sampling rules: %v", err)
return
}
sort.Stable(centralizedRuleSlice(rules))
s.setManifest(¢ralizedManifest{
Rules: rules,
Quotas: quotas,
RefreshedAt: time.Now(),
})
xraylog.Debug(ctx, "sampling rules are refreshed.")
}
// refreshQuota reports the sampling statistics of every rule in the current
// manifest and applies the quotas returned by the service.
//
// The work is bounded by a one-minute timeout derived from the poller
// context and is serialized with refreshRule through muRefresh. Statistics
// are sent in batches of at most maxTargets documents; a failed batch is
// logged and skipped. A rule refresh is started in a new goroutine when a
// returned target names an unknown rule, when the manifest has never been
// refreshed, or when the rules were modified after the last refresh.
// Panics are recovered and logged instead of propagated.
func (s *CentralizedStrategy) refreshQuota() {
	// maximum number of targets of GetSamplingTargets API
	const maxTargets = 25

	ctx, cancel := context.WithTimeout(s.pollerCtx, time.Minute)
	defer cancel()

	s.muRefresh.Lock()
	defer s.muRefresh.Unlock()
	defer func() {
		// avoid propagating panics to the application code.
		if e := recover(); e != nil {
			xraylog.Errorf(ctx, "panic: %v", e)
		}
	}()

	manifest := s.getManifest()

	// Every document of this round carries the same timestamp.
	timestamp := time.Now().Format(time.RFC3339)
	docs := make([]*samplingStatisticsDocument, 0, len(manifest.Rules))
	for _, rule := range manifest.Rules {
		stat := rule.quota.Stats()
		docs = append(docs, &samplingStatisticsDocument{
			ClientID:     s.clientID,
			RuleName:     rule.ruleName,
			RequestCount: stat.requests,
			SampledCount: stat.sampled,
			BorrowCount:  stat.borrowed,
			Timestamp:    timestamp,
		})
		xraylog.Debugf(
			ctx,
			"Sampling Statistics: Name: %s, Requests: %d, Borrowed: %d, Sampled: %d", rule.ruleName, stat.requests, stat.borrowed, stat.sampled,
		)
	}

	refreshRules := false
	for len(docs) > 0 {
		// Cut the next batch off the front of the pending documents.
		size := len(docs)
		if size > maxTargets {
			size = maxTargets
		}
		batch := docs[:size]
		docs = docs[size:]

		resp, err := s.getSamplingTargets(ctx, &getSamplingTargetsInput{
			SamplingStatisticsDocuments: batch,
		})
		if err != nil {
			xraylog.Errorf(ctx, "failed to refresh sampling targets: %v", err)
			continue
		}

		for _, target := range resp.SamplingTargetDocuments {
			quota, known := manifest.Quotas[target.RuleName]
			if !known {
				// new rule may be added? try to refresh.
				refreshRules = true
				continue
			}
			if err := quota.update(target); err != nil {
				xraylog.Errorf(
					ctx,
					"Failed to Refresh Quota: Name: %s, Quota: %d, TTL: %s, Interval: %d, Error: %v",
					target.RuleName, target.ReservoirQuota, target.ReservoirQuotaTTL, target.Interval, err,
				)
				continue
			}
			xraylog.Debugf(
				ctx,
				"Refresh Quota: Name: %s, Quota: %d, TTL: %s, Interval: %d",
				target.RuleName, target.ReservoirQuota, target.ReservoirQuotaTTL, target.Interval,
			)
		}

		// check the rules are updated.
		if !refreshRules {
			modifiedAt := time.Unix(resp.LastRuleModification, 0)
			refreshRules = manifest.RefreshedAt.IsZero() || modifiedAt.After(manifest.RefreshedAt)
		}
	}
	xraylog.Debug(ctx, "sampling targets are refreshed.")

	// TODO update the interval.

	if refreshRules {
		xraylog.Debug(ctx, "changing sampling rules is detected. refresh them.")
		go s.refreshRule()
	}
}