-
Notifications
You must be signed in to change notification settings - Fork 167
/
persistent_strict_monotonic_counter.go
70 lines (59 loc) · 2.3 KB
/
persistent_strict_monotonic_counter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package counters
import (
"errors"
"fmt"
"github.com/onflow/flow-go/storage"
)
// ErrIncorrectValue indicates that a processed value is lower or equal than current value.
var (
ErrIncorrectValue = errors.New("incorrect value")
)
// PersistentStrictMonotonicCounter represents the consumer progress with strict monotonic counter.
type PersistentStrictMonotonicCounter struct {
consumerProgress storage.ConsumerProgress
// used to skip heights that are lower than the current height
counter StrictMonotonousCounter
}
// NewPersistentStrictMonotonicCounter creates a new PersistentStrictMonotonicCounter which inserts the default
// processed index to the storage layer and creates new counter with defaultIndex value.
// The consumer progress and associated db entry must not be accessed outside of calls to the returned object,
// otherwise the state may become inconsistent.
//
// No errors are expected during normal operation.
func NewPersistentStrictMonotonicCounter(consumerProgress storage.ConsumerProgress, defaultIndex uint64) (*PersistentStrictMonotonicCounter, error) {
m := &PersistentStrictMonotonicCounter{
consumerProgress: consumerProgress,
}
// sync with storage for the processed index to ensure the consistency
value, err := m.consumerProgress.ProcessedIndex()
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("could not read consumer progress: %w", err)
}
err := m.consumerProgress.InitProcessedIndex(defaultIndex)
if err != nil {
return nil, fmt.Errorf("could not init consumer progress: %w", err)
}
value = defaultIndex
}
m.counter = NewMonotonousCounter(value)
return m, nil
}
// Set sets the processed index, ensuring it is strictly monotonically increasing.
//
// Expected errors during normal operation:
// - codes.ErrIncorrectValue - if stored value is larger than processed.
// - generic error in case of unexpected failure from the database layer or
// encoding failure.
func (m *PersistentStrictMonotonicCounter) Set(processed uint64) error {
if !m.counter.Set(processed) {
return ErrIncorrectValue
}
return m.consumerProgress.SetProcessedIndex(processed)
}
// Value loads the current stored index.
//
// No errors are expected during normal operation.
func (m *PersistentStrictMonotonicCounter) Value() uint64 {
return m.counter.Value()
}