/
delta.go
95 lines (80 loc) · 2.55 KB
/
delta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package store
import (
"fmt"
"regexp"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
)
// ApplyDelta applies a single delta to the store: it mutates the key/value
// map according to the delta's operation and keeps totalSizeBytes in step.
//
// It panics if the key is empty or starts with byte 0xFF. It also panics,
// with the error built by storeTooBigError, if the tracked size exceeds
// totalSizeLimit after any operation other than DELETE; a DELETE returns
// before that check is reached.
func (b *baseStore) ApplyDelta(delta *pbsubstreams.StoreDelta) {
	key := delta.Key

	// A key needs at least one character and must not start with 0xFF,
	// which is reserved for internal use.
	if len(key) == 0 {
		panic(fmt.Sprintf("key invalid, must be at least 1 character for module %q", b.name))
	}
	if key[0] == byte(255) {
		panic(fmt.Sprintf("key %q invalid, must be at least 1 character and not start with 0xFF", key))
	}

	keyLen := uint64(len(key))
	oldLen := uint64(len(delta.OldValue))
	newLen := uint64(len(delta.NewValue))

	switch delta.Operation {
	case pbsubstreams.StoreDelta_DELETE:
		delete(b.kv, key)
		b.totalSizeBytes -= oldLen
		b.totalSizeBytes -= keyLen
		// Deletions skip the size limit check below.
		return

	case pbsubstreams.StoreDelta_CREATE:
		// A new entry accounts for both its value and its key.
		b.kv[key] = delta.NewValue
		b.totalSizeBytes += newLen
		b.totalSizeBytes += keyLen

	case pbsubstreams.StoreDelta_UPDATE:
		// Only the value size difference is accounted for on update.
		b.kv[key] = delta.NewValue
		if newLen > oldLen {
			b.totalSizeBytes += newLen - oldLen
		} else if newLen < oldLen {
			b.totalSizeBytes -= oldLen - newLen
		}
	}

	if b.totalSizeBytes <= b.totalSizeLimit {
		return
	}
	panic(storeTooBigError(b.Name(), b.totalSizeBytes, b.totalSizeLimit))
}
// StoreAboveMaxSizeRegexp matches text of the form
// "store <name> became too big at <digits>, maximum size: <digits>".
var StoreAboveMaxSizeRegexp = regexp.MustCompile(
	`store .* became too big at [0-9]*, maximum size: [0-9]*`,
)
// storeTooBigError builds the error reported when the store named storeName
// has reached size bytes while its maximum allowed size is limit bytes.
func storeTooBigError(storeName string, size, limit uint64) error {
	err := fmt.Errorf(
		"store %q became too big at %d, maximum size: %d",
		storeName, size, limit,
	)
	return err
}
// ApplyDeltasReverse undoes the given deltas, walking the slice from the last
// element to the first so that the most recent change is reverted first. For
// each delta it restores the previous key/value state and reverses the size
// accounting that ApplyDelta performs for the same operation:
//
//   - UPDATE: the key is set back to OldValue and the value size difference
//     is undone.
//   - CREATE: the key is removed and the sizes of the key and NewValue are
//     subtracted.
//   - DELETE: the key is restored with OldValue and the sizes of the key and
//     OldValue are added back.
//
// No key validation and no size limit check is performed here.
func (b *baseStore) ApplyDeltasReverse(deltas []*pbsubstreams.StoreDelta) {
	for i := len(deltas) - 1; i >= 0; i-- {
		delta := deltas[i]

		newSize := uint64(len(delta.NewValue))
		oldSize := uint64(len(delta.OldValue))
		keySize := uint64(len(delta.Key))

		switch delta.Operation {
		case pbsubstreams.StoreDelta_UPDATE:
			b.kv[delta.Key] = delta.OldValue
			switch {
			case newSize > oldSize:
				b.totalSizeBytes -= (newSize - oldSize)
			case newSize < oldSize:
				b.totalSizeBytes += (oldSize - newSize)
			}

		case pbsubstreams.StoreDelta_CREATE:
			delete(b.kv, delta.Key)
			b.totalSizeBytes -= newSize
			b.totalSizeBytes -= keySize

		case pbsubstreams.StoreDelta_DELETE:
			b.kv[delta.Key] = delta.OldValue
			b.totalSizeBytes += oldSize
			b.totalSizeBytes += keySize
			// Deliberately no return here: a return would leave the loop and
			// skip reverting every delta that precedes this one in the slice.
		}
	}
}
// GetDeltas returns the deltas currently held by the store. The slice is
// returned as-is, without copying.
func (b *baseStore) GetDeltas() []*pbsubstreams.StoreDelta {
	current := b.deltas
	return current
}
// SetDeltas replaces the store's deltas with the given slice and then applies
// each of them, in order, through ApplyDelta. Any panic raised by ApplyDelta
// propagates to the caller.
func (b *baseStore) SetDeltas(deltas []*pbsubstreams.StoreDelta) {
	b.deltas = deltas
	for i := range deltas {
		b.ApplyDelta(deltas[i])
	}
}