forked from DataDog/dd-trace-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sampler.go
150 lines (130 loc) · 3.89 KB
/
sampler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.
package tracer
import (
"encoding/json"
"io"
"math"
"sync"
"github.com/lannguyen-c0x12c/dd-trace-go/ddtrace"
"github.com/lannguyen-c0x12c/dd-trace-go/ddtrace/ext"
"github.com/lannguyen-c0x12c/dd-trace-go/internal/samplernames"
)
// Sampler is the generic interface of any sampler. It must be safe for concurrent use.
type Sampler interface {
// Sample returns true if the given span should be sampled.
Sample(span Span) bool
}
// RateSampler is a sampler implementation which randomly selects spans using a
// provided rate. For example, a rate of 0.75 will permit 75% of the spans.
// RateSampler implementations should be safe for concurrent use.
type RateSampler interface {
Sampler
// Rate returns the current sample rate.
Rate() float64
// SetRate sets a new sample rate.
SetRate(rate float64)
}
// rateSampler samples from a sample rate.
type rateSampler struct {
sync.RWMutex
rate float64
}
// NewAllSampler is a short-hand for NewRateSampler(1). It is all-permissive.
func NewAllSampler() RateSampler { return NewRateSampler(1) }
// NewRateSampler returns an initialized RateSampler with a given sample rate.
func NewRateSampler(rate float64) RateSampler {
return &rateSampler{rate: rate}
}
// Rate returns the current rate of the sampler.
func (r *rateSampler) Rate() float64 {
r.RLock()
defer r.RUnlock()
return r.rate
}
// SetRate sets a new sampling rate.
func (r *rateSampler) SetRate(rate float64) {
r.Lock()
r.rate = rate
r.Unlock()
}
// constants used for the Knuth hashing, same as agent.
const knuthFactor = uint64(1111111111111111111)
// Sample returns true if the given span should be sampled.
func (r *rateSampler) Sample(spn ddtrace.Span) bool {
if r.rate == 1 {
// fast path
return true
}
s, ok := spn.(*span)
if !ok {
return false
}
r.RLock()
defer r.RUnlock()
return sampledByRate(s.TraceID, r.rate)
}
// sampledByRate verifies if the number n should be sampled at the specified
// rate.
func sampledByRate(n uint64, rate float64) bool {
if rate < 1 {
return n*knuthFactor < uint64(rate*math.MaxUint64)
}
return true
}
// prioritySampler holds a set of per-service sampling rates and applies
// them to spans.
type prioritySampler struct {
mu sync.RWMutex
rates map[string]float64
defaultRate float64
}
func newPrioritySampler() *prioritySampler {
return &prioritySampler{
rates: make(map[string]float64),
defaultRate: 1.,
}
}
// readRatesJSON will try to read the rates as JSON from the given io.ReadCloser.
func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
var payload struct {
Rates map[string]float64 `json:"rate_by_service"`
}
if err := json.NewDecoder(rc).Decode(&payload); err != nil {
return err
}
rc.Close()
const defaultRateKey = "service:,env:"
ps.mu.Lock()
defer ps.mu.Unlock()
ps.rates = payload.Rates
if v, ok := ps.rates[defaultRateKey]; ok {
ps.defaultRate = v
delete(ps.rates, defaultRateKey)
}
return nil
}
// getRate returns the sampling rate to be used for the given span. Callers must
// guard the span.
func (ps *prioritySampler) getRate(spn *span) float64 {
key := "service:" + spn.Service + ",env:" + spn.Meta[ext.Environment]
ps.mu.RLock()
defer ps.mu.RUnlock()
if rate, ok := ps.rates[key]; ok {
return rate
}
return ps.defaultRate
}
// apply applies sampling priority to the given span. Caller must ensure it is safe
// to modify the span.
func (ps *prioritySampler) apply(spn *span) {
rate := ps.getRate(spn)
if sampledByRate(spn.TraceID, rate) {
spn.setSamplingPriority(ext.PriorityAutoKeep, samplernames.AgentRate)
} else {
spn.setSamplingPriority(ext.PriorityAutoReject, samplernames.AgentRate)
}
spn.SetTag(keySamplingPriorityRate, rate)
}