/
cached_object.go
404 lines (323 loc) · 12.3 KB
/
cached_object.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
package objectstorage
import (
"sync"
"sync/atomic"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/objectstorage/typeutils"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/hive.go/runtime/timed"
)
// CachedObject is the handle through which an object held in the cache of an ObjectStorage is accessed.
//
// The method descriptions below reflect the behavior of CachedObjectImpl. Because the interface contains the
// unexported retain method, it can only be implemented by types of this package.
type CachedObject interface {
	// Key returns the object storage key that is used to address the object.
	Key() []byte
	// Exists reports whether the container holds an object that is not marked as deleted.
	Exists() bool
	// Get returns the StorableObject that is cached in the container.
	Get() (result StorableObject)
	// Consume passes the object to the consumer if it exists and releases the CachedObject afterwards. It returns
	// true if the consumer was called.
	Consume(consumer func(StorableObject), forceRelease ...bool) bool
	// Retain registers an additional consumer and returns the CachedObject.
	Retain() CachedObject
	// retain is the package internal variant of Retain.
	retain() CachedObject
	// Release unregisters a consumer.
	Release(force ...bool)
	// Transaction executes the callback while holding the write lock for the given identifiers and releases the
	// CachedObject afterwards.
	Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject
	// RTransaction executes the callback while holding the read lock for the given identifiers and releases the
	// CachedObject afterwards.
	RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject
	// BatchWriteObject contributes the hooks that are needed to enqueue the object into the batched writer.
	kvstore.BatchWriteObject
}
// CachedObjectImpl is the container that holds a StorableObject together with the bookkeeping that is required to
// cache it: the number of consumers, the pending release task and the flags used while persisting and evicting it.
type CachedObjectImpl struct {
	// key is the object storage key that addresses the object.
	key []byte

	// objectStorage is the storage the container belongs to (not set by NewEmptyCachedObject).
	objectStorage *ObjectStorage

	// value is the cached object. Get and updateEmptyResult access it under valueMutex, publishResult writes it
	// before releasing wg.
	value StorableObject

	// consumers counts the registered consumers (incremented by Retain/retain, decremented by Release).
	consumers atomic.Int32

	// published is set once the initial result was published.
	published atomic.Bool

	// evicted is set by BatchWriteDone when it starts removing the container from the cache.
	evicted atomic.Bool

	// batchWriteScheduled is the flag behind BatchWriteScheduled and ResetBatchWriteScheduled.
	batchWriteScheduled atomic.Bool

	// scheduledTask is the pending delayed release (nil if none is pending).
	scheduledTask atomic.Pointer[timed.ScheduledTask]

	// blindDelete makes BatchWrite delete the key if the container holds no value and has no consumers. It is
	// cleared by updateEmptyResult.
	blindDelete atomic.Bool

	// wg is released by publishResult; waitForInitialResult waits on it.
	wg sync.WaitGroup

	// valueMutex guards value.
	valueMutex syncutils.RWMutex

	// transactionMutex provides the scoped locks used by Transaction and RTransaction.
	transactionMutex syncutils.RWMultiMutex
}
// newCachedObject creates the cache container for the given key inside the given ObjectStorage.
//
// The container starts without a value: its wait group is incremented once, so waitForInitialResult blocks until
// publishResult has delivered the initial value.
func newCachedObject(database *ObjectStorage, key []byte) (result *CachedObjectImpl) {
	result = new(CachedObjectImpl)
	result.objectStorage = database
	result.key = key
	result.wg.Add(1)

	return result
}
// NewEmptyCachedObject creates an "empty" CachedObjectImpl that is not part of any ObjectStorage.
//
// This is useful to offer a "filtered view" on the ObjectStorage, where load operations have to return an "empty"
// value even if the underlying object exists (i.e. the value tangle on top of the normal tangle only returns value
// transactions in its load operations).
//
// The result carries no value and is marked as published right away.
func NewEmptyCachedObject(key []byte) (result *CachedObjectImpl) {
	result = new(CachedObjectImpl)
	result.key = key
	result.published.Store(true)

	return result
}
// Key returns the object storage key that is used to address the object.
//
// The internal slice is returned as is, no copy is made.
func (cachedObject *CachedObjectImpl) Key() []byte {
	return cachedObject.key
}
// Get retrieves the StorableObject that is cached in this container.
//
// The value is read while holding the read lock of valueMutex.
func (cachedObject *CachedObjectImpl) Get() (result StorableObject) {
	cachedObject.valueMutex.RLock()
	result = cachedObject.value
	cachedObject.valueMutex.RUnlock()

	return result
}
// Release unregisters one consumer of the object. When the last consumer is gone, the object is handed over to the
// persistence layer.
//
// If the cache time of the storage is 0 or the optional force flag is true, the object is evicted right away (unless a
// delayed release is already pending). Otherwise the eviction is scheduled to run after the configured cache time.
//
// Release panics if it is called more often than the object was retained.
func (cachedObject *CachedObjectImpl) Release(force ...bool) {
	consumers := cachedObject.consumers.Add(-1)
	if consumers > 0 {
		// other consumers still hold the object, so it has to stay untouched
		return
	}
	if consumers < 0 {
		panic("called Release() too often")
	}

	if cachedObject.objectStorage.options.cacheTime == 0 || (len(force) >= 1 && force[0]) {
		// only force release if there is no timer running, so that objects that landed in the cache through normal
		// loading stay available
		if cachedObject.scheduledTask.Load() == nil {
			cachedObject.evict()
		}

		return
	}

	// keep the object cached for the configured time and remember the task, so that Retain can cancel it
	if scheduledTask := cachedObject.objectStorage.ReleaseExecutor().ExecuteAfter(
		cachedObject.delayedRelease,
		cachedObject.objectStorage.options.cacheTime,
	); scheduledTask != nil {
		cachedObject.scheduledTask.Store(scheduledTask)
	}
}
// delayedRelease is the callback that Release schedules to run after the cache time of the storage has passed.
//
// It evicts the object unless a consumer has registered again in the meantime. It panics if the consumer counter is
// negative.
func (cachedObject *CachedObjectImpl) delayedRelease() {
	// the task is running now, so it is no longer pending
	cachedObject.scheduledTask.Store(nil)

	consumers := cachedObject.consumers.Load()
	if consumers > 0 {
		// the object is in use again and must not be evicted
		return
	}
	if consumers < 0 {
		panic("called Release() too often")
	}

	cachedObject.evict()
}
// Consume directly consumes the StorableObject: the consumer is called with the object if the container holds one
// that is not marked as deleted.
//
// The CachedObject is always released when Consume returns (forceRelease is passed on to Release).
// It returns true if the consumer was called.
func (cachedObject *CachedObjectImpl) Consume(consumer func(StorableObject), forceRelease ...bool) bool {
	defer cachedObject.Release(forceRelease...)

	storableObject := cachedObject.Get()
	if typeutils.IsInterfaceNil(storableObject) || storableObject.IsDeleted() {
		return false
	}

	consumer(storableObject)

	return true
}
// Retain registers a new consumer for this cached object and cancels a pending scheduled release.
//
// It panics if the object had no consumers before the call, because that means it was already released.
func (cachedObject *CachedObjectImpl) Retain() CachedObject {
	if consumersAfter := cachedObject.consumers.Add(1); consumersAfter == 1 {
		panic("called Retain() on an already released CachedObject")
	}
	cachedObject.cancelScheduledRelease()

	return cachedObject
}
// Exists returns true if the StorableObject in this container does exist, i.e. the container holds a value and that
// value is not marked as deleted.
func (cachedObject *CachedObjectImpl) Exists() bool {
	storableObject := cachedObject.Get()
	if typeutils.IsInterfaceNil(storableObject) {
		return false
	}

	return !storableObject.IsDeleted()
}
// Transaction is a synchronization primitive that executes the callback atomically: if multiple Transactions are
// started from different goroutines, only one of them runs at a time.
//
// The identifiers define the scope of the Transaction. Transactions with different scopes can run at the same time
// and act as if they are secured by different mutexes.
//
// Multiple identifiers can be provided, in which case the callback waits until all of them can be acquired at the
// same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is
// deadlock safe.
//
// The callback receives the result of Get. The CachedObject is released when Transaction returns. Transaction panics
// if no identifier is given.
//
// Note: It is the equivalent of a mutex.Lock/Unlock.
func (cachedObject *CachedObjectImpl) Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject {
	// registered first, so the object is released even if the scope check below panics
	defer cachedObject.Release()

	if len(identifiers) < 1 {
		panic("Transaction requires at least one identifier for the scope")
	}

	scopedMutex := &cachedObject.transactionMutex
	scopedMutex.Lock(identifiers...)
	defer scopedMutex.Unlock(identifiers...)

	callback(cachedObject.Get())

	return cachedObject
}
// RTransaction is a synchronization primitive that executes the callback together with other RTransactions but never
// together with a normal Transaction.
//
// The identifiers define the scope of the RTransaction. RTransactions with different scopes can run at the same time
// independently of other RTransactions and act as if they are secured by different mutexes.
//
// Multiple identifiers can be provided, in which case the callback waits until all of them can be acquired at the
// same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is
// deadlock safe.
//
// The callback receives the result of Get. The CachedObject is released when RTransaction returns. RTransaction
// panics if no identifier is given.
//
// Note: It is the equivalent of a mutex.RLock/RUnlock.
func (cachedObject *CachedObjectImpl) RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject {
	// registered first, so the object is released even if the scope check below panics
	defer cachedObject.Release()

	if len(identifiers) < 1 {
		panic("RTransaction requires at least one identifier for the scope")
	}

	scopedMutex := &cachedObject.transactionMutex
	scopedMutex.RLock(identifiers...)
	defer scopedMutex.RUnlock(identifiers...)

	callback(cachedObject.Get())

	return cachedObject
}
// retain registers a new consumer for this cached object and cancels a pending scheduled release.
//
// In contrast to Retain it does not panic if the object had no consumers before the call.
func (cachedObject *CachedObjectImpl) retain() CachedObject {
	cachedObject.consumers.Add(1)
	cachedObject.cancelScheduledRelease()
	return cachedObject
}
// storeOnCreation evicts the object immediately if the storage has both persistence and the storeOnCreation option
// enabled and the container holds a value that is modified and should be persisted. Otherwise it does nothing.
func (cachedObject *CachedObjectImpl) storeOnCreation() {
	storage := cachedObject.objectStorage
	if !storage.options.persistenceEnabled || !storage.options.storeOnCreation {
		return
	}

	value := cachedObject.value
	if typeutils.IsInterfaceNil(value) || !value.IsModified() || !value.ShouldPersist() {
		return
	}

	// store the object immediately
	cachedObject.evict()
}
// publishResult sets the initial value of the container and releases the goroutines that are blocked in
// waitForInitialResult.
//
// Only the first call has an effect. It returns true if the given result was published and false if a result had
// been published before.
//
//nolint:unparam // lets keep this for now
func (cachedObject *CachedObjectImpl) publishResult(result StorableObject) bool {
	if alreadyPublished := cachedObject.published.Swap(true); alreadyPublished {
		return false
	}

	// was not published before
	cachedObject.value = result
	cachedObject.wg.Done()

	return true
}
// updateEmptyResult fills the container with a new value if it currently holds none or one that is marked as deleted.
//
// The update is either a StorableObject or a func() StorableObject that is called (while holding the write lock) to
// create the value; any other type results in a panic. After a successful update the blindDelete flag is cleared.
//
// It returns true if the value was replaced and false if the container already held an existing object.
func (cachedObject *CachedObjectImpl) updateEmptyResult(update interface{}) (updated bool) {
	// must only be called while valueMutex is held
	holdsExistingObject := func() bool {
		return !typeutils.IsInterfaceNil(cachedObject.value) && !cachedObject.value.IsDeleted()
	}

	// first check under the read lock only, so that occupied containers never take the write lock
	cachedObject.valueMutex.RLock()
	occupied := holdsExistingObject()
	cachedObject.valueMutex.RUnlock()
	if occupied {
		return false
	}

	cachedObject.valueMutex.Lock()
	defer cachedObject.valueMutex.Unlock()

	// check again, since the value may have been set between dropping the read lock and taking the write lock
	if holdsExistingObject() {
		return false
	}

	switch typedUpdate := update.(type) {
	case StorableObject:
		cachedObject.value = typedUpdate
	case func() StorableObject:
		cachedObject.value = typedUpdate()
	default:
		panic("invalid argument in call to updateEmptyResult")
	}
	cachedObject.blindDelete.Store(false)

	return true
}
// waitForInitialResult blocks until the wait group of the container is released (which publishResult does when the
// initial result is published) and then returns the container itself.
func (cachedObject *CachedObjectImpl) waitForInitialResult() *CachedObjectImpl {
	cachedObject.wg.Wait()
	return cachedObject
}
// cancelScheduledRelease atomically takes the pending release task out of the container and cancels it. It does
// nothing if no task is pending.
func (cachedObject *CachedObjectImpl) cancelScheduledRelease() {
	pendingTask := cachedObject.scheduledTask.Swap(nil)
	if pendingTask == nil {
		return
	}

	(*pendingTask).Cancel()
}
// evict either enqueues the object into the batch writer (persistence enabled) or releases it directly from the cache
// (persistence disabled).
func (cachedObject *CachedObjectImpl) evict() {
	storage := cachedObject.objectStorage
	if storage.options.persistenceEnabled {
		storage.options.batchedWriterInstance.Enqueue(cachedObject)

		return
	}

	// nothing gets written, so clear the modified flag that would otherwise make BatchWriteDone abort
	if storableObject := cachedObject.Get(); !typeutils.IsInterfaceNil(storableObject) {
		storableObject.SetModified(false)
	}
	cachedObject.BatchWriteDone()
}
// BatchWriteObject interface methods

// BatchWrite checks if the cachedObject should be persisted and adds the resulting mutation to the BatchedMutations.
//
//   - No value: the key is deleted if there are no consumers and the blindDelete flag is set.
//   - Value marked as deleted: the key is deleted if there are no consumers.
//   - Otherwise: the object is stored if it was modified, should be persisted and either has no consumers or the
//     storage stores objects on creation. With the keysOnly option only the key is written.
//
// Do not call this method for objects that should not be persisted. It panics if the consumer counter is negative or
// if the mutation cannot be added.
func (cachedObject *CachedObjectImpl) BatchWrite(batchedMuts kvstore.BatchedMutations) {
	consumers := cachedObject.consumers.Load()
	if consumers < 0 {
		panic("too many unregistered consumers of cached object")
	}
	unused := consumers == 0

	deleteKey := func() {
		if err := batchedMuts.Delete(cachedObject.key); err != nil {
			panic(err)
		}
	}

	storableObject := cachedObject.Get()

	switch {
	case typeutils.IsInterfaceNil(storableObject):
		// only blind delete if there are no consumers
		if unused && cachedObject.blindDelete.Load() {
			deleteKey()
		}

		return

	case storableObject.IsDeleted():
		// only delete if there are no consumers
		if unused {
			storableObject.SetModified(false)
			deleteKey()
		}

		return
	}

	// only store if there are no consumers anymore or the object should be stored on creation
	if !unused && !cachedObject.objectStorage.options.storeOnCreation {
		return
	}

	// clear the modified flag and skip objects that were not modified
	if wasModified := storableObject.SetModified(false); !wasModified {
		return
	}

	if !storableObject.ShouldPersist() {
		return
	}

	// in keysOnly mode the value stays nil
	var marshaledValue []byte
	if !cachedObject.objectStorage.options.keysOnly {
		marshaledValue = storableObject.ObjectStorageValue()
	}

	if err := batchedMuts.Set(cachedObject.key, marshaledValue); err != nil {
		panic(err)
	}
}
// BatchWriteDone is called after the cachedObject was persisted.
//
// It removes the cachedObject from the cache if no consumers are left, it was not modified in the meantime and it was
// not evicted before. After a successful removal the eviction callback (if registered) is fired, and the storage is
// marked as empty if its size dropped to 0.
func (cachedObject *CachedObjectImpl) BatchWriteDone() {
	storage := cachedObject.objectStorage

	// acquire mutexes prior to cache modifications
	storage.flushMutex.RLock()
	defer storage.flushMutex.RUnlock()
	storage.cacheMutex.Lock()
	defer storage.cacheMutex.Unlock()

	// abort if there are still consumers
	if cachedObject.consumers.Load() != 0 {
		return
	}

	// abort if the object was modified in the mean time
	current := cachedObject.Get()
	if !typeutils.IsInterfaceNil(current) && current.IsModified() {
		return
	}

	// abort if the object was evicted already
	if alreadyEvicted := cachedObject.evicted.Swap(true); alreadyEvicted {
		return
	}

	// abort if the object could not be deleted from the cache
	if removed := storage.deleteElementFromCache(cachedObject.key); !removed {
		return
	}

	// fire the eviction callback if registered
	if callback := storage.options.onEvictionCallback; callback != nil {
		callback(cachedObject)
	}

	// mark storage as empty once the last element is gone
	if storage.size == 0 {
		storage.cachedObjectsEmpty.Done()
	}
}
// BatchWriteScheduled marks the cachedObject as scheduled for a BatchWrite operation and returns the previous state
// of the flag: true if it was already scheduled, false if this call scheduled it.
func (cachedObject *CachedObjectImpl) BatchWriteScheduled() bool {
	return cachedObject.batchWriteScheduled.Swap(true)
}
// ResetBatchWriteScheduled resets the flag that the cachedObject is scheduled for a BatchWrite operation, so that the
// next call of BatchWriteScheduled returns false again.
func (cachedObject *CachedObjectImpl) ResetBatchWriteScheduled() {
	cachedObject.batchWriteScheduled.Store(false)
}