Skip to content

Commit

Permalink
[Verification] rolls-in persistent architecture (#728)
Browse files Browse the repository at this point in the history
* adds more godocs

* adds qualifier

* adds concurrency support for testing update on chunk data requests

* adds godoc for qualifier

* adds incremental qualifier and wires it to the code

* adds more godoc for incremental qualifier

* changes requesting logic

* refactors interface of request qualifier

* adds qualifying requests logic to requester

* adds retry after field to chunk requests

* adds increment attempt and retry after

* adds targets to chunk request status

* resolves fixtures and tests after refactoring chunk data request

* refactors interface of chunk data pack requester

* removes commented parts

* refactors chunk requests interface to only deal with chunk data requests

* adds id for chunk data request

* fixes lint issues

* adds attempt as return value of by ID

* refactors broken tests

* re-generate mocks

* adds chunk data request list fixture

* fixes test for pending chunk with sealed block

* fixes chunk data pack request fixtures target ID

* adds chunk data pack request list

* fixes requesting chunk data pack

* fixes test request pending chunk data pack

* fuxes test request pending chunk sealed block hybrid

* cleans up fixtures for chunk request status

* makes chunk request status private

* refactors behavior on duplicate locators

* Update engine/verification/fetcher/engine.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/fetcher/engine.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/fetcher/engine.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* refactors checking node status at block ID

* fixes shallow copy

* makes an error nested

* Update utils/unittest/fixtures.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* caches last sealed on local variable extrinsic to loop

* fixes lint issue

* Update engine/verification/requester/requester_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update utils/unittest/fixtures.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/fetcher/engine_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/fetcher/engine_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* fixes lints and typos on requester tests

* adds comment for testcase

* adds timeout for ready and done methods of engine in tests

* Update engine/verification/fetcher/engine_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/fetcher/engine_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/fetcher/engine_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* fixes lint

* removing unnecessary log

* adds comment for a fatal level log

* consolidates a log at requester engine

* adds capacity to chunklist of chunk status

* runs consumer check asynchronously

* refactors loop variable

* fixes lint

* fixes lint

* fixes lint issues and interface

* refactors chunk request and moves request info to request status

* refactors update retry after

* adds request info

* refactors attempt type

* updates mock

* adds qualifier functions

* fixes mocking issue

* adds mock for qualifier

* fixes tests with qualifier

* moves qualifier functions to utils and generates exponential backoff

* fixes lint issues.

* removes request qualifier

* migrates qualifier logic to requester engine

* moves exponential backoff to chunk requests

* adds history update functions

* adds godoc

* refactors tests with updater

* adds with updater scenario

* adds draft for retry after test in exponential mode

* updates exponential backup behavior

* adds tests for chunk request updater

* adds comment for with updater scenario

* adds godoc for exponential updater

* fixes mock for pending chunks

* fixes requester tests

* logs updating dispatched requests

* updates chunk requests interface

* adds contains for identifier list

* refactors test with new qualifier

* fixes tests with request info and update mocking

* fixes a bug on requester side

* adds test dispatching requests on hybrid case

* removes unlimited qualifier

* adds godoc to requester

* fixes log

* fixes tests

* refactors test organization

* adds more godoc to the tests

* adds more comment to tests

* adds godoc for request qualifier methods

* minor refactors on chunk requests

* fixes lint issues with tests

* adds more test to chunk requests

* fixes lint issues

* moves test setup to mock package and refactors tests

* refactors test helpers

* refactors tracers and loggers on tests

* adds todos

* fixes lint issue after merge

* refactors mock execution node in verification tests

* adds chunk data pack of helper

* moves fixture to the new package and refactors setup mock chunk provider

* refactors tests with complete execution receipt list methods

* adds godocs for test helpers

* adds chunk of  test helper

* enables mock consensus node for multiple results and refactors tests

* adds batch ready and done helpers to unittest

* adds done helper for generic node

* reorganizes new verification node constructor

* adds handler engine registration method and its implementation

* fixes lint issues in verification tests

* wires up the assigner and fetcher tests

* addresses system chunk collection verification

* avoids stopping verification node on tests

* fixes collection ID of system chunk on fixtures

* adds logger to consumer module

* fixes lint issues

* fixes a bug with chunk consumer notifier

* wip with assigner and fetcher tests

* refactors imports

* fixes import cycle and mocking issues

* work in progress with system chunk

* encapsulates validating chunks and fixes notifying sealed blocks

* fixes tests on fetcher engine

* fixes executor id for execution results fixture

* fixes integration test

* cleans up tests

* adds documentation

* refactors tests

* decouples validating collections

* cleans up tests

* refactors a godoc

* adds godoc to the test.

* adds mock assertions

* fixes documentations

* fixes log messages

* fixes logs

* fixes tests

* adds metric collector interface

* refactors metrics name

* adds metrics variables

* adds verification name spaces

* adds chunk locator ID method to chunk status

* extends error messages with identifiers

* refactors order of calling consumer on sealed block

* Update engine/verification/test/assignerfetcher_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Update engine/verification/test/assignerfetcher_test.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* fixes lints

* refactors ready-done-aware test helpers

* refactors tests with require component ready and done

* fixes godocs

* fixes namespace

* adds metrics and registrations

* implements metrics methods

* wires in metrics to fetcher and requester

* fixes TestProcessAssignedChunkHappyPath

* fixes TestChunkResponse_RemovingStatusFails

* fixes TestProcessAssignChunkSealedAfterRequest

* fixes TestChunkResponse_InvalidChunkDataPack TestChunkResponse_MissingStatus

* fixes TestSkipChunkOfSealedBlock

* refactors metrics to mock in requester tests

* fixes TestHandleChunkDataPack_HappyPath

* fixes TestCompleteRequestingUnsealedChunkLifeCycle

* fixes TestRequestPendingChunkSealedBlock_Hybrid

* fixes TestDispatchingRequests_Hybrid

* reorganizes the code

* adds metrics support for chunk status

* adds metrics for chunk requests mempool size

* generates mocks for mempools

* adds tracer labels

* adds context to chunk status

* adds tracer to fetcher engine

* adds tracer for pushing chunk to verifier and validating it

* adds tracer labels for requester engine

* adds tracer to requester engine

* removes is system chunk from match engine

* moves happy path verification test to utils

* refactors demo with happy path

* fixes lint issues

* adds random calls for our new metrics

* refactors tracing on fetcher engine

* fixes godocs

* refactors metrics

* WIP

* adds chunk request mempool

* wires in new engines

* fixes dependency issue in cmd

* swaps component orders

* removes handler checking from requester

* reorganizes component initializations

* reports error on block consumer initialization

* separates chunk consumer and block consumer as separate modules

* removes checking handler fatal logs

* adds comments for verification parameters

* fixes lint

* updates mock network

* migrates constants and refactors data types

* fixes a comment

* refactors bootstrapping fetcher and requester as modules

* groups requester and fetcher in the same component

* wires in done dependency

* fixes lint issues

* sets chunk and block workers

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
  • Loading branch information
yhassanzadeh13 and jordanschalm committed Jun 11, 2021
1 parent 0272226 commit d5a3844
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 214 deletions.
333 changes: 134 additions & 199 deletions cmd/verification/main.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion engine/verification/assigner/blockconsumer/consumer.go
Expand Up @@ -15,7 +15,7 @@ import (
)

// DefaultBlockWorkers is the number of blocks processed in parallel.
const DefaultBlockWorkers = uint64(5)
const DefaultBlockWorkers = uint64(2)

// BlockConsumer listens to the OnFinalizedBlock event
// and notifies the consumer to check in the job queue
Expand Down
24 changes: 12 additions & 12 deletions engine/verification/fetcher/chunkconsumer/consumer.go
Expand Up @@ -13,33 +13,37 @@ import (

const (
DefaultJobIndex = uint64(0)
DefaultChunkWorkers = uint64(20)
DefaultChunkWorkers = uint64(5)
)

// ChunkConsumer consumes the jobs from the job queue, and pass it to the
// Worker for processing.
// It wraps the generic job consumer in order to be used as a ReadyDoneAware
// on startup
type ChunkConsumer struct {
consumer module.JobConsumer
consumer module.JobConsumer
chunkProcessor fetcher.AssignedChunkProcessor
}

func NewChunkConsumer(
log zerolog.Logger,
processedIndex storage.ConsumerProgress, // to persist the processed index
chunksQueue storage.ChunksQueue, // to read jobs (chunks) from
engine fetcher.AssignedChunkProcessor, // to process jobs (chunks)
chunkProcessor fetcher.AssignedChunkProcessor, // to process jobs (chunks)
maxProcessing uint64, // max number of jobs to be processed in parallel
) *ChunkConsumer {
worker := NewWorker(engine)
engine.WithChunkConsumerNotifier(worker)
worker := NewWorker(chunkProcessor)
chunkProcessor.WithChunkConsumerNotifier(worker)

jobs := &ChunkJobs{locators: chunksQueue}

lg := log.With().Str("module", "chunk_consumer").Logger()
consumer := jobqueue.NewConsumer(lg, jobs, processedIndex, worker, maxProcessing)

chunkConsumer := &ChunkConsumer{consumer}
chunkConsumer := &ChunkConsumer{
consumer: consumer,
chunkProcessor: chunkProcessor,
}

worker.consumer = chunkConsumer

Expand All @@ -65,15 +69,11 @@ func (c *ChunkConsumer) Ready() <-chan struct{} {
panic(fmt.Errorf("could not start the chunk consumer for match engine: %w", err))
}

ready := make(chan struct{})
close(ready)
return ready
return c.chunkProcessor.Ready()
}

func (c *ChunkConsumer) Done() <-chan struct{} {
c.consumer.Stop()

ready := make(chan struct{})
close(ready)
return ready
return c.chunkProcessor.Done()
}
12 changes: 12 additions & 0 deletions engine/verification/fetcher/chunkconsumer/consumer_test.go
Expand Up @@ -171,6 +171,18 @@ type mockChunkProcessor struct {
process func(notifier module.ProcessingNotifier, locator *chunks.Locator)
}

func (e *mockChunkProcessor) Ready() <-chan struct{} {
ready := make(chan struct{})
close(ready)
return ready
}

func (e *mockChunkProcessor) Done() <-chan struct{} {
done := make(chan struct{})
close(done)
return done
}

func (e *mockChunkProcessor) ProcessAssignedChunk(locator *chunks.Locator) {
e.process(e.notifier, locator)
}
Expand Down
11 changes: 9 additions & 2 deletions engine/verification/fetcher/engine.go
Expand Up @@ -97,12 +97,19 @@ func (e *Engine) WithChunkConsumerNotifier(notifier module.ProcessingNotifier) {

// Ready initializes the engine and returns a channel that is closed when the initialization is done
func (e *Engine) Ready() <-chan struct{} {
return e.unit.Ready()
if e.chunkConsumerNotifier == nil {
e.log.Fatal().Msg("missing chunk consumer notifier callback in verification fetcher engine")
}
return e.unit.Ready(func() {
<-e.requester.Ready()
})
}

// Done terminates the engine and returns a channel that is closed when the termination is done
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
return e.unit.Done(func() {
<-e.requester.Done()
})
}

// ProcessAssignedChunk is the entry point of fetcher engine.
Expand Down
32 changes: 32 additions & 0 deletions engine/verification/fetcher/mock/assigned_chunk_processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions engine/verification/fetcher/mock/chunk_data_pack_requester.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/verification/fetcher/processor.go
Expand Up @@ -14,6 +14,7 @@ import (
// | Chunk Queue | ---> assigned chunk locators --> | Chunk Consumer | ---> chunk workers --> | Assigned Chunk Processor|
// ---------------- ------------------ ---------------------------
type AssignedChunkProcessor interface {
module.ReadyDoneAware
// ProcessAssignedChunk receives an assigned chunk locator and processes its corresponding chunk.
// A chunk processor is expected to shape a verifiable chunk out of the assigned chunk, and pass it to
// the verifier Engine.
Expand Down
2 changes: 2 additions & 0 deletions engine/verification/fetcher/requester.go
Expand Up @@ -3,10 +3,12 @@ package fetcher
import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/verification"
"github.com/onflow/flow-go/module"
)

// ChunkDataPackRequester encapsulates the logic of requesting a chunk data pack from an execution node.
type ChunkDataPackRequester interface {
module.ReadyDoneAware
// Request makes the request of chunk data pack for the specified chunk ID with the specified targets.
Request(request *verification.ChunkDataPackRequest)

Expand Down

0 comments on commit d5a3844

Please sign in to comment.