
[Synchronization] Message queue for collection sync engine #1248

Merged
merged 8 commits into master on Sep 7, 2021

Conversation

durkmurder (Member)

https://github.com/dapperlabs/flow-go/issues/5807

Context

This PR implements message queues for the collection sync engine. The design and implementation are inspired by the consensus sync engine: essentially the same engine is used, with the modifications needed for collection clusters.
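
For readers unfamiliar with the pattern: incoming sync messages are enqueued instead of being handled inline on the networking goroutine, and a dedicated worker loop drains the queue. A minimal sketch in Go follows (hypothetical types and names, not the actual flow-go implementation):

package sync

import "fmt"

type message struct {
	originID string      // sender identity (a flow.Identifier in the real code)
	payload  interface{} // the sync message itself
}

type engine struct {
	queue chan message  // bounded FIFO message queue
	done  chan struct{} // closed on shutdown
}

// Process enqueues an incoming message without blocking the caller.
func (e *engine) Process(originID string, event interface{}) error {
	select {
	case e.queue <- message{originID, event}:
		return nil
	default:
		return fmt.Errorf("message queue full, dropping message from %s", originID)
	}
}

// loop consumes queued messages one at a time on a single worker goroutine.
func (e *engine) loop() {
	for {
		select {
		case msg := <-e.queue:
			e.handle(msg.originID, msg.payload)
		case <-e.done:
			return
		}
	}
}

// handle dispatches on the concrete message type (sync request, range
// request, batch request, ...); elided in this sketch.
func (e *engine) handle(originID string, payload interface{}) {}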

codecov-commenter commented Sep 3, 2021

Codecov Report

Merging #1248 (fd0fc3d) into master (7b0ae36) will increase coverage by 0.09%.
The diff coverage is 60.81%.


@@            Coverage Diff             @@
##           master    #1248      +/-   ##
==========================================
+ Coverage   56.18%   56.27%   +0.09%     
==========================================
  Files         496      497       +1     
  Lines       30179    30320     +141     
==========================================
+ Hits        16956    17064     +108     
- Misses      10921    10944      +23     
- Partials     2302     2312      +10     
Flag       Coverage Δ
unittests  56.27% <60.81%> (+0.09%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files                                         Coverage Δ
engine/common/synchronization/config.go                42.85% <50.00%> (ø)
...gine/collection/synchronization/request_handler.go  59.49% <59.49%> (ø)
engine/collection/synchronization/engine.go            62.90% <62.12%> (+9.70%) ⬆️
engine/common/synchronization/engine.go                68.78% <100.00%> (ø)
...sus/approvals/assignment_collector_statemachine.go  47.11% <0.00%> (+4.80%) ⬆️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 7b0ae36...fd0fc3d.

@jordanschalm (Member) left a comment

Thanks for this, Yurii.

If you haven't already, I think we should also smoke test these changes on localnet by forcing a collection node to catch up with the sync engine.

engine/collection/synchronization/engine.go (outdated; resolved)
Comment on lines 342 to 343
e.log.Error().Err(err).Msg("could not get last finalized header")
return
Member:
Should this be fatal if we're going to exit here? Stopping the polling process can cause the node to get permanently out of sync.

Member Author:
I have replaced it with continue, but my feeling is that we should crash the node at this point. Not being able to retrieve state seems like a fatal error to me.

Member:
> my feeling is that we should crash the node at this point

I actually agree; I think it's better to change the log level to Fatal.
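
A sketch of the agreed change, assuming the zerolog-style logger used in flow-go (Fatal logs the message and then terminates the process):

final, err := e.state.Final().Head()
if err != nil {
	// Fatal exits the process after logging, crashing the node as discussed
	e.log.Fatal().Err(err).Msg("could not get last finalized header")
}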

Comment on lines +318 to 323
if e.pollInterval > 0 {
poll := time.NewTicker(e.pollInterval)
pollChan = poll.C
defer poll.Stop()
}
scan := time.NewTicker(e.scanInterval)
Member:
Any reason not to treat pollInterval and scanInterval the same here? I.e., handle negative values and defer <ticker>.Stop() for both?

Member Author:
For execution nodes, the consensus sync engine passes 0, meaning it doesn't want to poll height.
I don't know if it's needed for collection nodes, but there's no harm in keeping it, I guess.

Contributor:
Should we add the defer scan.Stop() though?
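
Combining the two suggestions in this thread, a sketch of the loop setup (assuming the surrounding checkLoop structure from the diff above): a nil channel never fires in a select, so polling stays disabled when pollInterval is not positive, and both tickers are released via defer:

var pollChan <-chan time.Time // nil channel: never fires in the select below
if e.pollInterval > 0 {
	poll := time.NewTicker(e.pollInterval)
	pollChan = poll.C
	defer poll.Stop()
}
scan := time.NewTicker(e.scanInterval)
defer scan.Stop() // the suggested addition: release the scan ticker on exit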

if err != nil {
errs = multierror.Append(errs, fmt.Errorf("could not submit range request (from=%d, to=%d): %w", ran.From, ran.To, err))
Member:
Maybe we could add the more informative error message back (and add it to the consensus engine as well).

if err != nil {
errs = multierror.Append(errs, fmt.Errorf("could not submit batch request (size=%d): %w", len(batch.BlockIDs), err))
Member:
Maybe we could add the more informative error message back (and add it to the consensus engine as well).

// get the last finalized header
final, err := e.state.Final().Head()
func (e *Engine) pollHeight() {
head, err := e.state.Final().Head()
Contributor:
Could we instead use FinalizedHeaderCache, like what is done in the common sync engine?

Member Author:
Not really; collection clusters don't support the same notification infrastructure as consensus nodes. I don't think it's worth implementing it only for this.

}

if comp == nil {
panic("must initialize synchronization engine with comp engine")
Contributor:
Returning an error would be a more proper way to handle this failure.
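
A sketch of the suggested alternative (hypothetical constructor; the dependency type is a placeholder, not the actual flow-go signature):

type Compliance interface{} // placeholder for the compliance engine dependency

type Engine struct {
	comp Compliance
	// ... other fields ...
}

// New returns an error instead of panicking when a required dependency is missing.
func New(comp Compliance) (*Engine, error) {
	if comp == nil {
		return nil, fmt.Errorf("must initialize synchronization engine with comp engine")
	}
	return &Engine{comp: comp}, nil
}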

func (e *Engine) Ready() <-chan struct{} {
e.unit.Launch(e.checkLoop)
return e.unit.Ready()
e.lm.OnStart(func() {
Contributor:
Since we are using unit, it is safer to invoke unit.Ready first; this follows the lifecycle assumptions of the unit.
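
One reading of this suggestion, as a sketch (assuming lm is the lifecycle manager introduced in this diff and that it exposes a Started() channel; treat the exact API as an assumption):

func (e *Engine) Ready() <-chan struct{} {
	e.lm.OnStart(func() {
		<-e.unit.Ready()           // honor the unit's lifecycle first
		e.unit.Launch(e.checkLoop) // then launch the worker loop
	})
	return e.lm.Started() // assumption: lm exposes a Started() channel
}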

@jordanschalm (Member):

Cluster syncing is looking good on localnet on this PR.

@durkmurder (Member Author):

bors merge

bors bot (Contributor) commented Sep 7, 2021

bors bot merged commit a63b2ba into master on Sep 7, 2021.
bors bot deleted the yurii/5807-collection-sync-engine-refactoring branch on September 7, 2021 at 19:48.