-
Notifications
You must be signed in to change notification settings - Fork 11
/
readcache.go
224 lines (190 loc) · 5.44 KB
/
readcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package sync2
import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/zeebo/errs"
)
// ReadCache is a backwards compatible implementation.
//
// It is an alias for ReadCacheOf[any], so the cached state is untyped.
type ReadCache = ReadCacheOf[any]
// ReadCacheOf implements refreshing of state based on a refresh timeout,
// but also allows for stale reads up to a certain duration.
//
// A successful result that is at least `refresh` old triggers a background
// refresh; a result that is at least `stale` old, is missing, or carries an
// error makes Get wait for the refresh to finish.
type ReadCacheOf[T any] struct {
noCopy noCopy //nolint:structcheck
// started is released by Run; Get, RefreshAndGet and Wait block on it
// before touching the rest of the state.
started Fence
// ctx is the root context set by Run and passed to every read call.
ctx context.Context
// read is a func that's called when a new update is needed.
read func(ctx context.Context) (T, error)
// refresh defines when the state should be updated.
refresh time.Duration
// stale defines when we must wait for the new state.
stale time.Duration
// mu protects the internal state of the cache.
mu sync.Mutex
// closed is set true when the read cache is shutting down.
// Once set, no new refresh is started.
closed bool
// result contains the last known state and any errors that
// occurred during refreshing. It is nil until the first refresh finishes.
result *readCacheResult[T]
// pending is the worker for the current refresh; waiters block on its
// done channel. It is only present when there is an ongoing refresh.
pending *readCacheWorker[T]
}
// NewReadCache allocates a ReadCacheOf and initializes it with the given
// refresh interval, staleness limit and read func.
//
// The cache pointer is returned even when initialization fails; the error
// reports why the arguments were rejected.
func NewReadCache[T any](refresh time.Duration, stale time.Duration, read func(ctx context.Context) (T, error)) (*ReadCacheOf[T], error) {
	cache := new(ReadCacheOf[T])
	err := cache.Init(refresh, stale, read)
	return cache, err
}
// Init initializes the cache for in-place initialization. This is only needed when NewReadCache
// was not used to initialize it.
//
// refresh is clamped down to stale when it is larger. After clamping, both
// durations must be positive. read must be non-nil: it is invoked from a
// background goroutine, where a nil func would panic and crash the process.
//
// On error the cache is left unmodified.
func (cache *ReadCacheOf[T]) Init(refresh time.Duration, stale time.Duration, read func(ctx context.Context) (T, error)) error {
	if read == nil {
		return errs.New("read func must not be nil")
	}

	// refreshing less often than the staleness limit would make every
	// refresh a blocking one, so cap the refresh interval.
	if refresh > stale {
		refresh = stale
	}
	if refresh <= 0 || stale <= 0 {
		return errs.New("refresh and stale must be positive. refresh=%v, stale=%v", refresh, stale)
	}

	cache.read = read
	cache.refresh = refresh
	cache.stale = stale
	return nil
}
// readCacheWorker contains the pending result.
//
// One worker is created per refresh by startRefresh.
type readCacheWorker[T any] struct {
// done is closed when the refresh goroutine exits.
done chan struct{}
// result is nil while the refresh is running and is set by the refresh
// goroutine before done is closed.
result *readCacheResult[T]
}
// readCacheResult contains the result of a read and info related to it.
type readCacheResult[T any] struct {
// start is the `now` value given to the call that started the refresh;
// Get measures the age of the result from it.
start time.Time
// state is the value returned by the read func.
state T
// err is the error returned by the read func, if any.
err error
}
// Run records ctx as the root context for reads, unblocks callers waiting
// for the cache to start, and then blocks until ctx is done.
//
// On shutdown it marks the cache closed, so no new refresh is started, and
// waits for an in-flight refresh to exit. It always returns nil.
func (cache *ReadCacheOf[T]) Run(ctx context.Context) error {
	cache.ctx = ctx
	cache.started.Release()

	<-ctx.Done()

	// flip the closed flag and grab the in-flight worker in one critical section.
	inflight := func() *readCacheWorker[T] {
		cache.mu.Lock()
		defer cache.mu.Unlock()

		cache.closed = true
		return cache.pending
	}()

	if inflight == nil {
		return nil
	}
	<-inflight.done
	return nil
}
// Get returns the cached state, starting a refresh when one is due.
//
// A refresh is started when there is no result, the last result is an
// error, or the result is at least `refresh` old relative to now. The call
// blocks on that refresh only when there is no usable result (none yet, or
// an error) or the result is at least `stale` old; otherwise the cached
// state is returned immediately while the refresh runs in the background.
//
// It returns ctx.Err() when ctx ends before the cache has started or before
// the awaited refresh completes, and the startRefresh error when the cache
// is shutting down.
func (cache *ReadCacheOf[T]) Get(ctx context.Context, now time.Time) (state T, err error) {
	var zero T
	if !cache.started.Wait(ctx) {
		return zero, ctx.Err()
	}

	cache.mu.Lock()
	current := cache.result
	// a missing or failed result can never be served from cache.
	unusable := current == nil || current.err != nil
	needsRefresh := unusable || now.Sub(current.start) >= cache.refresh
	blocking := needsRefresh && (unusable || now.Sub(current.start) >= cache.stale)
	if needsRefresh {
		if startErr := cache.startRefresh(now); startErr != nil {
			cache.mu.Unlock()
			return zero, startErr
		}
	}
	// read the worker only after startRefresh, which may have just created it.
	worker := cache.pending
	cache.mu.Unlock()

	if !blocking {
		return current.state, current.err
	}

	select {
	case <-worker.done:
		return worker.result.state, worker.result.err
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}
// RefreshAndGet makes sure a refresh is running and returns its outcome.
//
// When a refresh is already in flight, that one is awaited instead of
// starting another. It returns ctx.Err() when ctx ends before the cache has
// started or before the refresh completes, and the startRefresh error when
// the cache is shutting down.
func (cache *ReadCacheOf[T]) RefreshAndGet(ctx context.Context, now time.Time) (state T, err error) {
	var zero T
	if started := cache.started.Wait(ctx); !started {
		return zero, ctx.Err()
	}

	cache.mu.Lock()
	startErr := cache.startRefresh(now)
	worker := cache.pending
	cache.mu.Unlock()

	if startErr != nil {
		return zero, startErr
	}

	select {
	case <-ctx.Done():
		return zero, ctx.Err()
	case <-worker.done:
		return worker.result.state, worker.result.err
	}
}
// Wait waits for any pending refresh and returns the result.
//
// When no refresh is in flight, the last stored result is returned as is.
// When no refresh has ever been started there is nothing to return, and an
// error is reported instead of dereferencing the missing result.
//
// It returns ctx.Err() when ctx ends before the cache has started or before
// the pending refresh completes.
func (cache *ReadCacheOf[T]) Wait(ctx context.Context) (state T, err error) {
	var zero T
	if !cache.started.Wait(ctx) {
		return zero, ctx.Err()
	}

	cache.mu.Lock()
	result, pending := cache.result, cache.pending
	cache.mu.Unlock()

	if pending != nil {
		select {
		case <-pending.done:
		case <-ctx.Done():
			return zero, ctx.Err()
		}
		// the worker stores its result before done is closed.
		result = pending.result
	}

	// neither a stored result nor an in-flight refresh: nothing was ever fetched.
	if result == nil {
		return zero, errors.New("read cache has no result yet")
	}
	return result.state, result.err
}
// startRefresh launches a background refresh unless one is already in
// flight. It returns context.Canceled when the cache is shutting down and
// nil otherwise.
//
// The refresh goroutine calls read with the cache's root context, stores
// the outcome (stamped with now) as the cache result and on the worker,
// clears the pending worker, and finally closes the worker's done channel.
//
// Note: this must only be called when `cache.mu` is being held.
func (cache *ReadCacheOf[T]) startRefresh(now time.Time) error {
	switch {
	case cache.closed:
		return context.Canceled
	case cache.pending != nil:
		return nil
	}

	worker := &readCacheWorker[T]{done: make(chan struct{})}
	cache.pending = worker

	go func() {
		// deferred first so it runs last: done is closed only after the
		// result has been published and the mutex released.
		defer close(worker.done)

		state, err := cache.read(cache.ctx)
		fresh := &readCacheResult[T]{start: now, state: state, err: err}

		cache.mu.Lock()
		defer cache.mu.Unlock()

		cache.result = fresh
		worker.result = fresh
		cache.pending = nil
	}()

	return nil
}