-
Notifications
You must be signed in to change notification settings - Fork 178
/
helper.go
497 lines (424 loc) · 16.8 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
package test
import (
"bytes"
"fmt"
"sync"
"testing"
"time"
"github.com/rs/zerolog/log"
"github.com/stretchr/testify/assert"
testifymock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/testutil"
mock2 "github.com/onflow/flow-go/engine/testutil/mock"
"github.com/onflow/flow-go/engine/verification"
"github.com/onflow/flow-go/engine/verification/utils"
chmodel "github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/encoding"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/mock"
network "github.com/onflow/flow-go/network/mock"
"github.com/onflow/flow-go/network/stub"
"github.com/onflow/flow-go/utils/logging"
"github.com/onflow/flow-go/utils/unittest"
)
// VerificationHappyPath runs `verNodeCount`-many verification nodes and checks that
// concurrently delivering the same execution receipt (with `chunkNum` chunks) to each
// verification node results in:
// - the selection of the assigned chunks by the ingest engine
// - request of the associated chunk data pack to the assigned chunks
// - formation of a complete verifiable chunk by the ingest engine for each assigned chunk
// - submitting a verifiable chunk locally to the verify engine by the ingest engine
// - dropping the ingestion of the ERs that share the same result once the verifiable chunk is submitted to verify engine
// - broadcast of a matching result approval to consensus nodes for each assigned chunk
//
// Chunks are assigned according to IsAssigned, and every assigned chunk is assigned to
// all verification nodes. verCollector and mempoolCollector are handed to every
// verification node that is created.
func VerificationHappyPath(t *testing.T,
	verNodeCount int,
	chunkNum int,
	verCollector module.VerificationMetrics,
	mempoolCollector module.MempoolMetrics) {
	// to demarcate the debug logs
	log.Debug().
		Int("verification_nodes_count", verNodeCount).
		Int("chunk_num", chunkNum).
		Msg("TestHappyPath started")

	// ingest engine parameters
	// set based on following issue (3443)
	processInterval := 1 * time.Second
	requestInterval := 1 * time.Second
	failureThreshold := uint(2)

	// upper bound on how long the verification and consensus sides may take
	timeout := time.Duration(chunkNum*verNodeCount*5) * time.Second

	// generates network hub
	hub := stub.NewNetworkHub()

	chainID := flow.Testnet

	// generates identities of nodes, one of each type, `verNodeCount` many of verification nodes
	colIdentity := unittest.IdentityFixture(unittest.WithRole(flow.RoleCollection))
	exeIdentity := unittest.IdentityFixture(unittest.WithRole(flow.RoleExecution))
	verIdentities := unittest.IdentityListFixture(verNodeCount, unittest.WithRole(flow.RoleVerification))
	conIdentity := unittest.IdentityFixture(unittest.WithRole(flow.RoleConsensus))

	identities := flow.IdentityList{colIdentity, conIdentity, exeIdentity}
	identities = append(identities, verIdentities...)

	// Execution receipt and chunk assignment
	//
	// creates an execution receipt and its associated data
	// with `chunkNum` chunks
	completeER := utils.CompleteExecutionResultFixture(t, chunkNum, chainID.Chain())
	result := &completeER.Receipt.ExecutionResult

	// mocks the assignment to only assign "some" chunks to the verification nodes;
	// the assignment is decided by the `IsAssigned` function, which depends only on the
	// chunk index, so an assigned chunk is assigned to every verification node
	assigner := &mock.ChunkAssigner{}
	a := chmodel.NewAssignment()
	for _, chunk := range result.Chunks {
		assignees := make([]flow.Identifier, 0, len(verIdentities))
		if IsAssigned(chunk.Index, len(result.Chunks)) {
			for _, verIdentity := range verIdentities {
				assignees = append(assignees, verIdentity.NodeID)
			}
		}
		a.Add(chunk, assignees)
	}

	// nodes and engines
	//
	// verification node
	verNodes := make([]mock2.VerificationNode, 0, len(verIdentities))
	assigner.On("Assign", result, result.BlockID).Return(a, nil)
	for _, verIdentity := range verIdentities {
		verNode := testutil.VerificationNode(t,
			hub,
			verIdentity,
			identities,
			assigner,
			requestInterval,
			processInterval,
			failureThreshold,
			uint(10),          // limits size of receipt related mempools to 10
			uint(10*chunkNum), // limits size of chunks related mempools to 10 * chunkNum
			chainID,
			verCollector,
			mempoolCollector)

		// starts all the engines
		<-verNode.FinderEngine.Ready()
		<-verNode.MatchEngine.(module.ReadyDoneAware).Ready()
		<-verNode.VerifierEngine.(module.ReadyDoneAware).Ready()

		// assumes the verification node has received the block;
		// the rest of the test is meaningless without it, hence require
		err := verNode.Blocks.Store(completeER.Block)
		require.NoError(t, err)

		verNodes = append(verNodes, verNode)
	}

	// mock execution node
	exeNode, exeEngine := SetupMockExeNode(t, hub, exeIdentity, verIdentities, identities, chainID, completeER)

	// mock consensus node
	conNode, conEngine, conWG := SetupMockConsensusNode(t,
		hub,
		conIdentity,
		verIdentities,
		identities,
		completeER,
		chainID)

	// sends execution receipt to each of verification nodes
	verWG := sync.WaitGroup{}
	for _, verNode := range verNodes {
		verWG.Add(1)
		go func(vn mock2.VerificationNode, receipt *flow.ExecutionReceipt) {
			defer verWG.Done()
			err := vn.FinderEngine.Process(exeIdentity.NodeID, receipt)
			// assert (not require) on purpose: require ends up in t.FailNow, which
			// must only be called from the goroutine running the test function
			assert.NoError(t, err)
		}(verNode, completeER.Receipt)
	}

	// requires all verification nodes process the receipt
	unittest.RequireReturnsBefore(t, verWG.Wait, timeout, "verification node process")

	// creates a network instance for each verification node
	// and sets it in continuous delivery mode
	// then flushes the collection requests
	verNets := make([]*stub.Network, 0, len(verIdentities))
	for _, verIdentity := range verIdentities {
		verNet, ok := hub.GetNetwork(verIdentity.NodeID)
		// stop right away if the lookup failed, rather than going on to use verNet
		require.True(t, ok)
		verNet.StartConDev(requestInterval, true)
		verNet.DeliverSome(true, func(m *stub.PendingMessage) bool {
			return m.ChannelID == engine.RequestCollections
		})

		verNets = append(verNets, verNet)
	}

	// requires all verification nodes send a result approval per assigned chunk
	unittest.RequireReturnsBefore(t, conWG.Wait, timeout, "consensus node process")

	// assert that the RA was received
	conEngine.AssertExpectations(t)

	// assert proper number of calls made
	exeEngine.AssertExpectations(t)

	// stops verification nodes
	// Note: this should be done prior to any evaluation to make sure that
	// the process method of Ingest engines is done working.
	for _, verNode := range verNodes {
		// stops all the engines
		<-verNode.FinderEngine.Done()
		<-verNode.MatchEngine.(module.ReadyDoneAware).Done()
		<-verNode.VerifierEngine.(module.ReadyDoneAware).Done()
	}

	// stops continuous delivery of nodes
	for _, verNet := range verNets {
		verNet.StopConDev()
	}

	conNode.Done()
	exeNode.Done()

	// asserts that all processing pipeline of verification node is fully
	// cleaned up (expected value first, so failure messages read correctly).
	for _, verNode := range verNodes {
		assert.Equal(t, uint(0), verNode.ChunkIDsByResult.Size())
		assert.Equal(t, uint(0), verNode.CachedReceipts.Size())
		assert.Equal(t, uint(0), verNode.ReadyReceipts.Size())
		assert.Equal(t, uint(0), verNode.PendingChunks.Size())
		assert.Equal(t, uint(0), verNode.PendingReceiptIDsByBlock.Size())
		assert.Equal(t, uint(0), verNode.PendingReceipts.Size())
		assert.Equal(t, uint(0), verNode.PendingResults.Size())
		assert.Equal(t, uint(0), verNode.ReceiptIDsByResult.Size())
	}

	// to demarcate the debug logs
	log.Debug().
		Int("verification_nodes_count", verNodeCount).
		Int("chunk_num", chunkNum).
		Msg("TestHappyPath finishes")
}
// SetupMockExeNode creates and returns an execution node and its registered engine in the network (hub).
// It mocks the process method of the execution node so that, on receiving a chunk data pack request
// for a chunk that is assigned (see IsAssigned), it unicasts the matching chunk data pack back to the
// requester; the chunk's collection is attached for every chunk except the system chunk.
// If the message is not a chunk data pack request, or the requested chunk is unknown or not assigned,
// the test is marked as failed and no response is sent.
//
// verIdentities is accepted for signature compatibility and is not read by this function.
func SetupMockExeNode(t *testing.T,
	hub *stub.Hub,
	exeIdentity *flow.Identity,
	verIdentities flow.IdentityList,
	othersIdentity flow.IdentityList,
	chainID flow.ChainID,
	completeER utils.CompleteExecutionResult) (*mock2.GenericNode, *network.Engine) {
	// mock the execution node with a generic node and mocked engine
	// to handle request for chunk state
	exeNode := testutil.GenericNode(t, hub, exeIdentity, othersIdentity, chainID)
	exeEngine := new(network.Engine)

	// total number of chunks in the result; used to decide assignment and to spot the system chunk
	chunksNum := len(completeER.Receipt.ExecutionResult.Chunks)

	// the conduit is used by the mocked handler below, so a registration failure is fatal
	exeChunkDataConduit, err := exeNode.Net.Register(engine.ProvideChunks, exeEngine)
	require.NoError(t, err)

	// number of chunk data packs that can be served
	chunkNum := len(completeER.ChunkDataPacks)

	exeEngine.On("Process", testifymock.Anything, testifymock.Anything).
		Run(func(args testifymock.Arguments) {
			// Failures are reported with assert.Fail followed by return: the handler may run
			// outside the test goroutine, where require/t.FailNow must not be used.
			originID, isIdentifier := args[0].(flow.Identifier)
			req, isRequest := args[1].(*messages.ChunkDataRequest)
			if !isIdentifier || !isRequest {
				assert.Fail(t, fmt.Sprintf("unknown request to execution node %v", args[1]))
				return
			}

			for i := 0; i < chunkNum; i++ {
				chunk, ok := completeER.Receipt.ExecutionResult.Chunks.ByIndex(uint64(i))
				if !assert.True(t, ok, "chunk out of range requested") {
					return
				}

				chunkID := chunk.ID()
				if chunkID != req.ChunkID {
					continue
				}

				if !IsAssigned(chunk.Index, chunksNum) {
					assert.Fail(t, fmt.Sprintf(" requested an unassigned chunk data pack %x", req))
					return
				}

				// publishes the chunk data pack response to the network
				res := &messages.ChunkDataResponse{
					ChunkDataPack: *completeER.ChunkDataPacks[i],
					Nonce:         rand.Uint64(),
				}

				// only non-system chunks have a collection
				if !isSystemChunk(uint64(i), chunksNum) {
					res.Collection = *completeER.Collections[i]
				}

				err := exeChunkDataConduit.Unicast(res, originID)
				assert.Nil(t, err)

				log.Debug().
					Hex("origin_id", logging.ID(originID)).
					Hex("chunk_id", logging.ID(chunkID)).
					Msg("chunk data pack request answered by execution node")

				return
			}

			// none of the known chunks matched the requested chunk ID
			assert.Fail(t, fmt.Sprintf(" requested an unidentifed chunk data pack %v", req))
		}).
		Return(nil)

	return &exeNode, exeEngine
}
// SetupMockConsensusNode builds a mock consensus node (conIdentity) on the given hub and registers a
// mocked engine for it on the approvals channel. The mocked Process method expects every incoming
// message to be a result approval sent by one of verIdentities, and checks that:
// - the same approval has not been seen from that verification node before
// - the approval is about an assigned chunk (see IsAssigned)
// - the approval's SPoCK proof verifies against the SPoCK secret of its chunk in completeER
//
// It returns the node, the mocked engine, and a wait group that is released once every
// verification node has delivered one approval per assigned chunk.
func SetupMockConsensusNode(t *testing.T,
	hub *stub.Hub,
	conIdentity *flow.Identity,
	verIdentities flow.IdentityList,
	othersIdentity flow.IdentityList,
	completeER utils.CompleteExecutionResult,
	chainID flow.ChainID) (*mock2.GenericNode, *network.Engine, *sync.WaitGroup) {
	// counts the assigned chunks, i.e., the approvals expected from a single verification node
	totalChunks := len(completeER.Receipt.ExecutionResult.Chunks)
	perVerifier := 0
	for _, c := range completeER.Receipt.ExecutionResult.Chunks {
		if IsAssigned(c.Index, totalChunks) {
			perVerifier++
		}
	}

	// every verification node sends `perVerifier` approvals, hence
	// len(verIdentities) * perVerifier approvals are expected overall
	pending := new(sync.WaitGroup)
	pending.Add(len(verIdentities) * perVerifier)

	// generic node plus mocked engine standing in for the consensus node
	conNode := testutil.GenericNode(t, hub, conIdentity, othersIdentity, chainID)
	conEngine := new(network.Engine)

	// verification node ID --> set of result approval IDs received from it
	seenByVerifier := make(map[flow.Identifier]map[flow.Identifier]struct{})
	for _, id := range verIdentities {
		seenByVerifier[id.NodeID] = make(map[flow.Identifier]struct{})
	}

	// hasher used for SPoCK verification
	spockHasher := crypto.NewBLSKMAC(encoding.SPOCKTag)

	onApproval := func(args testifymock.Arguments) {
		sender, isIdentifier := args[0].(flow.Identifier)
		assert.True(t, isIdentifier)

		approval, isApproval := args[1].(*flow.ResultApproval)
		assert.True(t, isApproval)

		approvalID := approval.ID()
		log.Debug().
			Hex("result_approval_id", logging.ID(approvalID)).
			Msg("result approval received")

		// the approval must be new for this sender; it is then recorded
		seen := seenByVerifier[sender]
		_, duplicate := seen[approvalID]
		assert.False(t, duplicate)
		seen[approvalID] = struct{}{}

		// the approval must concern an assigned chunk
		chunkIndex := approval.Body.ChunkIndex
		assert.True(t, IsAssigned(chunkIndex, totalChunks))

		// looks up the staking public key of the sending verification node
		var verifierKey crypto.PublicKey
		known := false
		for _, v := range verIdentities {
			if v.NodeID != sender {
				continue
			}
			verifierKey = v.StakingPubKey
			known = true
		}
		require.True(t, known)

		// checks the SPoCK proof against the SPoCK secret of the chunk
		valid, err := crypto.SPOCKVerifyAgainstData(
			verifierKey,
			approval.Body.Spock,
			completeER.SpockSecrets[chunkIndex],
			spockHasher,
		)
		assert.NoError(t, err)
		assert.True(t, valid)

		pending.Done()
	}

	conEngine.On("Process", testifymock.Anything, testifymock.Anything).
		Run(onApproval).
		Return(nil)

	_, err := conNode.Net.Register(engine.ReceiveApprovals, conEngine)
	assert.Nil(t, err)

	return &conNode, conEngine, pending
}
// SetupMockVerifierEng sets up a mock verifier engine that asserts the followings:
// - that every verifiable chunk delivered to it (via ProcessLocal) is one of vChunks
// - that each chunk is delivered at most once
// - that the end state of a delivered chunk equals the end state of the matching expected chunk
//
// SetupMockVerifierEng returns the mock engine and a wait group whose counter starts at the
// number of chunks in vChunks that are assigned (see IsAssigned, evaluated against the number
// of chunks in completeER) and is decremented for every accepted verifiable chunk.
func SetupMockVerifierEng(t testing.TB,
	vChunks []*verification.VerifiableChunkData,
	completeER *utils.CompleteExecutionResult) (*network.Engine, *sync.WaitGroup) {
	eng := new(network.Engine)

	// keep track of which verifiable chunks we have received
	receivedChunks := make(map[flow.Identifier]struct{})

	var (
		// decrement the wait group when each verifiable chunk received
		wg sync.WaitGroup
		// check one verifiable chunk at a time to ensure dupe checking works
		mu sync.Mutex
	)

	// computes expected number of assigned chunks
	expected := 0
	chunksNum := len(completeER.Receipt.ExecutionResult.Chunks)
	for _, c := range vChunks {
		if IsAssigned(c.Chunk.Index, chunksNum) {
			expected++
		}
	}
	wg.Add(expected)

	eng.On("ProcessLocal", testifymock.Anything).
		Run(func(args testifymock.Arguments) {
			mu.Lock()
			defer mu.Unlock()

			// the received entity should be a verifiable chunk; bail out if it is not,
			// since the remaining checks would dereference a nil pointer
			vchunk, ok := args[0].(*verification.VerifiableChunkData)
			if !assert.True(t, ok) {
				return
			}

			// retrieves the content of received chunk
			chunk, ok := vchunk.Result.Chunks.ByIndex(vchunk.Chunk.Index)
			require.True(t, ok, "chunk out of range requested")
			vID := chunk.ID()

			// verifies that it has not seen this chunk before
			if _, alreadySeen := receivedChunks[vID]; alreadySeen {
				t.Logf("received duplicated chunk (id=%s)", vID)
				t.Fail()
				return
			}

			// ensure the received chunk matches one we expect; the comparison is against
			// the ID of each *expected* chunk, not the received chunk's own ID
			for _, vc := range vChunks {
				if vc.Chunk.ID() != vID {
					continue
				}

				// mark it as seen and decrement the waitgroup
				receivedChunks[vID] = struct{}{}

				// checks end states match as expected
				if !bytes.Equal(vchunk.EndState, vc.EndState) {
					t.Logf("end states are not equal: expected %x got %x", vc.EndState, vchunk.EndState)
					t.Fail()
				}

				wg.Done()
				return
			}

			// the received chunk doesn't match any expected ERs
			t.Logf("received unexpected ER (id=%s)", vID)
			t.Fail()
		}).
		Return(nil)

	return eng, &wg
}
// VerifiableDataChunk assembles the verifiable chunk data of the chunk at chunkIndex out of the
// given complete execution result: the chunk itself, the block header, the execution result, and
// the collection and chunk data pack stored at the same index.
// The end state is the start state of the following chunk, or the final state commitment of the
// execution result when chunkIndex refers to the last chunk.
func VerifiableDataChunk(chunkIndex uint64, er utils.CompleteExecutionResult) *verification.VerifiableChunkData {
	result := &er.Receipt.ExecutionResult
	chunks := result.Chunks

	var endState flow.StateCommitment
	if lastIndex := len(chunks) - 1; int(chunkIndex) != lastIndex {
		// an inner chunk ends where its successor starts
		endState = chunks[chunkIndex+1].StartState
	} else {
		// last chunk
		endState = result.FinalStateCommit
	}

	return &verification.VerifiableChunkData{
		Chunk:         chunks[chunkIndex],
		Header:        er.Block.Header,
		Result:        result,
		Collection:    er.Collections[chunkIndex],
		ChunkDataPack: er.ChunkDataPacks[chunkIndex],
		EndState:      endState,
	}
}
// IsAssigned reports whether the chunk at the given index counts as assigned in these tests:
// every even index is assigned, and so is the system chunk (the last of chunkNum chunks)
// regardless of its parity.
func IsAssigned(index uint64, chunkNum int) bool {
	if index%2 == 0 {
		return true
	}
	return isSystemChunk(index, chunkNum)
}
// isSystemChunk reports whether index is the position of the system chunk, i.e., the
// last one among chunkNum chunks.
func isSystemChunk(index uint64, chunkNum int) bool {
	lastIndex := chunkNum - 1
	return lastIndex == int(index)
}