-
Notifications
You must be signed in to change notification settings - Fork 397
/
rollups_write_cache.go
239 lines (191 loc) · 6.81 KB
/
rollups_write_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package orders
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/pb"
"storj.io/common/sync2"
"storj.io/common/uuid"
)
// CacheData stores the accumulated bandwidth amounts (inline, allocated,
// settled and dead) cached for a single bucket bandwidth rollup entry.
type CacheData struct {
	// Inline is the running total added via UpdateBucketBandwidthInline.
	Inline int64
	// Allocated is the running total added via UpdateBucketBandwidthAllocation.
	Allocated int64
	// Settled is the running total of settledAmount added via UpdateBucketBandwidthSettle.
	Settled int64
	// Dead is the running total of deadAmount added via UpdateBucketBandwidthSettle.
	Dead int64
}
// CacheKey is the key information for the cached map below. Two updates are
// merged into the same cache entry when all four fields are equal.
type CacheKey struct {
	// ProjectID is the project the bucket belongs to.
	ProjectID uuid.UUID
	// BucketName is the bucket name, converted from the []byte passed by callers.
	BucketName string
	// Action is the piece action the bandwidth is accounted under.
	Action pb.PieceAction
	// IntervalStart is the start of the interval as Unix seconds, truncated
	// to the beginning of the hour.
	IntervalStart int64
}
// RollupData contains the pending rollups waiting to be flushed to the db,
// keyed by project, bucket, action and interval start.
type RollupData map[CacheKey]CacheData
// RollupsWriteCache stores information needed to update bucket bandwidth rollups.
// Updates are accumulated in memory and written to the embedded DB in batches.
type RollupsWriteCache struct {
	// DB is the underlying database; flushes go through its UpdateBandwidthBatch.
	DB
	// batchSize is the number of distinct cache entries at which a background
	// flush is triggered; once reached, updates for new keys are dropped.
	batchSize int
	// wg tracks background flush goroutines so CloseAndFlush can wait for them.
	wg sync.WaitGroup
	// log receives error messages when data is dropped or a flush fails.
	log *zap.Logger
	// mu guards pendingRollups, stopped, flushing and nextFlushCompletion.
	mu sync.Mutex
	// pendingRollups holds the accumulated amounts not yet handed to a flush.
	pendingRollups RollupData
	// stopped is set by CloseAndFlush; further updates are rejected afterwards.
	stopped bool
	// flushing is true while a flush is in progress.
	flushing bool
	// nextFlushCompletion is released when a flush finishes and is then
	// replaced with a fresh fence.
	nextFlushCompletion *sync2.Fence
}
// NewRollupsWriteCache creates a RollupsWriteCache that writes to db, logs to
// log, and starts out with no pending rollups. batchSize is the number of
// distinct cache entries at which a background flush is triggered.
func NewRollupsWriteCache(log *zap.Logger, db DB, batchSize int) *RollupsWriteCache {
	cache := &RollupsWriteCache{
		DB:        db,
		log:       log,
		batchSize: batchSize,
	}
	cache.pendingRollups = RollupData{}
	cache.nextFlushCompletion = &sync2.Fence{}
	return cache
}
// UpdateBucketBandwidthAllocation adds amount to the allocated total cached
// for the given project, bucket, action and interval. intervalStart is
// converted to UTC before it is used. It returns an error if the cache has
// been stopped.
func (cache *RollupsWriteCache) UpdateBucketBandwidthAllocation(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
	// Only the allocated counter changes; the others receive a zero delta.
	var inline, settled, dead int64
	utcStart := intervalStart.UTC()
	return cache.updateCacheValue(ctx, projectID, bucketName, action, amount, inline, settled, dead, utcStart)
}
// UpdateBucketBandwidthInline adds amount to the inline total cached for the
// given project, bucket, action and interval. intervalStart is converted to
// UTC before it is used. It returns an error if the cache has been stopped.
func (cache *RollupsWriteCache) UpdateBucketBandwidthInline(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, amount int64, intervalStart time.Time) error {
	// Only the inline counter changes; the others receive a zero delta.
	var allocated, settled, dead int64
	utcStart := intervalStart.UTC()
	return cache.updateCacheValue(ctx, projectID, bucketName, action, allocated, amount, settled, dead, utcStart)
}
// UpdateBucketBandwidthSettle adds settledAmount to the settled total and
// deadAmount to the dead total cached for the given project, bucket, action
// and interval. intervalStart is converted to UTC before it is used. It
// returns an error if the cache has been stopped.
func (cache *RollupsWriteCache) UpdateBucketBandwidthSettle(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, settledAmount, deadAmount int64, intervalStart time.Time) error {
	// The allocated and inline counters receive a zero delta.
	var allocated, inline int64
	utcStart := intervalStart.UTC()
	return cache.updateCacheValue(ctx, projectID, bucketName, action, allocated, inline, settledAmount, deadAmount, utcStart)
}
// resetCache swaps in a fresh, empty pending map and hands the previous one
// back to the caller. The caller must already hold the cache lock.
func (cache *RollupsWriteCache) resetCache() RollupData {
	drained := cache.pendingRollups
	cache.pendingRollups = RollupData{}
	return drained
}
// Flush drains the cache and synchronously writes everything that was pending
// to the database. If another flush is already running it first waits for
// that flush to finish; if ctx is done while waiting, Flush returns without
// flushing anything.
func (cache *RollupsWriteCache) Flush(ctx context.Context) {
	defer mon.Task()(&ctx)(nil)

	cache.mu.Lock()
	for cache.flushing {
		// Grab the completion channel under the lock, then wait without
		// holding it so the running flush can finish and release the fence.
		inFlight := cache.nextFlushCompletion.Done()
		cache.mu.Unlock()

		select {
		case <-ctx.Done():
			return
		case <-inFlight:
		}

		// Re-check under the lock: another caller may have started a new
		// flush in the meantime.
		cache.mu.Lock()
	}

	// Take ownership of the pending data and mark the flush as running
	// before releasing the lock.
	drained := cache.pendingRollups
	cache.pendingRollups = make(RollupData)
	cache.flushing = true
	cache.mu.Unlock()

	cache.flush(ctx, drained)
}
// CloseAndFlush marks the cache as stopped so that further updates are
// rejected, waits for background flushes to finish, and then flushes whatever
// is still pending. It always returns nil.
func (cache *RollupsWriteCache) CloseAndFlush(ctx context.Context) error {
	func() {
		cache.mu.Lock()
		defer cache.mu.Unlock()
		cache.stopped = true
	}()

	// No new background flush can start once stopped is set, so waiting here
	// covers every goroutine started by earlier updates.
	cache.wg.Wait()

	cache.Flush(ctx)
	return nil
}
// flush writes pendingRollups to the database in a single batch and then
// signals completion: it releases the current completion fence, installs a
// new one and clears the flushing flag. A failed batch is logged and counted
// but not retried, so its data is dropped.
func (cache *RollupsWriteCache) flush(ctx context.Context, pendingRollups RollupData) {
	defer mon.Task()(&ctx)(nil)

	if count := len(pendingRollups); count != 0 {
		batch := make([]BucketBandwidthRollup, 0, count)
		for key, totals := range pendingRollups {
			entry := BucketBandwidthRollup{
				ProjectID:     key.ProjectID,
				BucketName:    key.BucketName,
				IntervalStart: time.Unix(key.IntervalStart, 0),
				Action:        key.Action,
				Inline:        totals.Inline,
				Allocated:     totals.Allocated,
				Settled:       totals.Settled,
				Dead:          totals.Dead,
			}
			batch = append(batch, entry)
		}

		if err := cache.DB.UpdateBandwidthBatch(ctx, batch); err != nil {
			mon.Event("rollups_write_cache_flush_lost")
			cache.log.Error("MONEY LOST! Bucket bandwidth rollup batch flush failed.", zap.Error(err))
		}
	}

	// Completion is signalled even when there was nothing to write, so
	// waiters on the fence are always woken.
	cache.mu.Lock()
	defer cache.mu.Unlock()

	cache.nextFlushCompletion.Release()
	cache.nextFlushCompletion = new(sync2.Fence)
	cache.flushing = false
}
// rollupsFlushContext wraps a context so that its values remain reachable
// while cancellation and deadlines of the wrapped context are ignored.
type rollupsFlushContext struct{ context.Context }

// Deadline reports that no deadline is set.
func (rollupsFlushContext) Deadline() (time.Time, bool) { return time.Time{}, false }

// Done returns nil, which never becomes ready: the context is never canceled.
func (rollupsFlushContext) Done() <-chan struct{} { return nil }

// Err always returns nil because the context is never canceled.
func (rollupsFlushContext) Err() error { return nil }

// updateCacheValue adds the given deltas to the cache entry identified by
// projectID, bucketName, action and the hour that contains intervalStart.
//
// When the number of distinct entries reaches batchSize, an update for a new
// key is dropped (logged and counted) instead of growing the cache, and a
// background flush of the accumulated data is started unless one is already
// running.
//
// It returns an error only when the cache has been stopped; a dropped update
// still returns nil.
func (cache *RollupsWriteCache) updateCacheValue(ctx context.Context, projectID uuid.UUID, bucketName []byte, action pb.PieceAction, allocated, inline, settled, dead int64, intervalStart time.Time) error {
	defer mon.Task()(&ctx)(nil)

	cache.mu.Lock()
	defer cache.mu.Unlock()

	if cache.stopped {
		return Error.New("RollupsWriteCache is stopped")
	}

	key := CacheKey{
		ProjectID:  projectID,
		BucketName: string(bucketName),
		Action:     action,
		// Truncate to the start of the hour so that all updates within the
		// same hour share one entry.
		IntervalStart: time.Date(intervalStart.Year(), intervalStart.Month(), intervalStart.Day(), intervalStart.Hour(), 0, 0, 0, intervalStart.Location()).Unix(),
	}

	// Prevent unbounded memory growth if we're not flushing fast enough
	// to keep up with incoming writes: existing entries may still be
	// updated, but new keys are rejected once the cache is full.
	data, ok := cache.pendingRollups[key]
	if !ok && len(cache.pendingRollups) >= cache.batchSize {
		mon.Event("rollups_write_cache_update_lost")
		cache.log.Error("MONEY LOST! Flushing too slow to keep up with demand.")
	} else {
		data.Allocated += allocated
		data.Inline += inline
		data.Settled += settled
		data.Dead += dead
		cache.pendingRollups[key] = data
	}

	// Nothing more to do while the cache is below the batch size or a flush
	// is already in progress.
	if len(cache.pendingRollups) < cache.batchSize || cache.flushing {
		return nil
	}

	cache.flushing = true
	pendingRollups := cache.resetCache()

	// The flush outlives this call. Detach it from the caller's cancellation
	// and deadline so that a finished or canceled request cannot abort the
	// database write and lose the whole batch.
	flushCtx := rollupsFlushContext{ctx}

	cache.wg.Add(1)
	go func() {
		defer cache.wg.Done()
		cache.flush(flushCtx, pendingRollups)
	}()

	return nil
}
// OnNextFlush returns the completion channel of the current fence. The fence
// is released, and replaced by a new one, when a flush finishes.
func (cache *RollupsWriteCache) OnNextFlush() <-chan struct{} {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	done := cache.nextFlushCompletion.Done()
	return done
}
// CurrentSize returns the number of entries currently pending in the cache.
func (cache *RollupsWriteCache) CurrentSize() int {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	size := len(cache.pendingRollups)
	return size
}
// CurrentData returns a copy of the pending cache contents, so the caller can
// read or modify the result without affecting the cache.
func (cache *RollupsWriteCache) CurrentData() RollupData {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	snapshot := make(RollupData, len(cache.pendingRollups))
	for key, data := range cache.pendingRollups {
		snapshot[key] = data
	}
	return snapshot
}