Skip to content

Commit

Permalink
Enable colocated join by default
Browse files Browse the repository at this point in the history
  • Loading branch information
radek-kondziolka authored and raunaqmorarka committed Mar 13, 2023
1 parent 0675b4a commit 0b2e189
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 25 deletions.
Expand Up @@ -429,7 +429,7 @@ public SystemSessionProperties(
value -> value),
booleanProperty(
COLOCATED_JOIN,
"Experimental: Use a colocated join when possible",
"Use a colocated join when possible",
optimizerConfig.isColocatedJoinsEnabled(),
false),
booleanProperty(
Expand Down
Expand Up @@ -50,7 +50,7 @@ public class OptimizerConfig
private double filterConjunctionIndependenceFactor = 0.75;
private boolean nonEstimatablePredicateApproximationEnabled = true;

private boolean colocatedJoinsEnabled;
private boolean colocatedJoinsEnabled = true;
private boolean spatialJoinsEnabled = true;
private boolean distributedSort = true;

Expand Down Expand Up @@ -327,7 +327,7 @@ public boolean isColocatedJoinsEnabled()
}

@Config("colocated-joins-enabled")
@ConfigDescription("Experimental: Use a colocated join when possible")
@ConfigDescription("Use a colocated join when possible")
public OptimizerConfig setColocatedJoinsEnabled(boolean colocatedJoinsEnabled)
{
this.colocatedJoinsEnabled = colocatedJoinsEnabled;
Expand Down
Expand Up @@ -48,7 +48,7 @@ public void testDefaults()
.setJoinMultiClauseIndependenceFactor(0.25)
.setJoinReorderingStrategy(JoinReorderingStrategy.AUTOMATIC)
.setMaxReorderedJoins(9)
.setColocatedJoinsEnabled(false)
.setColocatedJoinsEnabled(true)
.setSpatialJoinsEnabled(true)
.setUsePreferredWritePartitioning(true)
.setPreferredWritePartitioningMinNumberOfPartitions(50)
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.max-reordered-joins", "5")
.put("iterative-optimizer-timeout", "10s")
.put("enable-forced-exchange-below-group-id", "false")
.put("colocated-joins-enabled", "true")
.put("colocated-joins-enabled", "false")
.put("spatial-joins-enabled", "false")
.put("distributed-sort", "false")
.put("use-preferred-write-partitioning", "false")
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testExplicitPropertyMappings()
.setMaxReorderedJoins(5)
.setIterativeOptimizerTimeout(new Duration(10, SECONDS))
.setEnableForcedExchangeBelowGroupId(false)
.setColocatedJoinsEnabled(true)
.setColocatedJoinsEnabled(false)
.setSpatialJoinsEnabled(false)
.setUsePreferredWritePartitioning(false)
.setPreferredWritePartitioningMinNumberOfPartitions(10)
Expand Down
Expand Up @@ -29,13 +29,15 @@
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.JoinNode.DistributionType;
import io.trino.sql.planner.plan.MarkDistinctNode;
import io.trino.sql.query.QueryAssertions;
import io.trino.sql.tree.GenericLiteral;
import io.trino.sql.tree.LongLiteral;
import io.trino.testing.LocalQueryRunner;
import org.testng.annotations.Test;

import java.util.Optional;

import static io.trino.SystemSessionProperties.COLOCATED_JOIN;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.SystemSessionProperties.ENABLE_STATS_CALCULATOR;
import static io.trino.SystemSessionProperties.IGNORE_DOWNSTREAM_PREFERENCES;
Expand All @@ -46,6 +48,7 @@
import static io.trino.SystemSessionProperties.SPILL_ENABLED;
import static io.trino.SystemSessionProperties.TASK_CONCURRENCY;
import static io.trino.SystemSessionProperties.USE_EXACT_PARTITIONING;
import static io.trino.spi.type.VarcharType.createVarcharType;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED;
import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
import static io.trino.sql.planner.assertions.PlanMatchPattern.aggregation;
Expand Down Expand Up @@ -80,7 +83,9 @@
import static io.trino.sql.planner.plan.TopNNode.Step.FINAL;
import static io.trino.sql.tree.SortItem.NullOrdering.LAST;
import static io.trino.sql.tree.SortItem.Ordering.ASCENDING;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.assertj.core.api.Assertions.assertThat;

public class TestAddExchangesPlans
extends BasePlanTest
Expand Down Expand Up @@ -752,28 +757,30 @@ public void testMarkDistinctIsExactlyPartitioned()
"orderdate", "orderdate"))))))))));
}

// Negative test for use-exact-partitioning
// Negative test for use-exact-partitioning when colocated join is disabled
@Test
public void testJoinNotExactlyPartitioned()
public void testJoinNotExactlyPartitionedWhenColocatedJoinDisabled()
{
assertDistributedPlan(
"SELECT\n" +
" orders.orderkey,\n" +
" orders.orderstatus\n" +
"FROM (\n" +
" SELECT\n" +
" orderkey,\n" +
" ARBITRARY(orderstatus) AS orderstatus,\n" +
" COUNT(*)\n" +
" FROM orders\n" +
" GROUP BY\n" +
" orderkey\n" +
") t,\n" +
"orders\n" +
"WHERE\n" +
" orders.orderkey = t.orderkey\n" +
" AND orders.orderstatus = t.orderstatus",
noJoinReordering(),
"""
SELECT
orders.orderkey,
orders.orderstatus
FROM (
SELECT
orderkey,
ARBITRARY(orderstatus) AS orderstatus,
COUNT(*)
FROM orders
GROUP BY
orderkey
) t,
orders
WHERE
orders.orderkey = t.orderkey
AND orders.orderstatus = t.orderstatus
""",
noJoinReorderingColocatedJoinDisabled(),
anyTree(
project(
anyTree(
Expand All @@ -784,6 +791,51 @@ public void testJoinNotExactlyPartitioned()
tableScan("orders"))))));
}

// Negative test for use-exact-partitioning when colocated join is enabled (default)
@Test
public void testJoinNotExactlyPartitioned()
{
QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner());
assertThat(queryAssertions.query("SHOW SESSION LIKE 'colocated_join'")).matches(
resultBuilder(
getQueryRunner().getDefaultSession(),
createVarcharType(56),
createVarcharType(14),
createVarcharType(14),
createVarcharType(7),
createVarcharType(151))
.row("colocated_join", "true", "true", "boolean", "Use a colocated join when possible")
.build());

assertDistributedPlan(
"""
SELECT
orders.orderkey,
orders.orderstatus
FROM (
SELECT
orderkey,
ARBITRARY(orderstatus) AS orderstatus,
COUNT(*)
FROM orders
GROUP BY
orderkey
) t,
orders
WHERE
orders.orderkey = t.orderkey
AND orders.orderstatus = t.orderstatus
""",
noJoinReordering(),
anyTree(
project(
anyTree(
tableScan("orders"))),
exchange(LOCAL, GATHER,
anyTree(
tableScan("orders")))));
}

private Session spillEnabledWithJoinDistributionType(JoinDistributionType joinDistributionType)
{
return Session.builder(getQueryRunner().getDefaultSession())
Expand All @@ -803,6 +855,16 @@ private Session noJoinReordering()
.build();
}

private Session noJoinReorderingColocatedJoinDisabled()
{
return Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(JOIN_REORDERING_STRATEGY, JoinReorderingStrategy.NONE.name())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, JoinDistributionType.BROADCAST.name())
.setSystemProperty(TASK_CONCURRENCY, "16")
.setSystemProperty(COLOCATED_JOIN, "false")
.build();
}

private Session useExactPartitioning()
{
return Session.builder(getQueryRunner().getDefaultSession())
Expand Down

0 comments on commit 0b2e189

Please sign in to comment.