
[Access node] implement new splitter engine #947

Merged 49 commits into master on Jul 20, 2021

Conversation

@synzhu (Contributor) commented Jul 12, 2021:

Implements a new splitter engine described in https://github.com/dapperlabs/flow-go/issues/5669.

We also create a new splitter network implementation that can be passed into other engines to allow multiple engines to register for messages on the same network channel.

closes dapperlabs/flow-go#5669
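
A hedged usage sketch of the splitter network described above; the NewNetwork constructor and its arguments are assumptions, while the Register signature matches the one quoted later in this conversation:

```go
package example

import (
	"github.com/rs/zerolog"

	splitternet "github.com/onflow/flow-go/engine/common/splitter/network"
	"github.com/onflow/flow-go/network"
)

// wireSharedChannel registers two engines on the same channel through a
// splitter network, so both receive every message arriving on that channel.
func wireSharedChannel(base network.Network, log zerolog.Logger, ch network.Channel, a, b network.Engine) error {
	// NewNetwork's signature is an assumption for illustration.
	sn := splitternet.NewNetwork(base, log)

	// Registering two engines on the same channel is the point of the
	// splitter; a plain network would reject the second registration.
	if _, err := sn.Register(ch, a); err != nil {
		return err
	}
	if _, err := sn.Register(ch, b); err != nil {
		return err
	}
	return nil
}
```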

turbolent and others added 4 commits July 12, 2021 11:33
Co-authored-by: turbolent <turbolent@users.noreply.github.com>
Co-authored-by: Janez Podhostnik <67895329+janezpodhostnik@users.noreply.github.com>
@vishalchangrani changed the base branch from master to smnzhu/network-multi-channel July 13, 2021 00:51
@onflow deleted a comment from codecov-commenter Jul 13, 2021
@AlexHentschel (Member) left a comment:

Not sure whether this suggestion is possible within the scope of this PR:

  • your multiplexer.Engine essentially only implements multiplexing on the Process and Submit methods, but not on SubmitLocal or ProcessLocal:

    • From a high-level, engines are vertices in a data flow graph. The fact that the networking layer can feed data into engines is an auxiliary functionality, but not the primary purpose of engines.
    • Your multiplexer.Engine is primarily an engine, but your implementation is focused on networking purposes only, neglecting other functions that are vital to an engine's interface (i.e. SubmitLocal or ProcessLocal).

    You are putting the channel as the engine's primary focus even though it should not be.

  • I think we have two options:

    1. multiplexer.Engine delegates all calls to the wrapped engines. I think this would be fine, as we can assemble any data flow pattern through multiplexers and engines that filter based on channel.
      • so instead of having one multiplexer that is "channel aware" (playmobil approach), create a multiplexer that only consumes messages for one channel. Thereby, the multiplexer can forward all calls to the engines it wraps. Then it would be truly implementing the Engine interface and multiplex all calls to the wrapped engines.
      • In the multiplexer.network you can create one dedicated multiplexer.Engine per channel (a sketch of this option follows below this list).
    2. We implement a multiplexer for the MessageProcessor interface and remove the Engine interface from the networking layer into the module package (there exists already an engine interface there, which is wrapping network.Engine)
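
A minimal sketch of option 1, assuming the network.Engine interface of the time (SubmitLocal, Submit, ProcessLocal, Process); all names here are illustrative, not the PR's actual code:

```go
package multiplexer

import (
	"sync"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/network"
)

// Engine is bound to exactly one channel, so it can forward *every* call of
// the network.Engine interface to its wrapped engines without any channel
// bookkeeping. One such multiplexer would be created per channel.
type Engine struct {
	enginesMu sync.RWMutex     // protects engines
	engines   []network.Engine // the wrapped downstream engines
}

// Add registers another downstream engine with this multiplexer.
func (e *Engine) Add(downstream network.Engine) {
	e.enginesMu.Lock()
	defer e.enginesMu.Unlock()
	e.engines = append(e.engines, downstream)
}

// SubmitLocal is forwarded too: local events are not a networking concern,
// but they are part of the Engine interface and must be multiplexed as well.
func (e *Engine) SubmitLocal(event interface{}) {
	e.enginesMu.RLock()
	defer e.enginesMu.RUnlock()
	for _, eng := range e.engines {
		eng.SubmitLocal(event)
	}
}

// Submit follows the same pattern; ProcessLocal and Process would likewise
// delegate to the corresponding method of every wrapped engine.
func (e *Engine) Submit(channel network.Channel, originID flow.Identifier, event interface{}) {
	e.enginesMu.RLock()
	defer e.enginesMu.RUnlock()
	for _, eng := range e.engines {
		eng.Submit(channel, originID, event)
	}
}
```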

return e, nil
}

func (e *Engine) RegisterEngine(channel network.Channel, engine network.Engine) error {
A Member left a comment:

⚠️⚠️
This requires concurrency handling! With the current implementation, there is no guarantee whatsoever that a registered engine will ever receive any messages.
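
A minimal sketch of concurrency-safe registration, assuming hypothetical enginesMu and enginesByChannel fields:

```go
// RegisterEngine takes the write lock so that registration is safe against
// concurrent message dispatch; without it, an engine added while messages
// are being delivered could be silently dropped from the channel's set.
func (e *Engine) RegisterEngine(channel network.Channel, engine network.Engine) error {
	e.enginesMu.Lock()
	defer e.enginesMu.Unlock()
	e.enginesByChannel[channel] = append(e.enginesByChannel[channel], engine)
	return nil
}
```

Every dispatch path would then take the matching read lock before iterating the registered engines.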

(The Contributor Author's reply in this thread, on engine/common/multiplexer/engine.go, is marked outdated and resolved.)
@huitseeker (Contributor) left a comment:

This in general looks great; I left a few comments.

)

type Engine struct {
mu sync.RWMutex
A Contributor commented:

Nit: something to indicate the coverage of the Mutex might help, e.g. enginesMu


// process calls the given function in parallel for all the engines that have
// registered with this splitter.
func (e *Engine) process(f func(module.Engine) error) {
A Contributor commented:

Nit: f is the one argument I'd really like to track here; giving it a longer, more descriptive name would help.
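
A sketch of what the helper could look like with both nits applied (an enginesMu name and a descriptive parameter name), plus the error collection discussed later in this thread; the slice-based engines field is an assumption:

```go
import (
	"sync"

	"github.com/hashicorp/go-multierror"
	"github.com/onflow/flow-go/module"
)

// process snapshots the registered engines under the read lock, invokes
// processFunc on each in its own goroutine, blocks until all goroutines
// return, and combines their errors into a single multierror.
func (e *Engine) process(processFunc func(module.Engine) error) error {
	e.enginesMu.RLock()
	engines := make([]module.Engine, len(e.engines))
	copy(engines, e.engines)
	e.enginesMu.RUnlock()

	var (
		wg     sync.WaitGroup
		errMu  sync.Mutex // guards result
		result *multierror.Error
	)
	for _, eng := range engines {
		wg.Add(1)
		go func(downstream module.Engine) {
			defer wg.Done()
			if err := processFunc(downstream); err != nil {
				errMu.Lock()
				result = multierror.Append(result, err)
				errMu.Unlock()
			}
		}(eng)
	}
	wg.Wait()
	return result.ErrorOrNil()
}
```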

// Register will subscribe the given engine with the splitter on the given channel, and all registered
// engines will be notified with incoming messages on the channel.
// The returned Conduit can be used to send messages to engines on other nodes subscribed to the same channel.
func (n *Network) Register(channel network.Channel, e network.Engine) (network.Conduit, error) {
@huitseeker (Contributor) commented Jul 19, 2021:

OK, this may be a comment sitting across this PR and #984 but:

I see the appeal in a generic splitter, but I would also appreciate a strong godoc paragraph at the beginning of the engine that lays out where and when the splitter receives its channel processing obligations over time, and how it maintains them.

The topology of the splitter is clear, but to Vishal's point the channel responsibilities over the lifecycle of the component are a bit harder to tease out when channels are per-method parameters.

Another way to do this would be to write a mixed splitter / relay runnable example that would demonstrate this over time (especially with unsubscribes); this is something we can defer to a further PR.

@synzhu (Contributor, Author) commented Jul 19, 2021:

Thanks for the suggestions @AlexHentschel, I knew that the distinction between Process and Submit had to do with synchronous vs asynchronous processing, but I wasn't aware that the convention for Process is that all the logic runs in the calling thread.

I mostly agree with your point about the splitter engine's Submit method and will make those changes. Originally I had implemented it this way because it resulted in more concise code, since a single function implements most of the logic. There is one thing I don't think will be doable:

> SubmitLocal and Submit: any returned errors are logged as fatal.

The Submit method does not return anything (because the convention is that processing is asynchronous and errors are merely logged), so if the splitter engine's Submit calls into the Submit methods of the downstream engines, we will not be able to capture any downstream errors. The alternative is for the splitter engine's Submit to call into the downstream engines' Process methods, but this would require starting a new goroutine for each downstream engine. Ultimately, I don't think any of this is necessary, as convention already states that any caller who wants to know about unexpected problems should call the splitter engine's Process method rather than Submit in the first place.

As for the Process method, however, I'm not sure how I feel about calling each downstream engine sequentially in an unspecified order. I guess the long-term solution is to move towards a non-blocking API for the application layer, but currently, if we are to obey the conventions, there seems to be no way to create an engine that actually calls the processing logic on each downstream engine in parallel (which is arguably what we really want), because the network will always call the splitter's Process method.

I think a possible alternative here is to still start a separate goroutine for each downstream engine, but capture all the errors and return them in a multierror instead of logging them and returning nil. This would allow us to still satisfy the following:

> Therefore, the core business logic can return errors, which we should capture and propagate.

Furthermore, using the WaitGroup allows us to still block the caller until all of the engines are finished executing their processing logic. It seems like these two properties are the crux of what we really care about?
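
Concretely, this could look something like the following sketch (the module.Engine signatures and the e.log field are assumptions based on the interfaces discussed above, and e.process is the fan-out helper sketched earlier in this thread):

```go
// Process blocks the caller: every downstream engine's Process runs in its
// own goroutine, and all resulting errors come back combined as one
// multierror rather than being swallowed or logged piecemeal.
func (e *Engine) Process(channel network.Channel, originID flow.Identifier, event interface{}) error {
	return e.process(func(downstream module.Engine) error {
		return downstream.Process(channel, originID, event)
	})
}

// Submit keeps the asynchronous convention: it returns immediately, and
// since it cannot return an error, downstream failures are only logged
// (as fatal, per the quoted convention).
func (e *Engine) Submit(channel network.Channel, originID flow.Identifier, event interface{}) {
	go func() {
		if err := e.Process(channel, originID, event); err != nil {
			e.log.Fatal().Err(err).Msg("internal error processing event")
		}
	}()
}
```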

I remember this book saying that goroutines are very lightweight, and we normally don't have to worry about creating them since they add minimal overhead. In the case of the splitter engine's Process method, I'd argue the tradeoff between goroutine overhead and the processing time we could save by calling the downstream engines in parallel instead of sequentially is one worth making.

However, I don't have a strong opinion on this, given that we will probably be moving towards the MessageConsumer API eventually anyway.

@codecov-commenter commented Jul 20, 2021:

Codecov Report

Merging #947 (61e30f4) into master (e35d792) will increase coverage by 0.00%.
The diff coverage is 55.44%.


@@           Coverage Diff            @@
##           master     #947    +/-   ##
========================================
  Coverage   54.81%   54.81%            
========================================
  Files         277      279     +2     
  Lines       18548    18649   +101     
========================================
+ Hits        10167    10223    +56     
- Misses       7006     7047    +41     
- Partials     1375     1379     +4     
Flag Coverage Δ
unittests 54.81% <55.44%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
network/p2p/network.go 0.00% <ø> (ø)
engine/common/splitter/network/network.go 50.00% <50.00%> (ø)
engine/common/splitter/engine.go 60.37% <60.37%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update e35d792...61e30f4

@synzhu (Contributor, Author) commented Jul 20, 2021:

@AlexHentschel please take a look at the latest changes and lmk what you think

@huitseeker self-requested a review July 20, 2021 21:10
@AlexHentschel (Member) left a comment:

👍

@huitseeker (Contributor) left a comment:

I thought you could simplify the multierror logic with a multierror.Group (from multierror 1.1), but I'm not happy with the logic in the WaitGroup therein, and your approach is in the end fine.
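
For reference, a sketch of the multierror.Group variant (go-multierror v1.1+), assuming the engines are held in a slice: Group.Go spawns the goroutine, and Group.Wait replaces the hand-rolled WaitGroup and error collection.

```go
// process with multierror.Group: each call runs in its own goroutine, and
// Wait blocks until all goroutines return, combining their errors.
func (e *Engine) process(processFunc func(module.Engine) error) error {
	var group multierror.Group
	for _, eng := range e.engines {
		downstream := eng // capture loop variable for the closure (pre-Go 1.22)
		group.Go(func() error {
			return processFunc(downstream)
		})
	}
	return group.Wait().ErrorOrNil()
}
```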

@synzhu merged commit 62ce19f into master Jul 20, 2021
@synzhu deleted the smnzhu/multiplexer-engine branch July 20, 2021 23:19