Skip to content

Commit

Permalink
apache#1387 No need to sort left alias for hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Feb 8, 2019
1 parent a4c3d8b commit 2c64a3e
Showing 1 changed file with 21 additions and 19 deletions.
40 changes: 21 additions & 19 deletions processing/src/main/java/io/druid/query/XJoinPostProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public Object accumulate(Object accumulated, Pair<Query, Sequence> in)
};
}

private PrioritizedCallable toJoinAlias(
private PrioritizedCallable<JoinAlias> toJoinAlias(
final JoinType type,
final boolean sorted,
final boolean hashing,
Expand All @@ -187,20 +187,20 @@ private PrioritizedCallable toJoinAlias(
final List<String> aliases = Arrays.asList(alias);
final int[] indices = GuavaUtils.indexOf(columnNames, joinColumns);
if (hashing) {
return new PrioritizedCallable.Background()
return new PrioritizedCallable.Background<JoinAlias>()
{
@Override
public Object call()
public JoinAlias call()
{
final Map<JoinKey, Object> hashed = toHashed(Sequences.concat(sequences), indices, sorted);
return new JoinAlias(aliases, columnNames, joinColumns, indices, hashed);
}
};
}
return new PrioritizedCallable.Background()
return new PrioritizedCallable.Background<JoinAlias>()
{
@Override
public Object call()
public JoinAlias call()
{
final Sequence<Object[]> sequence = Sequences.concat(sequences);
if (sorted) {
Expand Down Expand Up @@ -278,21 +278,18 @@ public Object apply(JoinKey key, Object prev)
}

@VisibleForTesting
final Iterator<Object[]> join(final Future[] futures) throws Exception
final Iterator<Object[]> join(final Future<JoinAlias>[] futures) throws Exception
{
JoinAlias left = (JoinAlias) futures[0].get();
Iterator<Object[]> iterator = Iterators.emptyIterator();
for (int i = 1; i < futures.length; i++) {
final JoinAlias right = (JoinAlias) futures[i].get();
log.info("... start joining %s to %s (%s)", left.alias, right.alias, right.isHashed() ? "Hash" : "SortedMerge");
iterator = join(left, right, i - 1);
if (i == futures.length - 1) {
break;
}
List<String> alias = GuavaUtils.concat(left.alias, right.alias);
List<String> columns = GuavaUtils.concat(left.columns, right.columns);
List<String> joinColumns = elements[i].getLeftJoinColumns();
int[] indices = GuavaUtils.indexOf(columns, joinColumns);
JoinAlias left = futures[0].get();
JoinAlias right = futures[1].get();
log.info("... start joining %s to %s (%s)", left.alias, right.alias, right.isHashed() ? "Hash" : "SortedMerge");
Iterator<Object[]> iterator = join(left, right, 0);
List<String> alias = GuavaUtils.concat(left.alias, right.alias);
List<String> columns = GuavaUtils.concat(left.columns, right.columns);
List<String> joinColumns = elements[0].getLeftJoinColumns();
for (int i = 2; i < futures.length; i++) {
right = futures[i].get();
final int[] indices = GuavaUtils.indexOf(columns, joinColumns);
boolean sorted;
Iterator<Object[]> leftRows;
if (right.isHashed() || joinColumns.equals(left.joinColumns) || joinColumns.equals(right.joinColumns)) {
Expand All @@ -304,6 +301,11 @@ final Iterator<Object[]> join(final Future[] futures) throws Exception
sorted = true;
}
left = new JoinAlias(alias, columns, joinColumns, indices, leftRows, sorted);
log.info("... start joining %s to %s (%s)", alias, right.alias, right.isHashed() ? "Hash" : "SortedMerge");
iterator = join(left, right, 0);
alias = GuavaUtils.concat(alias, right.alias);
columns = GuavaUtils.concat(columns, right.columns);
joinColumns = elements[i - 1].getLeftJoinColumns();
}
return iterator;
}
Expand Down

0 comments on commit 2c64a3e

Please sign in to comment.