Optimize PartitionedOutputOperator #13183
Conversation
Force-pushed from bc58acb to ba429f3
@yingsu00 Ying, here are some initial comments on the "Introduce UncheckedByteArrays" commit.
(Resolved review threads on presto-main/src/main/java/com/facebook/presto/operator/UncheckedByteArrays.java and presto-main/src/test/java/com/facebook/presto/operator/TestUncheckedByteArrays.java)
@yingsu00 Here are some partial comments for the "Optimize repartitioning for BIGINT, DOUBLE and SHORT_DECIMAL types" commit. I'll continue reviewing next week.
```java
{
    checkArgument(slice.isCompact(), "slice is not compact");

    int uncompressedSize = slice.length();
```
This method's logic appears to be an exact copy of serialize(Page page) except for the slice = Slices.copyOf(slice) line. Would it make sense to extract this logic into a helper method and make the copy conditional on slice.isCompact(), or have a boolean flag that tells whether the copy should occur?
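For illustration, a minimal sketch of the suggested refactoring (names and the byte[] stand-in for a Slice are hypothetical, not the actual PagesSerde code): both entry points delegate to one shared helper, and the copy happens only when the input is not compact.

```java
// Hypothetical sketch of the helper-extraction idea: the conditional copy replaces
// duplicating the whole method body across the two serialize() variants.
public class SerializeSketch
{
    // A byte[] stands in for the Slice; clone() stands in for Slices.copyOf(slice).
    static byte[] serialize(byte[] slice, boolean isCompact)
    {
        // Copy only when the input is not compact, then run the shared logic.
        byte[] data = isCompact ? slice : slice.clone();
        return doSerialize(data);
    }

    private static byte[] doSerialize(byte[] data)
    {
        return data; // the shared serialization logic would live here
    }
}
```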
They are not exactly the same. The new serialize(Slice slice, int positionCount) has additional optimizations: it uses byte[] instead of ByteBuffer as the compressionBuffer and makes it a class member to avoid allocating new memory each time. So, to compare the performance of the two operators, I'd like to keep them separate.
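The buffer-reuse part of that optimization can be sketched as follows (class and method names are hypothetical): keep one byte[] as a class member and grow it only when a larger capacity is needed, instead of allocating a fresh buffer on every call.

```java
// Minimal sketch of reusing a compression buffer across serialize() calls.
public class BufferReuseSketch
{
    private byte[] compressionBuffer = new byte[0];

    // Returns a buffer of at least the requested size, reallocating only on growth.
    byte[] ensureCapacity(int maxCompressedSize)
    {
        if (compressionBuffer.length < maxCompressedSize) {
            compressionBuffer = new byte[maxCompressedSize];
        }
        return compressionBuffer;
    }
}
```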
@yingsu00 Thanks for explaining. It looks like this is a small change that the existing operator can benefit from. Any reason not to apply this optimization and reap some benefits earlier? You can always put the old code into a benchmark to confirm the gains.
Sure, I can cut a separate PR for it if that's what you mean.
@yingsu00 That would be great!
I have exactly the same thought as @mbasmanova. With the optimization in #13232, does that mean we no longer need to replicate the code? If so, I think this method (SerializedPage serialize(Slice slice, int positionCount)) can be added in a separate commit.
Also note that this method, in theory, is not doing "serialize": the page is already serialized into slice. Would wrapSerializedSlice be a better name? Do you have any suggestions? @mbasmanova
> Also note this method, in theory, is not doing "serialize": the page is already serialized into slice. Would wrapSerializedSlice be a better name? Do you have any suggestion? @mbasmanova
I agree that the name could be improved. I don't have a good suggestion though. For now, I think it would be reasonable to use the same name as an existing method and unify their implementations. A rename of both methods can happen separately.
Added a new commit c8eb31102d "Extract wrapSlice method in PagesSerde" as @wenleix suggested.
(Resolved review threads on presto-main/src/main/java/com/facebook/presto/operator/BlockEncodingBuffers.java and presto-main/src/main/java/com/facebook/presto/operator/OptimizedPartitionedOutputOperator.java)
```java
pagesAdded.incrementAndGet();
rowsAdded.addAndGet(bufferedRowCount);

if (blockEncodingBuffers != null) {
```
This logic is inconsistent with the for loop above; change it to:
```java
for (int i = 0; i < channelCount; i++) {
    blockEncodingBuffers[i].resetBuffers();
}
```
Why is this comment marked as resolved? Since on line 477 blockEncodingBuffers is not checked for null, I assume we don't need the check here either?
Also, does it make sense to have one single serializeAndReset (a.k.a. flush) API? As far as I can see, a serializeTo call is always coupled with resetBuffers. -- We can do this refactor after this PR is merged, of course.
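To make the suggestion concrete, here is a hypothetical sketch (not the actual BlockEncodingBuffers API) of a flush() that pairs the serialize and reset steps so callers cannot forget one of them. A StringBuilder stands in for the real encoding buffers.

```java
// Sketch of the combined flush() API: serialize and reset always happen together.
public class EncodingBufferSketch
{
    private final StringBuilder buffer = new StringBuilder();

    void append(String value)
    {
        buffer.append(value);
    }

    // flush() = serializeTo() + resetBuffers(), so the two can never get out of sync
    String flush()
    {
        String serialized = buffer.toString();
        buffer.setLength(0); // the resetBuffers() step
        return serialized;
    }
}
```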
@yingsu00 Finished reading the OptimizedPartitionedOutputOperator. Just a few minor comments.
@yingsu00 Some initial comments on the "Introduce TestingBlockBuilders and TestingPageBuilders" commit.
(Resolved review threads on presto-main/src/test/java/com/facebook/presto/operator/TestingBlockBuilders.java and presto-main/src/test/java/com/facebook/presto/operator/TestingPageBuilders.java)
Force-pushed from 5f94f6d to 3990e92
"Copy JvmUtils from Airlift to presto-spi": Looks good.
```java
// We need to use getRawSlice() to get the raw slice whose address is not advanced by getSlice(). It's incorrect to call getSlice()
// because the returned slice's address may be advanced if it's based on a slice view.
Slice rawSlice = variableWidthBlock.getRawSlice(0);
byte[] sliceBase = (byte[]) rawSlice.getBase();
```
While today a VariableWidthBlock always holds a Slice backed by a byte array, I do feel this downcast is a bit hacky and breaks the abstraction. What do you think? @highker, @arhimondr
@wenleix The downcast from Object to byte[] was pre-existing in the code base. I saw examples in PagesSerde (the original serialize(Page) method), OrcOutputBuffer, OrcInputStream, and ParquetCompressionUtils.
Example from PagesSerde.serialize(Page page):
```java
int compressedSize = compressor.get().compress(
        (byte[]) slice.getBase(),
        (int) (slice.getAddress() - ARRAY_BYTE_BASE_OFFSET),
        uncompressedSize,
        compressionBuffer,
        0,
        maxCompressedSize);
```
From OrcOutputBuffer.writeBytes(Slice source, int sourceIndex, int length):
```java
writeDirectlyToOutputStream((byte[]) source.getBase(), sourceIndex + (int) (source.getAddress() - ARRAY_BYTE_BASE_OFFSET), length);
```
From OrcInputStream.advance():
```java
int uncompressedSize = decompressor.get().decompress((byte[]) chunk.getBase(), (int) (chunk.getAddress() - ARRAY_BYTE_BASE_OFFSET), chunk.length(), output);
```
"Fix "position is not valid" for Mapblock": LGTM.
"Fix "invalid position in block" error" LGTM.
HiveQueryRunner produced a lot of logs and caused Travis to fail with "The job exceeded the maximum log length" error. This commit increases the logging level for com.facebook.presto.event from INFO to WARN to reduce the amount of logs produced.
Operators that need to use BlockFlattener need to instantiate an ArrayAllocator. However, the ArrayAllocator implementations were package-private and could not be accessed by operators outside the operator package root. This commit makes them public.
The code is entirely type safe. The alternative abstraction is to have a method that copies bytes out of a block, like
```java
Block.copyBytes(int position, int start, int length, byte[] target, int targetOffset);
```
But this is more code and more overhead, and it misses the opportunity of decoding offsets outside of loops, etc. Presto needs a handle on CPU efficiency, and this is part of getting there.
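To illustrate the tradeoff being discussed (this is a standalone sketch, not Presto code): one style funnels every position through a copyBytes-style call, while the other grabs the raw array once and runs a tight loop with offsets decoded up front. The names and offset layout here are simplified assumptions.

```java
// Two ways to copy variable-width values out of a "block", simulated by a raw
// byte[] plus an offsets array (offsets[i]..offsets[i+1] bounds position i).
public class CopyBytesSketch
{
    // copyBytes-style abstraction: one call per position.
    static void copyBytes(byte[] raw, int[] offsets, int position, byte[] target, int targetOffset)
    {
        System.arraycopy(raw, offsets[position], target, targetOffset, offsets[position + 1] - offsets[position]);
    }

    // Raw-array style: decode offsets outside the loop and copy directly.
    static byte[] copyAll(byte[] raw, int[] offsets, int positionCount)
    {
        byte[] target = new byte[offsets[positionCount] - offsets[0]];
        int targetOffset = 0;
        for (int i = 0; i < positionCount; i++) {
            int length = offsets[i + 1] - offsets[i];
            System.arraycopy(raw, offsets[i], target, targetOffset, length);
            targetOffset += length;
        }
        return target;
    }
}
```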
The new OptimizedPartitionedOutputOperator achieves better performance by skipping block building. It appends data directly to output buffers, then wraps these buffers into SerializedPage. Includes support for types represented as long, e.g. BIGINT, DOUBLE and short DECIMAL.
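The skip-block-building idea can be sketched conceptually as follows (this is a toy illustration, not the operator's actual code): each row's bytes are appended straight into a growable per-partition buffer, and the finished buffer is then handed off as the serialized output.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DirectAppendSketch
{
    // Hash-partition a BIGINT column, appending each value's 8 bytes directly into
    // its destination partition's buffer; no intermediate Block is built.
    public static byte[][] partition(long[] values, int partitionCount)
            throws IOException
    {
        ByteArrayOutputStream[] buffers = new ByteArrayOutputStream[partitionCount];
        DataOutputStream[] outputs = new DataOutputStream[partitionCount];
        for (int i = 0; i < partitionCount; i++) {
            buffers[i] = new ByteArrayOutputStream();
            outputs[i] = new DataOutputStream(buffers[i]);
        }
        for (long value : values) {
            int partition = (int) Math.floorMod(value, partitionCount);
            outputs[partition].writeLong(value); // direct append
        }
        byte[][] serialized = new byte[partitionCount][];
        for (int i = 0; i < partitionCount; i++) {
            serialized[i] = buffers[i].toByteArray(); // wrap the buffer as the output
        }
        return serialized;
    }
}
```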
@wenleix Hi Wenlei, thank you for reviewing the PR again. I have squashed "Fix "invalid position in block" error" and "Fix "position is not valid" for Mapblock" into the previous commits.

Regarding the downcast in VariableWidthBlockEncodingBuffer, do you think it's blocking? Like I said, there are existing code examples that downcast from Slice.getBase() to byte[], and VariableWidthBlock is always backed by byte[], so I think it's pretty type safe. Actually, the downcast was already in the commit "834e12c8ea Optimize repartitioning for VARCHAR type"; the change in "ce775377a7 Fix INVALID_CAST_ARGUMENT error in VariableWidthBlockEncodingBuffers" was to change the … I understand your concern about exposing the byte[] from VariableWidthBlock, but as we talked about before, we have a TODO item to expose getRawSlice() and getPositionOffset() in ColumnarSlice, and we will discuss it more as a follow-up item. Also, as @oerling pointed out, an alternative would be to make Block support this.

How about we create a TODO item on this and I host a meeting to discuss it next week? If you really think this is a blocker, can we merge the types other than VariableWidthBlock first?
```
Type                              hasNull  channels  baseline  optimized  Gain (x)
BIGINT                            FALSE    1            2,677        882       3.0
BIGINT                            FALSE    2            3,961      1,084       3.7
BIGINT                            TRUE     1            3,348      1,245       2.7
BIGINT                            TRUE     2            4,375      1,557       2.8
BOOLEAN                           FALSE    1            2,915        873       3.3
BOOLEAN                           FALSE    2            3,818      1,140       3.3
BOOLEAN                           TRUE     1            3,003      1,112       2.7
BOOLEAN                           TRUE     2            4,084      1,553       2.6
INTEGER                           FALSE    1            2,804        954       2.9
INTEGER                           FALSE    2            3,921      1,101       3.6
INTEGER                           TRUE     1            3,143      1,181       2.7
INTEGER                           TRUE     2            4,442      1,542       2.9
LONG_DECIMAL                      FALSE    1            3,697      1,118       3.3
LONG_DECIMAL                      FALSE    2            5,550      1,476       3.8
LONG_DECIMAL                      TRUE     1            3,654      1,311       2.8
LONG_DECIMAL                      TRUE     2            6,089      1,821       3.3
SMALLINT                          FALSE    1            2,821      1,032       2.7
SMALLINT                          FALSE    2            4,007      1,239       3.2
SMALLINT                          TRUE     1            2,908      1,141       2.5
SMALLINT                          TRUE     2            4,244      1,580       2.7
VARCHAR                           FALSE    1            5,821      1,972       3.0
VARCHAR                           FALSE    2           10,021      3,117       3.2
VARCHAR                           TRUE     1            5,732      2,226       2.6
VARCHAR                           TRUE     2            9,484      3,436       2.8
DICTIONARY(BIGINT)                FALSE    1            1,694        628       2.7
DICTIONARY(BIGINT)                FALSE    2            2,450        822       3.0
DICTIONARY(BIGINT)                TRUE     1            1,940        823       2.4
DICTIONARY(BIGINT)                TRUE     2            2,960      1,096       2.7
RLE(BIGINT)                       FALSE    1            1,663        592       2.8
RLE(BIGINT)                       FALSE    2            2,404        753       3.2
RLE(BIGINT)                       TRUE     1            1,645        684       2.4
RLE(BIGINT)                       TRUE     2            2,429        851       2.9
ARRAY(BIGINT)                     FALSE    1            1,128        660       1.7
ARRAY(BIGINT)                     FALSE    2            1,924        958       2.0
ARRAY(BIGINT)                     TRUE     1            1,190        702       1.7
ARRAY(BIGINT)                     TRUE     2            2,002      1,160       1.7
ARRAY(VARCHAR)                    FALSE    1            2,049      1,204       1.7
ARRAY(VARCHAR)                    FALSE    2            3,937      2,162       1.8
ARRAY(VARCHAR)                    TRUE     1            1,913      1,216       1.6
ARRAY(VARCHAR)                    TRUE     2            3,482      2,177       1.6
ARRAY(ARRAY(BIGINT))              FALSE    1            2,148      1,290       1.7
ARRAY(ARRAY(BIGINT))              FALSE    2            4,271      2,430       1.8
ARRAY(ARRAY(BIGINT))              TRUE     1            2,122      1,441       1.5
ARRAY(ARRAY(BIGINT))              TRUE     2            3,937      2,495       1.6
MAP(BIGINT,BIGINT)                FALSE    1            2,266        838       2.7
MAP(BIGINT,BIGINT)                FALSE    2            4,468      1,441       3.1
MAP(BIGINT,BIGINT)                TRUE     1            2,137        936       2.3
MAP(BIGINT,BIGINT)                TRUE     2            4,165      1,644       2.5
MAP(BIGINT,MAP(BIGINT,BIGINT))    FALSE    1            5,941      2,154       2.8
MAP(BIGINT,MAP(BIGINT,BIGINT))    FALSE    2           11,707      4,564       2.6
MAP(BIGINT,MAP(BIGINT,BIGINT))    TRUE     1            5,301      2,060       2.6
MAP(BIGINT,MAP(BIGINT,BIGINT))    TRUE     2           10,225      4,169       2.5
ROW(BIGINT,BIGINT)                FALSE    1            1,239        575       2.2
ROW(BIGINT,BIGINT)                FALSE    2            2,289        933       2.5
ROW(BIGINT,BIGINT)                TRUE     1            1,354        721       1.9
ROW(BIGINT,BIGINT)                TRUE     2            2,380      1,121       2.1
ROW(ARRAY(BIGINT),ARRAY(BIGINT))  FALSE    1            2,349      1,238       1.9
ROW(ARRAY(BIGINT),ARRAY(BIGINT))  FALSE    2            4,647      2,476       1.9
ROW(ARRAY(BIGINT),ARRAY(BIGINT))  TRUE     1            2,412      1,568       1.5
ROW(ARRAY(BIGINT),ARRAY(BIGINT))  TRUE     2            4,698      3,093       1.5
```
@wenleix Just squashed all commits. Thank you very much!
Merged #13183. Thanks for the contribution!
Hi @yingsu00, we noticed that this feature is still experimental (FeaturesConfig 'experimental.optimized-repartitioning'). We want to enable it in our production environment, but we worry it may have some potential issues. Do you have any data indicating the performance and stability of this feature in a production environment (e.g., Facebook's production environment)? Thanks.
GitHub issue #13015
The new OptimizedPartitionedOutputOperator achieves better performance by
skipping block building. It appends data directly to output buffers, then wraps
these buffers into SerializedPage.
An internal shadow run shows a 2x improvement in the operator's CPU usage, and a 5% CPU improvement across all queries.
Benchmark results show 1.5x - 3.7x improvements on 5000 pages (unit ms/op):
Updates
Sept 30 2019
Sept 26 2019
Sept 23 2019
Sept 16 2019
Sept 13 2019
Moved the operator and BlockEncodingBuffers to operator/repartition folder
Sept 10 2019
Sept 09 2019
TODO
Previous WIP PR
#13032