-
Notifications
You must be signed in to change notification settings - Fork 6.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replacing SummingMergeTree implementation to use standard aggregation functions #1330
Replacing SummingMergeTree implementation to use standard aggregation functions #1330
Conversation
This is basically useful for testing SummingSortedBlockInputStream against the AggregatingBlockInputStream, which are used in the {Summing,Aggregating}MergeTree table engines respectively.
…ctions This replaces custom summation function implementations with an implementation using built-in aggregation functions (sum and sumMap). The goal is to be able to use specialized variants of aggregation functions, and to have a single efficient implementation.
Current implementation when just summing numeric column:
Current implementation when adding a nested column (1 key 1 value):
The PR implementation when just summing numeric column:
The PR implementation (1 key 1 value, 1 key 2 values):
|
The function is rewritten to avoid allocations on every insert with Field deserialising each array. The key type is now specialized, so it can be accessed directly. The value type is variant type, but only individual values are deserialised (which is cheap, since they're PODs). The function also support summing of multiple columns by the same key. The SummingSortedBlockInputStream uses the function in case of Nested structure with one numeric key and many values to sum.
Implemented more
The implementation is a bit faster with Binary function variant could be further specialized as it doesn't need to store vector of values for each key, but I'm not sure if the performance gain is worth code duplication at this point. |
I should proofread my comments, sorry 🙈 |
desc.function->add(desc.state.data(), &col, cursor->pos, nullptr); | ||
// This stream discards rows that are zero across all summed columns | ||
if (!res) | ||
res = col->get64(cursor->pos) != 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should discard rows, that was summed to zero (that becomes zero after summation).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, added a better comment. For unsigned types, it's true that the row is summed to zero if all input numbers are also zero. The slight difference is with signed numbers sequence, like -1 +1
, such row will result in zero, but it won't be deleted until the next merge. I don't know how to get state cheaply out of an aggregation function (there's only interface for inserting to column), which is why I used input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
(The only way to look at aggregation state is "insertResultInto" and it can be expensive.)
@@ -69,15 +69,20 @@ class SummingSortedBlockInputStream : public MergingSortedBlockInputStream | |||
* and can be deleted at any time. | |||
*/ | |||
|
|||
/// Stores numbers of key-columns and value-columns. | |||
struct MapDescription | |||
struct AggregateDescription |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
if (desc.created) | ||
{ | ||
desc.function->insertResultInto(desc.state.data(), *desc.merged_column); | ||
desc.function->destroy(desc.state.data()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code is not exception safe: destroy is not called in exception, memory leak when the function with non-trivial state (sumMap) was used. Probably you should also add a destructor for AggregateDescription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! Added try/catch and an explicit state destructor.
ColumnPtr merged_column; | ||
std::vector<char> state; | ||
bool created = false; | ||
/* Compatibility with the mergeMap */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not separate struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because I don't know whether the legacy mergeMap
, or sumMap
will be used when identifying maps in block, so instead of building two structures, I'm using just one. I'm happy to rework it to separate structs, it doesn't matter much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost Ok, only few modifications required.
@vavrusa It looks like that your changes broke the following tests:
Could you check it, please? |
I'll take a look! |
@vavrusa Yes, we have CI and autotests for each PR, but they are run only for people from the whitelist. I made special comment here to force tests running #1367 (comment) |
@ludv1x Ah thanks. Can I see the test results when it finishes or is it internal? |
@vavrusa You can see only final statistics, logs are not available publicly. |
The issue with the current
SummingMergeTree
implementation is that merging is quite slow when the table contains nested structures ending with Map. The goal is to:SummingSortedBlockInputStream
to use built-in aggregation functions (sum
andsumMap
)sumMap()
for case with composite keysSummingSortedBlockInputStream::mergeMap()
implementationI decided to split these into separate PRs, this first replaces numeric field summation and simple nested structure summation (single key, single value). When the nested structure contains a composite key, it falls back to existing implementation.