Port semi-join to WorkProcessorOperator #1119
public WorkProcessor.TransformationState<Page> process(Page inputPage) | ||
{ | ||
if (!this.channelSetFuture.isDone()) { | ||
return WorkProcessor.TransformationState.blocked(this.channelSetFuture); |
static import TransformationState
public WorkProcessor.TransformationState<Page> process(Page inputPage) | ||
{ | ||
if (!this.channelSetFuture.isDone()) { | ||
return WorkProcessor.TransformationState.blocked(this.channelSetFuture); |
Technically, a single input page will be buffered until the future is done. We could account for its memory here.
} | ||
|
||
if (channelSet == null) { | ||
channelSet = tryGetFutureValue(channelSetFuture).orElse(null); |
The future must be done at this point, so tryGetFutureValue is not needed. You can use io.airlift.concurrent.MoreFutures#checkSuccess instead.
|
||
@Override | ||
public WorkProcessor.TransformationState<Page> process(Page inputPage) | ||
{ |
You could short-circuit here by checking inputPage == null early. This way we won't wait for the future if the probe side doesn't contain any data.
// add the new boolean column to the page | ||
outputPage = page.appendColumn(blockBuilder.build()); | ||
} | ||
if (inputPage != null && channelSet != null) { |
channelSet cannot be null at this point.
@Override | ||
public WorkProcessor.TransformationState<Page> process(Page inputPage) | ||
{ | ||
if (!this.channelSetFuture.isDone()) { |
I would change this if too:
    if (channelSet == null) {
        if (channelSetFuture.isDone()) {
            ...
        } else {
            return WorkProcessor.TransformationState.blocked(this.channelSetFuture);
        }
    }
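The restructuring suggested above can be sketched in isolation. This is only an illustration under stated assumptions: CompletableFuture stands in for the channel set future, a String stands in for ChannelSet, and the State enum and LazyFutureResolution class are hypothetical names, not types from the PR.

```java
import java.util.concurrent.CompletableFuture;

class LazyFutureResolution
{
    // Hypothetical stand-in for the outcomes a transformation step can report.
    enum State { BLOCKED, RESULT }

    // Stand-in for the future that produces the build-side set.
    private final CompletableFuture<String> channelSetFuture;

    // Stays null until the future has been resolved once.
    private String channelSet;

    LazyFutureResolution(CompletableFuture<String> channelSetFuture)
    {
        this.channelSetFuture = channelSetFuture;
    }

    State process(String inputPage)
    {
        if (channelSet == null) {
            if (!channelSetFuture.isDone()) {
                // Build side not ready yet: report blocked and keep the page.
                return State.BLOCKED;
            }
            // The future is done, so join() returns immediately
            // (or rethrows the failure of the build side).
            channelSet = channelSetFuture.join();
        }
        // From here on channelSet is guaranteed to be non-null.
        return State.RESULT;
    }

    boolean isResolved()
    {
        return channelSet != null;
    }
}
```

With this shape the isDone() check is paid only until the set has been resolved; later calls skip straight to processing.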
@Override | ||
public WorkProcessor.TransformationState<Page> process(Page inputPage) | ||
{ | ||
if (!this.channelSetFuture.isDone()) { |
The "this." qualifier is not needed.
21cac9f
to
f0b4b35
Compare
} | ||
|
||
@Override | ||
public WorkProcessor.TransformationState<Page> process(Page inputPage) |
static import TransformationState
654277f
to
d9ecd4f
Compare
@sopel39 Thanks for your comments. I have applied them.
part of: #49
||
if (channelSet == null) { | ||
if (!channelSetFuture.isDone()) { | ||
localMemoryContext.setBytes(inputPage.getSizeInBytes()); |
nit: you could add a comment that this will materialize the page, but it shouldn't matter for the first page
} | ||
if (!contains && channelSet.containsNull()) { | ||
blockBuilder.appendNull(); | ||
checkSuccess(channelSetFuture, "Set building failed"); |
"Set building failed" -> "ChannelSet building failed"
private final Optional<Integer> probeHashChannel; | ||
private final LocalMemoryContext localMemoryContext; | ||
|
||
private ChannelSet channelSet; |
add @Nullable annotation
} | ||
} | ||
// add the new boolean column to the page | ||
return TransformationState.ofResult(inputPage.appendColumn(blockBuilder.build()), true); |
just use ofResult(T result)
this.channelSetFuture = requireNonNull(channelSetFuture, "hashProvider is null").getChannelSet(); | ||
this.probeJoinChannel = probeJoinChannel; | ||
this.probeHashChannel = requireNonNull(probeHashChannel, "hashChannel is null"); | ||
this.localMemoryContext = requireNonNull(aggregatedMemoryContext, "AggregatedMemoryContext is null").newLocalMemoryContext(SemiJoinPages.class.getSimpleName()); |
"AggregatedMemoryContext is null" -> "aggregatedMemoryContext is null"
minor comments
d9ecd4f
to
a12f6cb
Compare
@sopel39 I have applied the comments.
if (channelSet == null) { | ||
if (!channelSetFuture.isDone()) { | ||
// This will materialize page but it shouldn't matter for the first page | ||
localMemoryContext.setBytes(inputPage.getSizeInBytes()); |
@dain this makes HashSemiJoinOperator keep the first input page (to progress probe computations before the build future is done). I don't think it's a relevant amount of memory. What do you think?
For more context: we want to progress the upstream operators' computations (e.g. aggregation) by pulling the initial page from the probe side while the build side future is not done. This is also the case in the current operator model. However, in the current model computations are performed up to the input of the next upstream operator, whereas in the WorkProcessorOperator model they are performed up to the input of HashSemiJoinOperator.
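A rough sketch of the accounting being discussed, under stated assumptions: MemoryTracker is a hypothetical stand-in for a local memory context, CompletableFuture stands in for the build side future, and resetting the reservation to zero once the build side is ready is an assumption for illustration, not something the diff above shows.

```java
import java.util.concurrent.CompletableFuture;

class BufferedPageAccounting
{
    // Hypothetical stand-in for a local memory context: remembers the last reported value.
    static final class MemoryTracker
    {
        private long bytes;

        void setBytes(long bytes)
        {
            this.bytes = bytes;
        }

        long getBytes()
        {
            return bytes;
        }
    }

    private final CompletableFuture<?> buildFuture;
    private final MemoryTracker memory;

    BufferedPageAccounting(CompletableFuture<?> buildFuture, MemoryTracker memory)
    {
        this.buildFuture = buildFuture;
        this.memory = memory;
    }

    // Returns true if the page can be processed now,
    // false if the caller must wait while the page stays buffered.
    boolean offer(long pageSizeInBytes)
    {
        if (!buildFuture.isDone()) {
            // The page is held while blocked, so report its size.
            memory.setBytes(pageSizeInBytes);
            return false;
        }
        // Assumption: the page is no longer held once it can be processed.
        memory.setBytes(0);
        return true;
    }
}
```

At most one page is accounted for at any time, which is why the reviewers expect the extra reservation to be small.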
@@ -11,6 +11,6 @@ node.environment=test | |||
coordinator=false |
The commit message is too long. This should go before the port commit.
@@ -11,6 +11,6 @@ node.environment=test | |||
coordinator=false | |||
http-server.http.port=8081 | |||
query.max-memory=1GB | |||
query.max-memory-per-node=512MB | |||
query.max-memory-per-node=768MB |
sql_tests.testcases.hive_tpch.q18 failed because it exceeded the per-node query memory limit. However, it was already on the edge; the error lists the top consuming operators:
Query exceeded per-node user memory limit of 512MB [Allocated: 511.83MB, Delta: 1.69MB,
Top Consumers: {HashBuilderOperator=363.73MB, HashAggregationOperator=144.22MB, PartitionedOutputOperator=42.01MB}]
HashSemiJoinOperator is not even on the top consumers list.
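To make the numbers concrete: 511.83MB + 1.69MB = 513.52MB, which is just above 512MB and well below the new 768MB setting. The sketch below assumes the limit check compares allocated plus delta against the limit; that reading is inferred from the error message, and MemoryLimitCheck is a hypothetical name.

```java
class MemoryLimitCheck
{
    // Assumed form of the check: a reservation fails when the already
    // allocated memory plus the requested delta exceeds the limit.
    static boolean exceedsLimit(double allocatedMb, double deltaMb, double limitMb)
    {
        return allocatedMb + deltaMb > limitMb;
    }

    public static void main(String[] args)
    {
        // Values taken from the error message quoted above.
        System.out.println(exceedsLimit(511.83, 1.69, 512.0));
        System.out.println(exceedsLimit(511.83, 1.69, 768.0));
    }
}
```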
Was this change related to the PR?
Memory accounting was improved as part of this PR (and other work processor PRs). Also, the first page is held until the build side is ready. That tipped an already borderline memory limit (this operator is not even one of the top consumers).
d65303e
to
437938c
Compare
Some synthetic experiment:
|
437938c
to
d6b8666
Compare
No description provided.