-
Notifications
You must be signed in to change notification settings - Fork 410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
change the exchange packets interface among tiflash #3184
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
f59ea35
to
4dfd62f
Compare
@@ -91,4 +93,14 @@ Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & sch | |||
return block_in.read(); | |||
} | |||
|
|||
Block CHBlockChunkCodec::decode(const std::string & str, const DAGSchema & schema) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
L88-L93 is the same as L98-L103, should reuse them.
class CHBlockChunkCodec : public ChunkCodec | ||
{ | ||
public: | ||
CHBlockChunkCodec() = default; | ||
Block decode(const tipb::Chunk &, const DAGSchema &) override; | ||
Block decode(const std::string &, const DAGSchema &); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just change Block decode(const tipb::Chunk &, const DAGSchema &)
to Block decode(const std::string &, const DAGSchema &)
in the base class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
|
||
void write(mpp::MPPDataPacket &) | ||
{ | ||
assert(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throw exception with unsupported error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
} | ||
void write(mpp::MPPDataPacket &, [[maybe_unused]] uint16_t) | ||
{ | ||
assert(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
if (records_per_chunk == -1) | ||
{ | ||
mpp::MPPDataPacket packet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TiSpark supports CHBlock
encode type, will it be affected by this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
writing to tispark uses current_memory_tracker
? I suppose this branch only works for shuffing among tiflash.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by writing to tispark uses current_memory_tracker
? Writing to TiSpark does use StreamingDAGResponseWriter
. And I think we can only guarantee that partitionAndEncodeThenWriteBlocks
is used among TiFlash.
|
||
auto rows = result.decodeChunks(block_queue, remote_reader->getOutputSchema(), expected_types); | ||
/// return empty msg after all its chunks are decoded. | ||
remote_reader->returnEmptyMsg(&result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks weird to divide the fetch result into 3 part:
- remote_reader->nextResult
- result.decodeChunks
- remote_reader->returnEmptyMsg
I suggest to redefine remote_reader->nextResult
, move the logical of Chunk
=> Block
into nextResult
, so we do not need result.decodeChunks
and remote_reader->returnEmptyMsg
, just call remote_reader->nextResult
is enough.
/run-all-tests |
1fddfe6
to
b6aeae4
Compare
/run-all-tests |
/run-all-tests |
if (records_per_chunk == -1) | ||
{ | ||
mpp::MPPDataPacket packet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by writing to tispark uses current_memory_tracker
? Writing to TiSpark does use StreamingDAGResponseWriter
. And I think we can only guarantee that partitionAndEncodeThenWriteBlocks
is used among TiFlash.
void MPPTunnelSetBase<Tunnel>::write(mpp::MPPDataPacket & packet) | ||
{ | ||
auto tunnels_size = tunnels.size(); | ||
if (tunnels_size > 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when tunnels_size == 1
, the packet is ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is a bug, fixed.
responses[i] = response; | ||
responses[i].set_encode_type(encode_type); | ||
if constexpr (for_last_response) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For both broadcast and partition writer, only one response needs to carry the execution summary info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated in TunnelSet->write()
/run-all-tests |
1 similar comment
/run-all-tests |
/run-all-tests |
1 similar comment
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
others LGTM
block_queue.push(std::move(block)); | ||
} | ||
} | ||
else if (!recv_msg->packet->data().empty()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useless code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
responses[i].set_encode_type(encode_type); | ||
if constexpr (for_last_response) | ||
{ | ||
if (i == 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some comments to make it more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -1,5 +1,8 @@ | |||
#include <Common/CPUAffinityManager.h> | |||
#include <Common/ThreadFactory.h> | |||
#include <Flash/Coprocessor/ArrowChunkCodec.h> | |||
#include <Flash/Coprocessor/CoprocessorReader.h> | |||
#include <Flash/Coprocessor/DefaultChunkCodec.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useless header file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
7415435
to
f5134d8
Compare
/merge |
@fzhedu: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: f5134d8
|
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
exchange encodes blocks twice thus decodes blocks twice, i.e., block -> response -> packet , taking more CPU, to solve this issue, this PR directly serializes blocks into packets. In detail, block -> packet for the non-last packets, but for the last packet, block -> packet.chunks and execution_summaries -> response -> packet.data. This change aims to exchanging data among tiflash, sending data from tiflash to tidb as previous doing.
What is changed and how it works?
Proposal: change kvproto
What's Changed:
Related changes
pingcap/docs
/pingcap/docs-cn
:Check List
Tests
Side effects
Release note