-
Notifications
You must be signed in to change notification settings - Fork 178
/
incorporated_results.go
170 lines (146 loc) · 5.91 KB
/
incorporated_results.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package stdmap
import (
"errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool/model"
"github.com/onflow/flow-go/storage"
)
// IncorporatedResults implements the incorporated results memory pool of the
// consensus nodes, used to store results that need to be sealed.
//
// Entries are stored in the backend keyed by execution result ID. Each entry
// is a *model.IncorporatedResultMap, which groups the incorporated results for
// one execution result and indexes them by incorporated block ID.
type IncorporatedResults struct {
// Concurrency: the mempool internally re-uses the backend's lock
backend *Backend
// size counts the individual incorporated results held across all entries.
// Add and Rem update it inside backend.Run; Size reads it under the
// backend's read lock.
size uint
}
// NewIncorporatedResults creates a mempool for the incorporated results.
//
// The backend is constructed from the caller-supplied opts followed by
// WithLimit(limit). An ejection callback is registered on the backend so that
// the size counter is reduced by the number of incorporated results held in
// any entry the backend ejects.
//
// The returned error is always nil in the current implementation.
func NewIncorporatedResults(limit uint, opts ...OptionFunc) (*IncorporatedResults, error) {
	// Assemble the options in a fresh slice. Appending to opts directly could
	// write into the caller's backing array when the caller passed a slice
	// with spare capacity (f(limit, s...)).
	backendOpts := make([]OptionFunc, 0, len(opts)+1)
	backendOpts = append(backendOpts, opts...)
	backendOpts = append(backendOpts, WithLimit(limit))

	mempool := &IncorporatedResults{
		size:    0,
		backend: NewBackend(backendOpts...),
	}

	adjustSizeOnEjection := func(entity flow.Entity) {
		// uncaught type assertion; should never panic as the mempool only stores IncorporatedResultMap:
		incorporatedResultMap := entity.(*model.IncorporatedResultMap)
		mempool.size -= uint(len(incorporatedResultMap.IncorporatedResults))
	}
	mempool.backend.RegisterEjectionCallbacks(adjustSizeOnEjection)

	return mempool, nil
}
// Add stores an IncorporatedResult in the mempool.
//
// Results are grouped by the ID of their execution result and, within a
// group, indexed by IncorporatedBlockID. The boolean is true when the result
// was newly stored and false when an entry for the same execution result and
// incorporated block already existed. The error is whatever backend.Run
// returns; the closure passed to it always returns nil.
func (ir *IncorporatedResults) Add(incorporatedResult *flow.IncorporatedResult) (bool, error) {
	resultID := incorporatedResult.Result.ID()
	blockID := incorporatedResult.IncorporatedBlockID

	added := false
	err := ir.backend.Run(func(backdata map[flow.Identifier]flow.Entity) error {
		var byBlock map[flow.Identifier]*flow.IncorporatedResult

		if stored, found := backdata[resultID]; found {
			// unchecked type assertion: the mempool only ever stores *model.IncorporatedResultMap
			byBlock = stored.(*model.IncorporatedResultMap).IncorporatedResults
			if _, duplicate := byBlock[blockID]; duplicate {
				// already known for this (result, incorporated block) pair
				return nil
			}
		} else {
			// first incorporated result for this execution result:
			// create the group and register it in the mempool
			byBlock = make(map[flow.Identifier]*flow.IncorporatedResult)
			backdata[resultID] = &model.IncorporatedResultMap{
				ExecutionResult:     incorporatedResult.Result,
				IncorporatedResults: byBlock,
			}
		}

		byBlock[blockID] = incorporatedResult
		added = true
		ir.size++
		return nil
	})

	return added, err
}
// All returns every incorporated result currently held in the mempool as a
// freshly built, non-nil list.
func (ir *IncorporatedResults) All() flow.IncorporatedResultList {
	// The entries are maps that other goroutines may modify, so they are
	// flattened into a new slice from within a locked backend operation.
	collected := make([]*flow.IncorporatedResult, 0)

	collect := func(backdata map[flow.Identifier]flow.Entity) error {
		for _, entity := range backdata {
			// unchecked type assertion: the mempool only ever stores *model.IncorporatedResultMap
			group := entity.(*model.IncorporatedResultMap)
			for _, incorporatedResult := range group.IncorporatedResults {
				collected = append(collected, incorporatedResult)
			}
		}
		return nil
	}
	_ = ir.backend.Run(collect) // error return impossible: collect always returns nil

	return collected
}
// ByResultID looks up the entry stored for the given execution result ID.
//
// When an entry exists it returns the ExecutionResult, a copy of the map of
// IncorporatedResults indexed by IncorporatedBlockID, and true. When no entry
// exists it returns nil, nil, false. Any other error from the backend is
// treated as an internal fault and causes a panic.
func (ir *IncorporatedResults) ByResultID(resultID flow.Identifier) (*flow.ExecutionResult, map[flow.Identifier]*flow.IncorporatedResult, bool) {
	// The stored map may be modified concurrently for the same resultID, so
	// its contents are copied out from within a locked backend operation.
	var executionResult *flow.ExecutionResult
	byBlockID := make(map[flow.Identifier]*flow.IncorporatedResult)

	err := ir.backend.Run(func(backdata map[flow.Identifier]flow.Entity) error {
		entity, found := backdata[resultID]
		if !found {
			return storage.ErrNotFound
		}
		// unchecked type assertion: the mempool only ever stores *model.IncorporatedResultMap
		stored := entity.(*model.IncorporatedResultMap)
		executionResult = stored.ExecutionResult
		for blockID, incorporatedResult := range stored.IncorporatedResults {
			byBlockID[blockID] = incorporatedResult
		}
		return nil
	})

	switch {
	case err == nil:
		return executionResult, byBlockID, true
	case errors.Is(err, storage.ErrNotFound):
		return nil, nil, false
	default:
		// The current implementation never reaches this path
		panic("unexpected internal error in IncorporatedResults mempool: " + err.Error())
	}
}
// Rem removes an IncorporatedResult from the mempool.
//
// It returns true when an entry matching both the execution result ID and the
// IncorporatedBlockID was found and removed, and false otherwise.
func (ir *IncorporatedResults) Rem(incorporatedResult *flow.IncorporatedResult) bool {
	resultID := incorporatedResult.Result.ID()
	blockID := incorporatedResult.IncorporatedBlockID

	deleted := false
	_ = ir.backend.Run(func(backdata map[flow.Identifier]flow.Entity) error {
		stored, found := backdata[resultID]
		if !found {
			// nothing is stored for this execution result
			return nil
		}

		// unchecked type assertion: the mempool only ever stores *model.IncorporatedResultMap
		byBlock := stored.(*model.IncorporatedResultMap).IncorporatedResults
		if _, present := byBlock[blockID]; !present {
			// nothing is stored for this incorporated block
			return nil
		}

		if len(byBlock) > 1 {
			// other incorporated results remain: drop only this one
			delete(byBlock, blockID)
		} else {
			// this was the last incorporated result for the execution
			// result, so the whole entry is removed from the mempool
			delete(backdata, resultID)
		}

		deleted = true
		ir.size--
		return nil
	}) // error return impossible: the closure always returns nil

	return deleted
}
// Size returns the number of incorporated results in the mempool.
func (ir *IncorporatedResults) Size() uint {
	// The counter is guarded by the backend's lock; taking the read lock
	// ensures the value read here is the latest one written.
	ir.backend.RLock()
	current := ir.size
	ir.backend.RUnlock()
	return current
}