-
Notifications
You must be signed in to change notification settings - Fork 5
/
timeline.go
226 lines (192 loc) · 5.49 KB
/
timeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package timers
import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Timeline is a data structure that maintains a cache of deadlines represented
// by background contexts. A Timeline has a resolution attribute representing
// the accuracy of the deadlines it maintains. All deadlines that fall within
// the same resolution window share the same context, making it very efficient
// to create thousands, or even millions of them since the runtime only needs to
// maintain a single timer per resolution window.
//
// Timelines are safe to use concurrently from multiple goroutines, however they
// should not be copied after being first used.
//
// The zero-value is a valid timeline with a resolution of 100ms.
type Timeline struct {
// Resolution represents the accuracy of timers managed by this timeline.
// The lower the resolution the more accurate the timers are, but it also
// means the timeline will put more pressure on the runtime and use more
// memory.
Resolution time.Duration
// Background configures the background context used by contexts created by
// the timeline. If nil, the default background context is used instead.
Background context.Context
mutex sync.RWMutex
deadlines map[int64]deadline
cleanupLock int64
cleanupTime int64
}
var (
// HighRes is a timeline configured for high resolution timers, with 10
// millisecond accuracy.
HighRes = Timeline{
Resolution: 10 * time.Millisecond,
}
// LowRes is a timeline configured for low resolution timers, with 1 second
// accuracy. This timeline is typically useful for network timeouts.
//
// Here is an example of how the timeline may be used to set a timeout on an
// http request:
//
// req = req.WithContext(timers.LowRes.Timeout(10 * time.Second))
// res, err := httpClient.Do(req)
//
LowRes = Timeline{
Resolution: 1 * time.Second,
}
)
// Cancel cancels all contexts and releases all internal resources managed by
// the timeline.
func (t *Timeline) Cancel() {
t.mutex.Lock()
deadlines := t.deadlines
t.deadlines = nil
t.mutex.Unlock()
for _, d := range deadlines {
d.cancel()
}
}
// Timeout returns a context which expires after the given amount of time has
// passed, plus up to the timeline's resolution.
func (t *Timeline) Timeout(timeout time.Duration) context.Context {
now := time.Now()
return t.Context(now.Add(timeout), now)
}
// Deadline returns a context which expires when the given deadline is reached,
// plus up to the timeline's resolution.
func (t *Timeline) Deadline(deadline time.Time) context.Context {
return t.Context(deadline, time.Now())
}
// Context returns a context which expires when the given deadline is reached,
// using `now` as the current time.
func (t *Timeline) Context(at time.Time, now time.Time) context.Context {
r := int64(t.resolution())
k := ((at.UnixNano() / r) + 1) * r
t.mutex.RLock()
d, ok := t.deadlines[k]
t.mutex.RUnlock()
if ok { // fast path
return d.context
}
background := t.background()
expiration := jitterTime(time.Unix(0, k), time.Duration(r))
newDeadline := makeDeadline(background, expiration)
t.mutex.Lock()
d, ok = t.deadlines[k]
if !ok {
if t.deadlines == nil {
t.deadlines = make(map[int64]deadline)
}
t.deadlines[k] = newDeadline
}
t.mutex.Unlock()
if ok {
newDeadline.cancel()
} else {
d = newDeadline
}
if cleanupTime := t.loadCleanupTime(); cleanupTime.IsZero() || cleanupTime.Before(now) {
if t.tryLockCleanup() {
t.storeCleanupTime(t.nextCleanupTime(cleanupTime))
t.cleanup(now)
t.unlockCleanup()
}
}
return d.context
}
func (t *Timeline) nextCleanupTime(lastCleanupTime time.Time) time.Time {
return lastCleanupTime.Add(100 * t.resolution())
}
func (t *Timeline) loadCleanupTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&t.cleanupTime))
}
func (t *Timeline) storeCleanupTime(cleanupTime time.Time) {
atomic.StoreInt64(&t.cleanupTime, cleanupTime.UnixNano())
}
func (t *Timeline) tryLockCleanup() bool {
return atomic.CompareAndSwapInt64(&t.cleanupLock, 0, 1)
}
func (t *Timeline) unlockCleanup() {
atomic.StoreInt64(&t.cleanupLock, 0)
}
func (t *Timeline) cleanup(now time.Time) {
type timestampAndDeadline struct {
timestamp int64
deadline deadline
}
expired := []timestampAndDeadline{}
r := t.resolution()
t.mutex.RLock()
for k, d := range t.deadlines {
if deadline, _ := d.context.Deadline(); now.After(deadline.Add(r)) {
expired = append(expired, timestampAndDeadline{
timestamp: k,
deadline: d,
})
}
}
t.mutex.RUnlock()
if len(expired) != 0 {
t.mutex.Lock()
for _, x := range expired {
delete(t.deadlines, x.timestamp)
}
t.mutex.Unlock()
for _, x := range expired {
x.deadline.cancel()
}
}
}
func (t *Timeline) resolution() time.Duration {
if r := t.Resolution; r != 0 {
return r
}
return 100 * time.Millisecond
}
func (t *Timeline) background() context.Context {
if b := t.Background; b != nil {
return b
}
return context.Background()
}
type deadline struct {
context context.Context
cancel context.CancelFunc
}
func makeDeadline(parent context.Context, expiration time.Time) deadline {
context, cancel := context.WithDeadline(parent, expiration)
return deadline{
context: context,
cancel: cancel,
}
}
var (
jitterMutex sync.Mutex
jitterRand = rand.New(
rand.NewSource(time.Now().UnixNano()),
)
)
func jitter(d time.Duration) time.Duration {
jitterMutex.Lock()
x := time.Duration(jitterRand.Int63n(int64(d)))
jitterMutex.Unlock()
return x
}
func jitterTime(t time.Time, d time.Duration) time.Time {
return t.Add(jitter(d))
}