
FLIP for Handing Messages from Networking Layer to Engines #343

Merged 6 commits on Sep 30, 2021

Conversation

@AlexHentschel (Member) commented Feb 2, 2021

FLIP 343 to generalize the API through which the Networking Layer hands messages to Engines

@AlexHentschel self-assigned this Feb 2, 2021

@jordanschalm (Member) left a comment

Great writeup 👏. Generally very much in favour of this change. A few suggestions and comments noted below.

Networking Layer Behaviour

Something you point out here is that the application layer is better positioned to manage queued messages, so it should be able to manage message queueing. I think we should take the full step here and say the engine must manage message queueing and implementations of MessageProcessor must be non-blocking.

This makes the separation of concerns more clear: the networking layer passes messages to engines as quickly as it can, does not store/queue messages, and will drop a message if the MessageProcessor doesn't accept it; the engine is responsible for all queueing, management of the message queue, and message processing. I also think this is necessary to fully solve the problem you point out:

If one of the engines (e.g. Engine C) gets overwhelmed with large number of messages, this engine will likely delay messages for all other engines as the workers will soon all be blocked by the overwhelmed engine.

Allowing non-blocking MessageProcessors isn't sufficient to fix this -- we need to enforce that they are non-blocking. Explicit use of channels in the API would make this fairly simple to implement:

// in networking layer
channel := messageProcessor.RecvChan()
select {
case channel <- msg:
case <-time.After(timeout):
	log.Warn().Str("msg_type", msg.Type()).Msg("dropped message")
}

Interface Changes

I think we should remove the network.Engine interface and the SubmitLocal, Process functions that are copied in every engine to satisfy it. These are rarely used and only exist because network.Engine was originally created with more methods than it needed. I think we should replace the instances of in-process engines communicating using these methods with context-specific methods (i.e., rather than doing engine.ProcessLocal(block), do engine.HandleBlock(block)).

Minor suggestion: make MessageProcessor an interface rather than a function type to discourage the use of anonymous functions.
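
For illustration, a minimal sketch of the difference; the exact method name and signature here are placeholders, not part of this FLIP:

package network

import "github.com/onflow/flow-go/model/flow"

// Instead of a bare function type ...
type MessageProcessorFunc func(originID flow.Identifier, message interface{}) error

// ... a named interface discourages anonymous implementations
// (placeholder method name/signature; not part of this FLIP):
type MessageProcessor interface {
	// Process handles one inbound message from originID and should not block.
	Process(originID flow.Identifier, message interface{}) error
}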

@vishalchangrani (Contributor) commented Feb 3, 2021

Thanks for the write up.
The networking layer does not really care about the Engine. To keep it simple and easy to test, we can have the Register function defined as,

type IncomingMessage struct {
	originID flow.Identifier
	event    interface{}
}

type Network interface {
	Register(channel network.Channel, receiveChannel chan<- IncomingMessage) (network.Conduit, error)
}

Then following on what Jordan mentioned, the network layer will try to push the incoming message to the receiveChannel.
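
A rough sketch of that hand-off, assuming the networking layer drops messages when the engine's channel is full (the function name, logger, and drop-on-full behaviour are illustrative assumptions):

package network

import (
	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/model/flow"
)

// deliver attempts to hand one inbound message to the engine's receive channel
// without blocking the networking worker; if the channel is full, the message
// is dropped and the event is logged. (Sketch only; drop-on-full is an assumption.)
func deliver(log zerolog.Logger, receiveChannel chan<- IncomingMessage, originID flow.Identifier, event interface{}) {
	select {
	case receiveChannel <- IncomingMessage{originID: originID, event: event}:
	default:
		log.Warn().Msg("engine receive channel full, dropping inbound message")
	}
}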

I can remove the priority queue in the networking layer since it will be redundant after the queuing has been added to the engine.

Alexander Hentschel added 2 commits February 3, 2021 16:51
…Process` methods (from the interface) and replace them with context-specific methods
@AlexHentschel (Member, Author) commented Feb 4, 2021

Thanks @jordanschalm and @vishalchangrani for the great feedback and detailed suggestions.

Updated FLIP

incorporated your suggestions:

  • made MessageConsumer an interface rather than a function type to discourage the use of anonymous functions
  • The API now states that the engine must manage message queueing and that implementations of MessageConsumer must be non-blocking
  • added suggestion to remove the network.Engine's SubmitLocal and Process methods (from the interface) and replace them with context-specific methods
  • added step to remove the priority queue in the networking layer

Regarding the suggestion to use a channel as part of the API:

Register(channel network.Channel, receiveChannel chan<- IncomingMessage) (network.Conduit, error)

I feel that channels are fairly restrictive in such high-level APIs. Some thoughts:

  • The argument pro-channel is that we can force message drops after some timeout expires, as @jordanschalm suggested. However, what would be a good value for that timeout?

  • A channel by design has a fixed capacity and we can't easily drop elements from it (I think?). So if we want a more flexible queue (e.g. unbounded queue; priority queue; ability to filter out expired messages; etc), then we would need a dedicated routine to pick up the incoming messages from the channel and feed them into the queue.

    • I think for such a practically non-blocking task, we could use the thread from the networking layer without the need to add another pipeline step (shovelling the messages from the channel to the queue)
    • And if we want to use a plain old queue for engine-internal message buffering, we can implement the Consume method exactly as @jordanschalm suggested.

    That way, the implementation supports a wider array of use cases (a rough sketch of such a non-blocking Consume follows below).
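
A rough sketch of such an engine-internal Consume, assuming a plain unbounded FIFO guarded by a mutex (all names here are illustrative, not part of the FLIP):

package engine

import "sync"

// inboundMessage pairs a payload with its origin (flow.Identifier in practice).
type inboundMessage struct {
	originID interface{}
	payload  interface{}
}

// consumer buffers inbound network messages and wakes an engine-internal worker.
type consumer struct {
	mu     sync.Mutex
	queue  []inboundMessage
	notify chan struct{} // capacity 1; signals the engine's worker loop
}

func newConsumer() *consumer {
	return &consumer{notify: make(chan struct{}, 1)}
}

// Consume only appends to the internal queue and returns immediately,
// so the networking layer's worker is never blocked.
func (c *consumer) Consume(originID interface{}, payload interface{}) {
	c.mu.Lock()
	c.queue = append(c.queue, inboundMessage{originID: originID, payload: payload})
	c.mu.Unlock()
	select {
	case c.notify <- struct{}{}:
	default: // a wake-up signal is already pending
	}
}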

@vishalchangrani (Contributor) commented

@AlexHentschel - agree with what you say - we would need boilerplate code if we use channels for sure.

However, I would like to suggest that the MessageConsumer interface definition lives somewhere in the network package (if you were not already thinking of doing that). That way network tests don't have to look into any other package outside network.

Thanks for putting this proposal together 👍

@AlexHentschel (Member, Author) commented Feb 4, 2021

MessageConsumer interface definition lives somewhere in the network package

💯

@zhangchiqing (Member) commented

Great write up and improvement! 👍 I like the idea to simplify the network interface.

Just have some comments:

  1. We need to ensure the network knows which engine to deliver the message to. And each message must map to at most one engine (not two), otherwise it creates confusion. I think this has been ensured by the Register function. (I have more ideas about this, but irrelevant to this topic)
  2. In order to ensure each engine processes messages in a non-blocking manner, each engine now must place a queue in front to buffer the inbound network messages.
  3. It makes sense to implement a generic queue in a base engine to be reused across all engines.
  4. The base engine could be a different engine than engine.Unit, because not all engines will consume network messages, but engines all need to be ReadyDoneAware, which is what engine.Unit offers.
  5. The queue could be a simple bounded channel (see the sketch after this list). But what if the channel gets full? I think for now it's fine to drop messages when full. Yes, important messages might get dropped, but we are still able to capture this event and report metrics for debugging purposes. And the application should always have a re-fetch mechanism for important messages. (Side note: I think this FLIP is mostly focusing on the network interface design, so we could discuss how to adjust the application-level design in a different thread.)
  6. In terms of whether to keep the networking priority queue: it currently serves a purpose, namely showing how many messages are blocked in the network layer and what kind of messages they are. This is useful for debugging problems when an engine hasn't received an expected message. For instance, a collection node has sent a collection to an execution node, but the execution node's ingestion engine never receives it. In order to debug, we could look into the network priority queue and see if there are messages blocked there. So the only downside of removing the priority queue is the loss of this helpful information for debugging. As long as we can monitor the number of messages queued in the network layer and also in each engine's queue, then I think we could remove the network priority queue, which I don't think is hard to do.
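
As a rough sketch of point 5, a bounded-channel queue that drops (and logs) messages when full might look like the following; the names and the logging call are illustrative assumptions, not an existing flow-go API:

package engine

import "github.com/rs/zerolog"

// boundedQueue is the engine's inbound message buffer: a simple bounded channel.
type boundedQueue struct {
	log      zerolog.Logger
	messages chan interface{}
}

func newBoundedQueue(log zerolog.Logger, capacity int) *boundedQueue {
	return &boundedQueue{log: log, messages: make(chan interface{}, capacity)}
}

// enqueue never blocks: if the channel is full, the message is dropped and the
// event is logged so it can be surfaced via metrics for debugging.
func (q *boundedQueue) enqueue(msg interface{}) {
	select {
	case q.messages <- msg:
	default:
		q.log.Warn().Msg("engine inbound queue full, dropping message")
	}
}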

@jordanschalm (Member) commented

The argument pro-channel is that we can force message drops after some timeout expires, as @jordanschalm suggested. However, what is a good amount for that timeout?

The queue could be either a simple bounded channel.

Some clarification: the idea is to have some bound on the invocation of Consume so we can be sure the networking layer threads aren't getting blocked by an engine that decided to use a blocking Consume function. The networking layer workers would be safe from getting blocked regardless of how the engines are implemented. The channel wouldn't be for queueing messages, it would be for enforcing "non-blocking-ness" in the API (in fact probably the channel should be unbuffered).

But I agree with Alex's point that this is restrictive. Given that it's a private API I'm happy to not enforce the non-blocking requirement the way I suggested. We could detect misbehaving engines with metrics on the timing of Consume instead.
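
For instance, a thin wrapper in the networking layer could time each Consume call and flag slow engines (sketch only; the interface, threshold, and logging here are assumptions):

package network

import (
	"time"

	"github.com/rs/zerolog"
)

// MessageConsumer is the engine-side interface discussed in this FLIP
// (method name/signature assumed for illustration).
type MessageConsumer interface {
	Consume(originID interface{}, payload interface{})
}

// timedConsume invokes Consume and records how long it took, so a blocking
// (misbehaving) engine shows up in logs/metrics rather than silently stalling workers.
func timedConsume(log zerolog.Logger, c MessageConsumer, originID interface{}, payload interface{}) {
	start := time.Now()
	c.Consume(originID, payload)
	if elapsed := time.Since(start); elapsed > 100*time.Millisecond { // threshold is illustrative
		log.Warn().Dur("duration", elapsed).Msg("slow Consume call; engine may be blocking")
	}
}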

@vishalchangrani (Contributor) commented

Spoke with @yhassanzadeh13 and @noisygerman and we think it will be best to retain the message queue at the networking layer even after the queuing logic has been added to the individual engines because:

  1. The queue on the networking layer helps to pinpoint bottlenecks on the networking side versus the application side.
  2. Helps provide some control over the rate of inbound messages.

However, once the engines have the queue implemented, the networking queue can:

  1. Have a far greater number of workers (1000 or more) so the engines are never starved.
  2. Be a simple queue instead of a priority queue.

@AlexHentschel (Member, Author) commented Feb 17, 2021

Totally on board with retaining the queue at the networking level.

Have far greater number of workers (1000 or more) to never starve the engines.

Not sure what the utility of more workers will be (?). When Engines are non-blocking (and queueing messages internally), a few workers should totally suffice to deliver the messages to the engines. I am worried that an excessive number of workers might actually obfuscate problems on the engine level, e.g. if an engine has a bug and blocks a worker, we would only see this very rarely when there are 1000 workers.

@synzhu mentioned this pull request Sep 13, 2021
@peterargue deleted the alex/networking-api-flip branch January 17, 2023 22:19