/
reservoir.go
105 lines (82 loc) · 2.7 KB
/
reservoir.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/contrib/samplers/aws/xray/internal"
import (
"sync"
"time"
)
// reservoir holds the sampling quota state for a single sampling rule. Its values are
// populated from the response of the getSamplingTargets API, which sends information on
// sampling statistics in real-time (the code that populates it is not part of this type).
type reservoir struct {
// Quota expiration timestamp; expired reports true once the supplied time is after it.
expiresAt time.Time
// Quota assigned to client to consume per second. When not borrowing, it is the initial
// balance, the per-second refill rate, and the upper bound of quotaBalance.
quota float64
// Current balance of quota; take deducts the item cost from it on success.
quotaBalance float64
// Total size of reservoir consumption per second. take always returns false when this is 0.
capacity float64
// Quota refresh timestamp. Not referenced by the methods of this type shown here.
refreshedAt time.Time
// Polling interval for quota. Not referenced by the methods of this type shown here.
interval time.Duration
// Stores reservoir ticks: the time of the first take or of the most recent balance refresh.
// The zero value means take has not seeded the balance yet.
lastTick time.Time
// mu guards the fields above; expired takes the read lock and take takes the write lock.
mu sync.RWMutex
}
// expired reports whether now is strictly after the reservoir's quota expiration
// timestamp. Only the timestamps are compared; the remaining quota balance is not
// consulted.
func (r *reservoir) expired(now time.Time) bool {
	// Copy the deadline under the read lock, then compare outside of it.
	r.mu.RLock()
	deadline := r.expiresAt
	r.mu.RUnlock()

	return deadline.Before(now)
}
// take tries to withdraw itemCost from the reservoir's quota balance and reports
// whether the withdrawal succeeded.
//
// A reservoir whose capacity is 0 never grants quota. On the first call (lastTick is
// still the zero time) the clock is started at now and the balance is seeded with 1.0
// when borrowed is true, or with the assigned quota otherwise. If the balance cannot
// cover itemCost, it is refreshed once from the time elapsed since lastTick and the
// withdrawal is attempted again.
func (r *reservoir) take(now time.Time, borrowed bool, itemCost float64) bool { // nolint: revive // borrowed is not a control flag.
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.capacity == 0 {
		return false
	}

	// First use: start the clock and seed the balance.
	if r.lastTick.IsZero() {
		r.lastTick = now
		r.quotaBalance = r.quota
		if borrowed {
			r.quotaBalance = 1.0
		}
	}

	// Fast path: the current balance already covers the cost, so no refresh is
	// needed and lastTick is left untouched.
	granted := r.quotaBalance >= itemCost
	if !granted {
		// Top up the balance based on elapsed time, then retry once.
		r.refreshQuotaBalanceLocked(now, borrowed)
		granted = r.quotaBalance >= itemCost
	}

	if granted {
		r.quotaBalance -= itemCost
	}
	return granted
}
// refreshQuotaBalanceLocked credits quotaBalance for the time elapsed between lastTick
// and now, then advances lastTick to now.
//
// When borrowed is true the balance grows by the elapsed seconds, limited to at most 1.0
// per refresh, so that borrowing samples roughly one request per second. Otherwise the
// balance grows by elapsed seconds multiplied by the quota assigned by the X-Ray service
// and is capped at that quota.
//
// If now is earlier than lastTick (for example, a caller captured its timestamp before
// another caller that acquired the lock first), no credit is added, nothing is deducted,
// and lastTick is left unchanged so the same interval cannot be credited twice.
//
// It is assumed the lock is held when calling this.
func (r *reservoir) refreshQuotaBalanceLocked(now time.Time, borrowed bool) { // nolint: revive // borrowed is not a control flag.
	elapsedSeconds := now.Sub(r.lastTick).Seconds()
	if elapsedSeconds < 0 {
		// Out-of-order timestamp: never drain the balance or rewind the clock.
		elapsedSeconds = 0
	} else {
		r.lastTick = now
	}

	// Calculate how much credit we have accumulated since the last tick.
	if borrowed {
		// In the borrowing case we want to enforce sampling one request every second,
		// so there is no need to accumulate more than one second's worth of credit.
		if elapsedSeconds > 1.0 {
			elapsedSeconds = 1.0
		}
		r.quotaBalance += elapsedSeconds
		return
	}

	r.quotaBalance += elapsedSeconds * r.quota
	if r.quotaBalance > r.quota {
		r.quotaBalance = r.quota
	}
}