Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Verification] Fixes flakey tests and race conditions #919

Merged
merged 14 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
156 changes: 80 additions & 76 deletions engine/verification/fetcher/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func testProcessAssignChunkHappyPath(t *testing.T, chunkNum int, assignedNum int
unittest.RequireReturnsBefore(t, verifierWG.Wait, 1*time.Second, "could not push verifiable chunk on time")
unittest.RequireReturnsBefore(t, processWG.Wait, 1*time.Second, "could not process chunks on time")

mock.AssertExpectationsForObjects(t, s.results, s.requester, s.pendingChunks, s.verifier, s.chunkConsumerNotifier, s.metrics)
mock.AssertExpectationsForObjects(t, s.results, s.requester, s.pendingChunks, s.chunkConsumerNotifier, s.metrics)
}

// TestChunkResponse_RemovingStatusFails evaluates behavior of fetcher engine respect to receiving duplicate and concurrent
Expand Down Expand Up @@ -561,27 +561,28 @@ func mockStateAtBlockIDForMissingIdentities(state *protocol.State, blockID flow.
func mockPendingChunksAdd(t *testing.T, pendingChunks *mempool.ChunkStatuses, list []*verification.ChunkStatus, added bool) {
mu := &sync.Mutex{}

pendingChunks.On("Add", mock.Anything).Run(func(args mock.Arguments) {
// to provide mutual exclusion under concurrent invocations.
mu.Lock()
defer mu.Unlock()
pendingChunks.On("Add", mock.Anything).
Run(func(args mock.Arguments) {
// to provide mutual exclusion under concurrent invocations.
mu.Lock()
defer mu.Unlock()

actual, ok := args[0].(*verification.ChunkStatus)
require.True(t, ok)
actual, ok := args[0].(*verification.ChunkStatus)
require.True(t, ok)

// there should be a matching chunk status with the received one.
statusID := actual.ID()
// there should be a matching chunk status with the received one.
statusID := actual.ID()

for _, expected := range list {
expectedID := expected.ID()
if expectedID == statusID {
require.Equal(t, expected.ExecutionResult, actual.ExecutionResult)
return
for _, expected := range list {
expectedID := expected.ID()
if expectedID == statusID {
require.Equal(t, expected.ExecutionResult, actual.ExecutionResult)
return
}
}
}

require.Fail(t, "tried adding an unexpected chunk status to mempool")
}).Return(added).Times(len(list))
require.Fail(t, "tried adding an unexpected chunk status to mempool")
}).Return(added).Times(len(list))
}

// mockPendingChunksRem mocks the remove method of pending chunks for expecting only the specified list of chunk statuses.
Expand All @@ -590,24 +591,25 @@ func mockPendingChunksAdd(t *testing.T, pendingChunks *mempool.ChunkStatuses, li
func mockPendingChunksRem(t *testing.T, pendingChunks *mempool.ChunkStatuses, list []*verification.ChunkStatus, removed bool) {
mu := &sync.Mutex{}

pendingChunks.On("Rem", mock.Anything).Run(func(args mock.Arguments) {
// to provide mutual exclusion under concurrent invocations.
mu.Lock()
defer mu.Unlock()
pendingChunks.On("Rem", mock.Anything).
Run(func(args mock.Arguments) {
// to provide mutual exclusion under concurrent invocations.
mu.Lock()
defer mu.Unlock()

actual, ok := args[0].(flow.Identifier)
require.True(t, ok)
actual, ok := args[0].(flow.Identifier)
require.True(t, ok)

// there should be a matching chunk status with the received one.
for _, expected := range list {
expectedID := expected.ID()
if expectedID == actual {
return
// there should be a matching chunk status with the received one.
for _, expected := range list {
expectedID := expected.ID()
if expectedID == actual {
return
}
}
}

require.Fail(t, "tried removing an unexpected chunk status to mempool")
}).Return(removed).Times(len(list))
require.Fail(t, "tried removing an unexpected chunk status to mempool")
}).Return(removed).Times(len(list))
}

// mockPendingChunksByID mocks the ByID method of pending chunks for expecting only the specified list of chunk statuses.
Expand Down Expand Up @@ -650,42 +652,43 @@ func mockVerifierEngine(t *testing.T,

seen := make(map[flow.Identifier]struct{})

verifier.On("ProcessLocal", mock.Anything).Run(func(args mock.Arguments) {
mu.Lock()
defer mu.Unlock()
verifier.On("ProcessLocal", mock.Anything).
Run(func(args mock.Arguments) {
mu.Lock()
defer mu.Unlock()

vc, ok := args[0].(*verification.VerifiableChunkData)
require.True(t, ok)
vc, ok := args[0].(*verification.VerifiableChunkData)
require.True(t, ok)

// verifiable chunk data should be distinct.
_, ok = seen[vc.Chunk.ID()]
require.False(t, ok, "duplicated verifiable chunk received")
seen[vc.Chunk.ID()] = struct{}{}
// verifiable chunk data should be distinct.
_, ok = seen[vc.Chunk.ID()]
require.False(t, ok, "duplicated verifiable chunk received")
seen[vc.Chunk.ID()] = struct{}{}

// we should expect this verifiable chunk and its fields should match our expectation
expected, ok := verifiableChunks[vc.Chunk.ID()]
require.True(t, ok, "verifier engine received an unknown verifiable chunk data")
// we should expect this verifiable chunk and its fields should match our expectation
expected, ok := verifiableChunks[vc.Chunk.ID()]
require.True(t, ok, "verifier engine received an unknown verifiable chunk data")

if vc.IsSystemChunk {
// system chunk has an empty collection
require.Equal(t, vc.Collection.Len(), 0)
} else {
require.Equal(t, expected.Collection.ID(), vc.Collection.ID())
}
if vc.IsSystemChunk {
// system chunk has an empty collection
require.Equal(t, vc.Collection.Len(), 0)
} else {
require.Equal(t, expected.Collection.ID(), vc.Collection.ID())
}

require.Equal(t, *expected.ChunkDataPack, *vc.ChunkDataPack)
require.Equal(t, expected.Result.ID(), vc.Result.ID())
require.Equal(t, expected.Header.ID(), vc.Header.ID())
require.Equal(t, *expected.ChunkDataPack, *vc.ChunkDataPack)
require.Equal(t, expected.Result.ID(), vc.Result.ID())
require.Equal(t, expected.Header.ID(), vc.Header.ID())

isSystemChunk := fetcher.IsSystemChunk(vc.Chunk.Index, vc.Result)
require.Equal(t, isSystemChunk, vc.IsSystemChunk)
isSystemChunk := fetcher.IsSystemChunk(vc.Chunk.Index, vc.Result)
require.Equal(t, isSystemChunk, vc.IsSystemChunk)

endState, err := fetcher.EndStateCommitment(vc.Result, vc.Chunk.Index, isSystemChunk)
require.NoError(t, err)
endState, err := fetcher.EndStateCommitment(vc.Result, vc.Chunk.Index, isSystemChunk)
require.NoError(t, err)

require.Equal(t, endState, vc.EndState)
wg.Done()
}).Return(nil).Times(len(verifiableChunks))
require.Equal(t, endState, vc.EndState)
wg.Done()
}).Return(nil)

return wg
}
Expand Down Expand Up @@ -741,29 +744,30 @@ func mockRequester(t *testing.T, requester *mockfetcher.ChunkDataPackRequester,
mu := sync.Mutex{}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
requester.On("Request", mock.Anything).Run(func(args mock.Arguments) {
mu.Lock()
defer mu.Unlock()
requester.On("Request", mock.Anything).
Run(func(args mock.Arguments) {
mu.Lock()
defer mu.Unlock()

actualRequest, ok := args[0].(*verification.ChunkDataPackRequest)
require.True(t, ok)
actualRequest, ok := args[0].(*verification.ChunkDataPackRequest)
require.True(t, ok)

expectedRequest, ok := requests[actualRequest.ChunkID]
require.True(t, ok, "requester received an unexpected chunk request")
expectedRequest, ok := requests[actualRequest.ChunkID]
require.True(t, ok, "requester received an unexpected chunk request")

require.Equal(t, *expectedRequest, *actualRequest)
require.Equal(t, *expectedRequest, *actualRequest)

go func() {
cdp, ok := chunkDataPacks[actualRequest.ChunkID]
require.True(t, ok)
go func() {
cdp, ok := chunkDataPacks[actualRequest.ChunkID]
require.True(t, ok)

collection, ok := collections[actualRequest.ChunkID]
require.True(t, ok)
collection, ok := collections[actualRequest.ChunkID]
require.True(t, ok)

handler(actualRequest.Agrees[0], cdp, collection)
wg.Done()
}()
}).Return().Times(len(requests))
handler(actualRequest.Agrees[0], cdp, collection)
wg.Done()
}()
}).Return()

return wg
}
Expand Down