-
Notifications
You must be signed in to change notification settings - Fork 176
/
chunk_requests.go
229 lines (193 loc) · 7.5 KB
/
chunk_requests.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package stdmap
import (
"fmt"
"time"
"github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/verification"
"github.com/onflow/flow-go/module/mempool"
)
// ChunkRequests is an implementation of in-memory storage for maintaining chunk requests data objects.
//
// In this implementation, the ChunkRequests
// wraps the ChunkDataPackRequests around an internal ChunkRequestStatus data object, and maintains the wrapped
// version in memory.
type ChunkRequests struct {
*Backend
}
func NewChunkRequests(limit uint) *ChunkRequests {
return &ChunkRequests{
Backend: NewBackend(WithLimit(limit)),
}
}
func toChunkRequestStatus(entity flow.Entity) *chunkRequestStatus {
status, ok := entity.(*chunkRequestStatus)
if !ok {
panic(fmt.Sprintf("could not convert the entity into chunk status from the mempool: %v", entity))
}
return status
}
// RequestHistory returns the number of times the chunk has been requested,
// last time the chunk has been requested, and the retryAfter duration of the
// underlying request status of this chunk.
//
// The last boolean parameter returns whether a chunk request for this chunk ID
// exists in memory-pool.
func (cs *ChunkRequests) RequestHistory(chunkID flow.Identifier) (uint64, time.Time, time.Duration, bool) {
var lastAttempt time.Time
var retryAfter time.Duration
var attempts uint64
err := cs.Backend.Run(func(backdata mempool.BackData) error {
entity, ok := backdata.ByID(chunkID)
if !ok {
return fmt.Errorf("request does not exist for chunk %x", chunkID)
}
request := toChunkRequestStatus(entity)
lastAttempt = request.LastAttempt
retryAfter = request.RetryAfter
attempts = request.Attempt
return nil
})
return attempts, lastAttempt, retryAfter, err == nil
}
// Add provides insertion functionality into the memory pool.
// The insertion is only successful if there is no duplicate chunk request for the same
// tuple of (chunkID, resultID, chunkIndex).
func (cs *ChunkRequests) Add(request *verification.ChunkDataPackRequest) bool {
err := cs.Backend.Run(func(backdata mempool.BackData) error {
entity, exists := backdata.ByID(request.ChunkID)
chunkLocatorID := request.Locator.ID()
if !exists {
locators := make(chunks.LocatorMap)
locators[chunkLocatorID] = &request.Locator
// no chunk request status exists for this chunk ID, hence initiating one.
status := &chunkRequestStatus{
Locators: locators,
RequestInfo: request.ChunkDataPackRequestInfo,
}
added := backdata.Add(request.ChunkID, status)
if !added {
return fmt.Errorf("potential race condition in adding chunk requests")
}
return nil
}
status := toChunkRequestStatus(entity)
if _, ok := status.Locators[chunkLocatorID]; ok {
return fmt.Errorf("chunk request exists with same locator (result_id=%x, chunk_index=%d)", request.Locator.ResultID, request.Locator.Index)
}
status.Locators[chunkLocatorID] = &request.Locator
status.RequestInfo.Agrees = status.RequestInfo.Agrees.Union(request.Agrees)
status.RequestInfo.Disagrees = status.RequestInfo.Disagrees.Union(request.Disagrees)
status.RequestInfo.Targets = status.RequestInfo.Targets.Union(request.Targets)
backdata.Add(request.ChunkID, status)
return nil
})
return err == nil
}
// Remove provides deletion functionality from the memory pool.
// If there is a chunk request with this ID, Remove removes it and returns true.
// Otherwise it returns false.
func (cs *ChunkRequests) Remove(chunkID flow.Identifier) bool {
return cs.Backend.Remove(chunkID)
}
// PopAll atomically returns all locators associated with this chunk ID while clearing out the
// chunk request status for this chunk id.
// Boolean return value indicates whether there are requests in the memory pool associated
// with chunk ID.
func (cs *ChunkRequests) PopAll(chunkID flow.Identifier) (chunks.LocatorMap, bool) {
var locators map[flow.Identifier]*chunks.Locator
err := cs.Backend.Run(func(backdata mempool.BackData) error {
entity, exists := backdata.ByID(chunkID)
if !exists {
return fmt.Errorf("not exist")
}
locators = toChunkRequestStatus(entity).Locators
_, removed := backdata.Remove(chunkID)
if !removed {
return fmt.Errorf("potential race condition on removing chunk request from mempool")
}
return nil
})
if err != nil {
return nil, false
}
return locators, true
}
// IncrementAttempt increments the Attempt field of the corresponding status of the
// chunk request in memory pool that has the specified chunk ID.
// If such chunk ID does not exist in the memory pool, it returns false.
//
// The increments are done atomically, thread-safe, and in isolation.
func (cs *ChunkRequests) IncrementAttempt(chunkID flow.Identifier) bool {
err := cs.Backend.Run(func(backdata mempool.BackData) error {
entity, exists := backdata.ByID(chunkID)
if !exists {
return fmt.Errorf("not exist")
}
chunk := toChunkRequestStatus(entity)
chunk.Attempt++
chunk.LastAttempt = time.Now()
return nil
})
return err == nil
}
// All returns all chunk requests stored in this memory pool.
func (cs *ChunkRequests) All() verification.ChunkDataPackRequestInfoList {
all := cs.Backend.All()
requestInfoList := verification.ChunkDataPackRequestInfoList{}
for _, entity := range all {
requestInfo := toChunkRequestStatus(entity).RequestInfo
requestInfoList = append(requestInfoList, &requestInfo)
}
return requestInfoList
}
// UpdateRequestHistory updates the request history of the specified chunk ID. If the update was successful, i.e.,
// the updater returns true, the result of update is committed to the mempool, and the time stamp of the chunk request
// is updated to the current time. Otherwise, it aborts and returns false.
//
// It returns the updated request history values.
//
// The updates under this method are atomic, thread-safe, and done in isolation.
func (cs *ChunkRequests) UpdateRequestHistory(chunkID flow.Identifier, updater mempool.ChunkRequestHistoryUpdaterFunc) (uint64, time.Time, time.Duration, bool) {
var lastAttempt time.Time
var retryAfter time.Duration
var attempts uint64
err := cs.Backend.Run(func(backdata mempool.BackData) error {
entity, exists := backdata.ByID(chunkID)
if !exists {
return fmt.Errorf("not exist")
}
status := toChunkRequestStatus(entity)
var ok bool
attempts, retryAfter, ok = updater(status.Attempt, status.RetryAfter)
if !ok {
return fmt.Errorf("updater failed")
}
lastAttempt = time.Now()
// updates underlying request
status.LastAttempt = lastAttempt
status.RetryAfter = retryAfter
status.Attempt = attempts
return nil
})
return attempts, lastAttempt, retryAfter, err == nil
}
// Size returns total number of chunk requests in the memory pool.
func (cs ChunkRequests) Size() uint {
return cs.Backend.Size()
}
// chunkRequestStatus is an internal data type for ChunkRequests mempool. It acts as a wrapper for ChunkDataRequests, maintaining
// some auxiliary attributes that are internal to ChunkRequests.
type chunkRequestStatus struct {
Locators map[flow.Identifier]*chunks.Locator // keeps locators by their chunk id.
RequestInfo verification.ChunkDataPackRequestInfo
LastAttempt time.Time // timestamp of last request dispatched for this chunk id.
RetryAfter time.Duration // interval until request should be retried.
Attempt uint64 // number of times this chunk request has been dispatched in the network.
}
func (c chunkRequestStatus) ID() flow.Identifier {
return c.RequestInfo.ChunkID
}
func (c chunkRequestStatus) Checksum() flow.Identifier {
return c.RequestInfo.ChunkID
}