
feat(executor): accumulate short messages if message queue not empty #2755

Open · jon-chuang opened this issue May 24, 2022 · 3 comments
Labels: no-issue-activity, type/enhancement (Improvements to existing implementation.)

jon-chuang commented May 24, 2022

Certain executors may drastically reduce the number of rows in a given message. Although this is probably not a cause for worry, in the interest of efficient message sizes and execution batches, it may be better to accumulate such small messages.

To do so, we could wrap executors that potentially reduce row counts (filter, join) in "accumulators" which buffer messages below a certain threshold (e.g. number of rows or number of bytes), forwarding only once the threshold is reached, the queue is empty, or a timeout elapses.

The logic could be as follows:

```rust
if current_time() - msg.start_time < FORWARD_MSG_TIMEOUT
    && !self.msg_queue.is_empty()
    && msg.num_rows() < ROW_THRESHOLD
{
    // Keep accumulating: process the next queued message and append its output.
    msg.append(self.execute(self.msg_queue.pop()).await);
} else {
    // Threshold reached, queue empty, or timeout elapsed: forward and reset.
    self.forward(msg);
    msg = Message::new();
}
```
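The row-count half of this idea can be sketched standalone. This is a minimal illustration, not the proposed implementation: `Chunk`, `Accumulator`, `push`, and `flush` are hypothetical names, and the timeout branch is omitted for brevity.

```rust
// Hypothetical sketch of the accumulator wrapper described above.
// `Chunk` stands in for a stream message; only a row-count threshold
// is modeled, with `flush` covering the timeout / empty-queue case.

#[derive(Debug, Clone, PartialEq)]
struct Chunk {
    rows: Vec<i64>, // stand-in for real column data
}

struct Accumulator {
    threshold: usize, // forward once this many rows are buffered
    buffer: Vec<i64>,
}

impl Accumulator {
    fn new(threshold: usize) -> Self {
        Self { threshold, buffer: Vec::new() }
    }

    /// Feed one (possibly small) chunk; returns a merged chunk once the
    /// buffered row count reaches the threshold, otherwise buffers it.
    fn push(&mut self, chunk: Chunk) -> Option<Chunk> {
        self.buffer.extend(chunk.rows);
        if self.buffer.len() >= self.threshold {
            Some(Chunk { rows: std::mem::take(&mut self.buffer) })
        } else {
            None
        }
    }

    /// Flush whatever is buffered (e.g. on timeout or an empty queue).
    fn flush(&mut self) -> Option<Chunk> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(Chunk { rows: std::mem::take(&mut self.buffer) })
        }
    }
}

fn main() {
    let mut acc = Accumulator::new(4);
    assert_eq!(acc.push(Chunk { rows: vec![1] }), None);        // too small, buffered
    assert_eq!(acc.push(Chunk { rows: vec![2, 3] }), None);     // still below threshold
    let merged = acc.push(Chunk { rows: vec![4, 5] }).unwrap(); // threshold reached
    assert_eq!(merged.rows, vec![1, 2, 3, 4, 5]);
    assert_eq!(acc.flush(), None); // buffer is empty after forwarding
}
```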
jon-chuang added the type/enhancement label on May 24, 2022
liurenjie1024 commented
Executors not only reduce chunk sizes, but can also increase them (e.g. join, exchange). So I think a more general approach is to rechunk data chunks to a fixed size, which would also benefit vectorized execution.
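The rechunking idea can be illustrated as follows. This is a hedged sketch over plain row vectors (the `rechunk` name and signature are hypothetical, not RisingWave's API): arbitrary-sized input chunks are split and merged so every output chunk has exactly `target` rows, except possibly the last.

```rust
// Hypothetical sketch of "rechunk to fixed size": incoming chunks of
// arbitrary size are re-emitted as chunks of exactly `target` rows
// (the final chunk may be smaller).

fn rechunk(chunks: &[Vec<i64>], target: usize) -> Vec<Vec<i64>> {
    let mut out = Vec::new();
    let mut cur: Vec<i64> = Vec::with_capacity(target);
    for chunk in chunks {
        for &row in chunk {
            cur.push(row);
            if cur.len() == target {
                out.push(std::mem::take(&mut cur));
            }
        }
    }
    if !cur.is_empty() {
        out.push(cur); // trailing partial chunk
    }
    out
}

fn main() {
    // One oversized chunk and two tiny ones become uniform 3-row chunks.
    let input = vec![vec![1, 2, 3, 4], vec![5], vec![6, 7]];
    assert_eq!(rechunk(&input, 3), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
}
```

A fixed output size helps vectorized operators because every batch fills the same number of SIMD lanes, regardless of how selective the upstream operator was.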

skyzh commented May 24, 2022

From my perspective, it is better to batch requests on executors that are affected by the number of chunks (e.g., joins). It is always beneficial to get messages instantly delivered.

jon-chuang commented May 24, 2022

> Executors not only reduce chunk sizes, but also increase chunk sizes

Yes, the current approach has been to change the executor itself to yield chunks so that chunk sizes stay bounded. That required refactoring straight-line code into code that yields intermittently. I suppose in that case we cared about both latency and message size, since bounding message size alone could be solved by simply breaking up the monolithic message.

I think yes, the two approaches are complementary.

> It is always beneficial to get messages instantly delivered.

I think this is generally true within the same node. However, if the queue is not empty and the messages are small, each message accomplishes less work while still incurring per-message overhead. Hence, I believe accumulating them into larger messages would offset the latency cost if tuned properly.

Perhaps we could focus on accumulating batches in dispatch operators, or in operators that know they are sending over the network, so that any "slack" introduced within a fragment or pipeline is straightened out at the network boundary.
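The forwarding decision at such a network-boundary operator can be sketched as a pure predicate. All names here (`should_forward`, `row_threshold`, the parameter set) are hypothetical illustrations of the trade-off discussed above, not an existing API.

```rust
// Hypothetical sketch of the forwarding decision at a dispatch operator:
// forward immediately unless there is a backlog, the batch is still
// small, and the timeout has not elapsed.

use std::time::Duration;

fn should_forward(
    buffered_rows: usize,
    row_threshold: usize,
    backlog: usize,
    elapsed: Duration,
    timeout: Duration,
) -> bool {
    buffered_rows >= row_threshold // batch is large enough to amortize overhead
        || backlog == 0            // no queued work: deliver instantly
        || elapsed >= timeout      // bound the latency cost of accumulation
}

fn main() {
    let timeout = Duration::from_millis(10);
    // Small batch, backlog present, timeout not hit: keep accumulating.
    assert!(!should_forward(16, 1024, 3, Duration::from_millis(2), timeout));
    // Empty queue: forward instantly even though the batch is small.
    assert!(should_forward(16, 1024, 0, Duration::from_millis(2), timeout));
    // Timeout elapsed: forward anyway to bound latency.
    assert!(should_forward(16, 1024, 3, Duration::from_millis(12), timeout));
}
```

The `backlog == 0` arm captures skyzh's point that instant delivery should be the default; accumulation only kicks in when there is already queued work to absorb the wait.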

> it is better to batch requests on executors that will be affected by chunk number

But yes, I guess a complementary approach may be to do the accumulation at the executor itself...
