Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate 'all' domain explicitly #4946

Merged
merged 2 commits into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.prestosql.sql.planner.plan.PlanNode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -144,7 +145,11 @@ private Map<DynamicFilterId, Domain> convertTupleDomain(TupleDomain<DynamicFilte
return buildChannels.keySet().stream()
.collect(toImmutableMap(identity(), filterId -> Domain.none(filterBuildTypes.get(filterId))));
}
return result.getDomains().get();

Map<DynamicFilterId, Domain> domains = new HashMap<>(result.getDomains().get());
// Add `all` domain explicitly for dynamic filters to notify dynamic filter listeners
buildChannels.keySet().forEach(filterId -> domains.putIfAbsent(filterId, Domain.all(filterBuildTypes.get(filterId))));
return ImmutableMap.copyOf(domains);
}

public static LocalDynamicFilterConsumer create(JoinNode planNode, List<Type> buildSourceTypes, int partitionCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down Expand Up @@ -60,7 +59,7 @@ public TestLocalDynamicFilterConsumer()

@Test
public void testSimple()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a")),
Expand All @@ -80,7 +79,7 @@ public void testSimple()

@Test
public void testShortCircuitOnAllTupleDomain()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a")),
Expand All @@ -89,22 +88,25 @@ public void testShortCircuitOnAllTupleDomain()
2);

Consumer<TupleDomain<DynamicFilterId>> consumer = filter.getTupleDomainConsumer();
ListenableFuture<Map<Symbol, Domain>> result = filter.getNodeLocalDynamicFilterForSymbols();
ListenableFuture<Map<Symbol, Domain>> localResult = filter.getNodeLocalDynamicFilterForSymbols();
ListenableFuture<Map<DynamicFilterId, Domain>> result = filter.getDynamicFilterDomains();
assertFalse(localResult.isDone());
assertFalse(result.isDone());

consumer.accept(TupleDomain.withColumnDomains(ImmutableMap.of(
new DynamicFilterId("123"), Domain.all(INTEGER))));
assertEquals(result.get(), ImmutableMap.of());
assertEquals(localResult.get(), ImmutableMap.of());
assertEquals(result.get(), ImmutableMap.of(new DynamicFilterId("123"), Domain.all(INTEGER)));

// adding another partition domain won't change final domain
consumer.accept(TupleDomain.withColumnDomains(ImmutableMap.of(
new DynamicFilterId("123"), Domain.singleValue(INTEGER, 1L))));
assertEquals(result.get(), ImmutableMap.of());
assertEquals(localResult.get(), ImmutableMap.of());
}

@Test
public void testMultipleProbeSymbols()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a1"), new DynamicFilterId("123"), new Symbol("a2")),
Expand All @@ -125,7 +127,7 @@ public void testMultipleProbeSymbols()

@Test
public void testMultiplePartitions()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a")),
Expand All @@ -148,9 +150,37 @@ public void testMultiplePartitions()
new Symbol("a"), Domain.multipleValues(INTEGER, ImmutableList.of(10L, 20L))));
}

@Test
public void testAllDomain()
throws Exception
{
DynamicFilterId filter1 = new DynamicFilterId("123");
DynamicFilterId filter2 = new DynamicFilterId("124");
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(
filter1, new Symbol("a"),
filter2, new Symbol("b")),
ImmutableMap.of(
filter1, 0,
filter2, 1),
ImmutableMap.of(
filter1, INTEGER,
filter2, INTEGER),
1);

Consumer<TupleDomain<DynamicFilterId>> consumer = filter.getTupleDomainConsumer();
ListenableFuture<Map<DynamicFilterId, Domain>> result = filter.getDynamicFilterDomains();
assertFalse(result.isDone());

consumer.accept(TupleDomain.withColumnDomains(ImmutableMap.of(
filter1, Domain.all(INTEGER),
filter2, Domain.singleValue(INTEGER, 1L))));
assertEquals(result.get(), ImmutableMap.of(filter1, Domain.all(INTEGER), filter2, Domain.singleValue(INTEGER, 1L)));
}

@Test
public void testNone()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a")),
Expand All @@ -170,7 +200,7 @@ public void testNone()

@Test
public void testMultipleColumns()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a"), new DynamicFilterId("456"), new Symbol("b")),
Expand All @@ -192,7 +222,7 @@ public void testMultipleColumns()

@Test
public void testMultiplePartitionsAndColumns()
throws ExecutionException, InterruptedException
throws Exception
{
LocalDynamicFilterConsumer filter = new LocalDynamicFilterConsumer(
ImmutableMultimap.of(new DynamicFilterId("123"), new Symbol("a"), new DynamicFilterId("456"), new Symbol("b")),
Expand Down Expand Up @@ -220,7 +250,7 @@ public void testMultiplePartitionsAndColumns()

@Test
public void testCreateSingleColumn()
throws ExecutionException, InterruptedException
throws Exception
{
SubPlan subplan = subplan(
"SELECT count() FROM lineitem, orders WHERE lineitem.orderkey = orders.orderkey " +
Expand Down Expand Up @@ -265,7 +295,7 @@ public void testCreateDistributedJoin()

@Test
public void testCreateMultipleCriteria()
throws ExecutionException, InterruptedException
throws Exception
{
SubPlan subplan = subplan(
"SELECT count() FROM lineitem, partsupp " +
Expand Down Expand Up @@ -294,7 +324,7 @@ public void testCreateMultipleCriteria()

@Test
public void testCreateMultipleJoins()
throws ExecutionException, InterruptedException
throws Exception
{
SubPlan subplan = subplan(
"SELECT count() FROM lineitem, orders, part " +
Expand All @@ -319,7 +349,7 @@ public void testCreateMultipleJoins()

@Test
public void testCreateProbeSideUnion()
throws ExecutionException, InterruptedException
throws Exception
{
SubPlan subplan = subplan(
"WITH union_table(key) AS " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ public void testBroadcastJoinWithEmptyBuildSide()
TupleDomain.none());
}

@Test(timeOut = 30_000)
public void testJoinWithLargeBuildSide()
{
assertQueryDynamicFilters(
"SELECT * FROM lineitem JOIN tpch.tiny.orders ON lineitem.orderkey = orders.orderkey",
TupleDomain.all());
}

@Test(timeOut = 30_000)
public void testBroadcastJoinWithLargeBuildSide()
{
assertQueryDynamicFilters(
withBroadcastJoin(),
"SELECT * FROM lineitem JOIN tpch.tiny.orders ON lineitem.orderkey = orders.orderkey",
TupleDomain.all());
}

@Test(timeOut = 30_000)
public void testJoinWithSelectiveBuildSide()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ public void testBroadcastJoinWithEmptyBuildSide()
// for broadcast joins lazy dynamic filters are non blocking
}

@Test(enabled = false)
@Override
public void testBroadcastJoinWithLargeBuildSide()
{
// for broadcast joins lazy dynamic filters are non blocking
}

private class TestPlugin
implements Plugin
{
Expand Down