Skip to content

Commit

Permalink
Revert "Finish inner join if build is empty"
Browse files Browse the repository at this point in the history
This change caused queries to hang, because a task could finish its join
before receiving splits for all the remote sources that the probe side
should pull from. Those would then never be closed.
  • Loading branch information
cberner committed Jul 1, 2015
1 parent cfa9c0a commit 4c6ad34
Show file tree
Hide file tree
Showing 8 changed files with 3 additions and 64 deletions.
Expand Up @@ -92,12 +92,6 @@ public final int getChannelCount()
return channelCount;
}

@Override
public boolean isEmpty()
{
return addresses.isEmpty();
}

@Override
public long getInMemorySizeInBytes()
{
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.util.List;

import static com.facebook.presto.operator.LookupJoinOperators.JoinType.FULL_OUTER;
import static com.facebook.presto.operator.LookupJoinOperators.JoinType.INNER;
import static com.facebook.presto.operator.LookupJoinOperators.JoinType.LOOKUP_OUTER;
import static com.facebook.presto.operator.LookupJoinOperators.JoinType.PROBE_OUTER;
import static com.facebook.presto.util.MoreFutures.tryGetUnchecked;
Expand All @@ -46,7 +45,6 @@ public class LookupJoinOperator

private final boolean lookupOnOuterSide;
private final boolean probeOnOuterSide;
private final JoinType joinType;

private LookupSource lookupSource;
private JoinProbe probe;
Expand All @@ -73,7 +71,6 @@ public LookupJoinOperator(

this.lookupSourceFuture = lookupSourceSupplier.getLookupSource(operatorContext);
this.joinProbeFactory = joinProbeFactory;
this.joinType = joinType;

// Cannot use switch case here, because javac will synthesize an inner class and cause IllegalAccessError
probeOnOuterSide = joinType == PROBE_OUTER || joinType == FULL_OUTER;
Expand Down Expand Up @@ -140,10 +137,7 @@ public boolean needsInput()
}

if (lookupSource == null) {
tryGetLookupSource();
if (finishing) {
return false;
}
lookupSource = tryGetUnchecked(lookupSourceFuture);
}
return lookupSource != null && probe == null;
}
Expand All @@ -168,8 +162,8 @@ public Page getOutput()
{
// If needsInput was never called, lookupSource has not been initialized so far.
if (lookupSource == null) {
tryGetLookupSource();
if (lookupSource == null || finishing) {
lookupSource = tryGetUnchecked(lookupSourceFuture);
if (lookupSource == null) {
return null;
}
}
Expand Down Expand Up @@ -291,13 +285,4 @@ private void buildSideOuterJoinUnvisitedPositions()
}
}
}

private void tryGetLookupSource()
{
lookupSource = tryGetUnchecked(lookupSourceFuture);
// for inner joins, if lookup source does not have any positions, we can finish
if (lookupSource != null && joinType == INNER && lookupSource.isEmpty()) {
finishing = true;
}
}
}
Expand Up @@ -24,8 +24,6 @@ public interface LookupSource
{
int getChannelCount();

boolean isEmpty();

long getInMemorySizeInBytes();

long getJoinPosition(int position, Page page, int rawHash);
Expand Down
Expand Up @@ -51,16 +51,6 @@ public int getChannelCount()
return lookupSources[0].getChannelCount();
}

@Override
public boolean isEmpty()
{
boolean empty = true;
for (LookupSource lookupSource : lookupSources) {
empty = empty && lookupSource.isEmpty();
}
return empty;
}

@Override
public long getInMemorySizeInBytes()
{
Expand Down
Expand Up @@ -43,12 +43,6 @@ public int getChannelCount()
return lookupSource.getChannelCount();
}

@Override
public boolean isEmpty()
{
return lookupSource.isEmpty();
}

@Override
public long getInMemorySizeInBytes()
{
Expand Down
Expand Up @@ -375,12 +375,6 @@ public int getChannelCount()
return channelCount;
}

@Override
public boolean isEmpty()
{
return true;
}

@Override
public long getInMemorySizeInBytes()
{
Expand Down
Expand Up @@ -44,12 +44,6 @@ public int getChannelCount()
return indexLoader.getChannelCount();
}

@Override
public boolean isEmpty()
{
return false;
}

@Override
public long getInMemorySizeInBytes()
{
Expand Down
Expand Up @@ -482,16 +482,6 @@ public void testDistinctJoin()
"GROUP BY a.orderstatus");
}

@Test
public void testJoinEmptyBuild()
throws Exception
{
assertQuery("SELECT COUNT(b.quantity) " +
"FROM orders a " +
"JOIN (SELECT * FROM lineitem WHERE returnflag = 'foo') b " +
"ON a.orderkey = b.orderkey");
}

@Test
public void testArithmeticNegation()
throws Exception
Expand Down

0 comments on commit 4c6ad34

Please sign in to comment.