Skip to content

Commit

Permalink
Support lazy dynamic filtering at memory connector
Browse files Browse the repository at this point in the history
  • Loading branch information
rzeyde-varada committed Aug 22, 2020
1 parent ae0dc5d commit ba93c07
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 31 deletions.
Expand Up @@ -22,6 +22,7 @@ public class MemoryConfig
{
private int splitsPerNode = Runtime.getRuntime().availableProcessors();
private DataSize maxDataPerNode = DataSize.of(128, DataSize.Unit.MEGABYTE);
private boolean enableLazyDynamicFiltering = true;

@NotNull
public int getSplitsPerNode()
Expand All @@ -48,4 +49,16 @@ public MemoryConfig setMaxDataPerNode(DataSize maxDataPerNode)
this.maxDataPerNode = maxDataPerNode;
return this;
}

/**
 * Returns the current value of the lazy dynamic filtering flag.
 * The backing field is initialized to {@code true}.
 *
 * @return {@code true} when lazy dynamic filtering is enabled
 */
public boolean isEnableLazyDynamicFiltering()
{
return enableLazyDynamicFiltering;
}

/**
 * Sets the lazy dynamic filtering flag. The setter is annotated with the
 * {@code memory.enable-lazy-dynamic-filtering} configuration property name.
 *
 * @param enableLazyDynamicFiltering new value of the flag
 * @return this {@code MemoryConfig}, so that setter calls can be chained
 */
@Config("memory.enable-lazy-dynamic-filtering")
public MemoryConfig setEnableLazyDynamicFiltering(boolean enableLazyDynamicFiltering)
{
this.enableLazyDynamicFiltering = enableLazyDynamicFiltering;
return this;
}
}
Expand Up @@ -13,7 +13,6 @@
*/
package io.prestosql.plugin.memory;

import com.google.common.collect.ImmutableList;
import io.prestosql.spi.Page;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPageSource;
Expand All @@ -22,6 +21,7 @@
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.connector.FixedPageSource;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
Expand All @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.concurrent.CompletableFuture;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
Expand All @@ -40,11 +41,13 @@ public final class MemoryPageSourceProvider
implements ConnectorPageSourceProvider
{
private final MemoryPagesStore pagesStore;
private final boolean enableLazyDynamicFiltering;

@Inject
public MemoryPageSourceProvider(MemoryPagesStore pagesStore)
public MemoryPageSourceProvider(MemoryPagesStore pagesStore, MemoryConfig config)
{
    // Both collaborators are mandatory; reject nulls up front with a descriptive message,
    // consistent with the existing pagesStore check.
    this.pagesStore = requireNonNull(pagesStore, "pagesStore is null");
    // The flag is read once here and stored, so page sources created later use this value.
    this.enableLazyDynamicFiltering = requireNonNull(config, "config is null").isEnableLazyDynamicFiltering();
}

@Override
Expand All @@ -54,7 +57,7 @@ public ConnectorPageSource createPageSource(
ConnectorSplit split,
ConnectorTableHandle table,
List<ColumnHandle> columns,
TupleDomain<ColumnHandle> dynamicFilter)
DynamicFilter dynamicFilter)
{
MemorySplit memorySplit = (MemorySplit) split;
long tableId = memorySplit.getTable();
Expand All @@ -64,14 +67,6 @@ public ConnectorPageSource createPageSource(
MemoryTableHandle memoryTable = (MemoryTableHandle) table;
OptionalDouble sampleRatio = memoryTable.getSampleRatio();

if (dynamicFilter.isNone()) {
return new FixedPageSource(ImmutableList.of());
}
Map<Integer, Domain> domains = dynamicFilter
.transform(columns::indexOf)
.getDomains()
.get();

List<Integer> columnIndexes = columns.stream()
.map(MemoryColumnHandle.class::cast)
.map(MemoryColumnHandle::getColumnIndex).collect(toList());
Expand All @@ -83,12 +78,85 @@ public ConnectorPageSource createPageSource(
expectedRows,
memorySplit.getLimit(),
sampleRatio);
return new FixedPageSource(pages.stream()
.map(page -> applyFilter(page, domains))
.collect(toList()));

return new DynamicFilteringPageSource(new FixedPageSource(pages), columns, dynamicFilter, enableLazyDynamicFiltering);
}

/**
 * Page source that wraps a {@link FixedPageSource} and filters each page it
 * produces with the current predicate of a {@link DynamicFilter}.
 *
 * <p>When {@code enableLazyDynamicFiltering} is set, no page is returned until
 * the dynamic filter reports that it is complete, and {@link #isBlocked()}
 * exposes the dynamic filter's own blocked future. Otherwise each page is
 * filtered with whatever predicate is available at the time of the call.
 *
 * <p>Statistics, memory usage, finished state and {@link #close()} are all
 * delegated to the wrapped page source.
 */
private static class DynamicFilteringPageSource
        implements ConnectorPageSource
{
    private final FixedPageSource delegate;
    private final List<ColumnHandle> columns;
    private final DynamicFilter dynamicFilter;
    private final boolean enableLazyDynamicFiltering;

    /**
     * @param delegate source of the unfiltered pages
     * @param columns projected columns; a column's index in this list is used as the
     *        channel index when the predicate is applied to a page
     * @param dynamicFilter dynamic filter consulted on every {@link #getNextPage()} call
     * @param enableLazyDynamicFiltering whether to hold back pages until the filter is complete
     */
    private DynamicFilteringPageSource(FixedPageSource delegate, List<ColumnHandle> columns, DynamicFilter dynamicFilter, boolean enableLazyDynamicFiltering)
    {
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.columns = requireNonNull(columns, "columns is null");
        this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
        this.enableLazyDynamicFiltering = enableLazyDynamicFiltering;
    }

    @Override
    public long getCompletedBytes()
    {
        return delegate.getCompletedBytes();
    }

    @Override
    public long getReadTimeNanos()
    {
        return delegate.getReadTimeNanos();
    }

    @Override
    public boolean isFinished()
    {
        return delegate.isFinished();
    }

    /**
     * Returns the next filtered page, or {@code null} when no page is available.
     *
     * <p>{@code null} is returned when lazy filtering is enabled and the dynamic
     * filter is not complete yet, when the current predicate is {@code none}
     * (in which case the delegate is closed as well), or when the delegate
     * itself returns {@code null}.
     */
    @Override
    public Page getNextPage()
    {
        if (enableLazyDynamicFiltering && !dynamicFilter.isComplete()) {
            // Lazy mode: do not emit anything before the filter is fully collected.
            return null;
        }
        TupleDomain<ColumnHandle> predicate = dynamicFilter.getCurrentPredicate();
        if (predicate.isNone()) {
            // Nothing can match; stop reading from the delegate.
            // NOTE(review): relies on the closed delegate reporting isFinished() - confirm against FixedPageSource.
            close();
            return null;
        }
        Page page = delegate.getNextPage();
        if (page == null || predicate.isAll()) {
            // An unconstrained predicate keeps every position, so skip the per-page
            // transform and position copy and hand the page through unchanged.
            return page;
        }
        // isNone() was ruled out above, so the domains are present.
        return applyFilter(page, predicate.transform(columns::indexOf).getDomains().get());
    }

    /**
     * In lazy mode this is the dynamic filter's blocked future; otherwise the
     * source is never blocked.
     */
    @Override
    public CompletableFuture<?> isBlocked()
    {
        if (enableLazyDynamicFiltering) {
            return dynamicFilter.isBlocked();
        }
        return NOT_BLOCKED;
    }

    @Override
    public long getSystemMemoryUsage()
    {
        return delegate.getSystemMemoryUsage();
    }

    @Override
    public void close()
    {
        delegate.close();
    }
}

private Page applyFilter(Page page, Map<Integer, Domain> domains)
private static Page applyFilter(Page page, Map<Integer, Domain> domains)
{
int[] positions = new int[page.getPositionCount()];
int length = 0;
Expand All @@ -100,7 +168,7 @@ private Page applyFilter(Page page, Map<Integer, Domain> domains)
return page.getPositions(positions, 0, length);
}

private boolean positionMatchesPredicate(Page page, int position, Map<Integer, Domain> domains)
private static boolean positionMatchesPredicate(Page page, int position, Map<Integer, Domain> domains)
{
for (Map.Entry<Integer, Domain> entry : domains.entrySet()) {
int channel = entry.getKey();
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.prestosql.execution.QueryStats;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.operator.OperatorStats;
import io.prestosql.spi.QueryId;
import io.prestosql.sql.analyzer.FeaturesConfig;
import io.prestosql.testing.AbstractTestQueryFramework;
import io.prestosql.testing.DistributedQueryRunner;
Expand All @@ -43,6 +44,10 @@
public class TestMemorySmoke
extends AbstractTestQueryFramework
{
private static final long LINEITEM_COUNT = 60175;
private static final long ORDERS_COUNT = 15000;
private static final long PART_COUNT = 2000;

@Override
protected QueryRunner createQueryRunner()
throws Exception
Expand Down Expand Up @@ -86,8 +91,6 @@ public void testSelect()
@Test
public void testJoinDynamicFilteringNone()
{
final long buildSideRowsCount = 15_000L;
assertQueryResult("SELECT COUNT() FROM orders", buildSideRowsCount);
assertQueryResult("SELECT COUNT() FROM orders WHERE totalprice < 0", 0L);

Session session = Session.builder(getSession())
Expand All @@ -100,21 +103,32 @@ public void testJoinDynamicFilteringNone()
assertEquals(result.getResult().getRowCount(), 0);

// Probe-side is not scanned at all, due to dynamic filtering:
QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(result.getQueryId()).getQueryStats();
Set<Long> rowsRead = stats.getOperatorSummaries()
.stream()
.filter(summary -> summary.getOperatorType().equals("ScanFilterAndProjectOperator"))
.map(OperatorStats::getInputPositions)
.collect(toImmutableSet());
assertEquals(rowsRead, ImmutableSet.of(0L, buildSideRowsCount));
Set<Long> rowsRead = getOperatorRowsRead(runner, result.getQueryId());
assertEquals(rowsRead, ImmutableSet.of(0L, ORDERS_COUNT));
}

@Test
public void testJoinDynamicFilteringSingleValue()
public void testPartitionedJoinNoDynamicFiltering()
{
final long buildSideRowsCount = 15_000L;
assertQueryResult("SELECT COUNT() FROM orders WHERE totalprice < 0", 0L);

Session session = Session.builder(getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.PARTITIONED.name())
.build();
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(session, "SELECT * FROM lineitem JOIN orders " +
"ON lineitem.orderkey = orders.orderkey AND orders.totalprice < 0");
assertEquals(result.getResult().getRowCount(), 0);

// Probe-side is fully scanned, because local dynamic filtering does not work for partitioned joins:
Set<Long> rowsRead = getOperatorRowsRead(runner, result.getQueryId());
assertEquals(rowsRead, ImmutableSet.of(LINEITEM_COUNT, ORDERS_COUNT));
}

assertQueryResult("SELECT COUNT() FROM orders", buildSideRowsCount);
@Test
public void testJoinDynamicFilteringSingleValue()
{
assertQueryResult("SELECT COUNT() FROM orders WHERE comment = 'nstructions sleep furiously among '", 1L);
assertQueryResult("SELECT orderkey FROM orders WHERE comment = 'nstructions sleep furiously among '", 1L);
assertQueryResult("SELECT COUNT() FROM lineitem WHERE orderkey = 1", 6L);
Expand All @@ -129,13 +143,48 @@ public void testJoinDynamicFilteringSingleValue()
assertEquals(result.getResult().getRowCount(), 6);

// Probe-side is dynamically filtered:
QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(result.getQueryId()).getQueryStats();
Set<Long> rowsRead = stats.getOperatorSummaries()
Set<Long> rowsRead = getOperatorRowsRead(runner, result.getQueryId());
assertEquals(rowsRead, ImmutableSet.of(6L, ORDERS_COUNT));
}

/**
 * Runs a three-table join (lineitem, part, orders) with dynamic filtering enabled
 * and checks both the number of result rows and the set of input-position counts
 * reported by the scan operators (see {@code getOperatorRowsRead}).
 */
@Test
public void testJoinDynamicFilteringBlockProbeSide()
{
// Session with dynamic filtering on, join reordering strategy NONE and BROADCAST join distribution.
Session session = Session.builder(getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(JOIN_REORDERING_STRATEGY, FeaturesConfig.JoinReorderingStrategy.NONE.name())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
.build();
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
// Wait for both build sides to finish before starting the scan of 'lineitem' table (should be very selective given the dynamic filters).
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
session,
"SELECT l.comment" +
" FROM lineitem l, part p, orders o" +
" WHERE l.orderkey = o.orderkey AND o.comment = 'nstructions sleep furiously among '" +
" AND p.partkey = l.partkey AND p.comment = 'onic deposits'");
// Exactly one row is expected in the result.
assertEquals(result.getResult().getRowCount(), 1);
// The scans are expected to report 1 input row plus the full 'orders' and 'part' row counts.
assertEquals(getOperatorRowsRead(runner, result.getQueryId()), ImmutableSet.of(1L, ORDERS_COUNT, PART_COUNT));

// Make sure that a single join doesn't narrow lineitem to single row.
// NOTE(review): the query below is identical to the one above (it still joins both 'part' and
// 'orders') and asserts the same counts, so it does not appear to exercise a single join - confirm intent.
result = runner.executeWithQueryId(
session,
"SELECT l.comment" +
" FROM lineitem l, part p, orders o" +
" WHERE l.orderkey = o.orderkey AND o.comment = 'nstructions sleep furiously among '" +
" AND p.partkey = l.partkey AND p.comment = 'onic deposits'");
assertEquals(result.getResult().getRowCount(), 1);
assertEquals(getOperatorRowsRead(runner, result.getQueryId()), ImmutableSet.of(1L, ORDERS_COUNT, PART_COUNT));
}

/**
 * Collects the input-position counts of the scan operators of a finished query.
 *
 * @param runner query runner whose coordinator is asked for the full query info
 * @param queryId id of the query to inspect
 * @return the set of {@code getInputPositions()} values of all operator summaries whose
 *         operator type equals {@code "ScanFilterAndProjectOperator"}; equal counts
 *         collapse into one element because the result is a set
 */
private static Set<Long> getOperatorRowsRead(DistributedQueryRunner runner, QueryId queryId)
{
QueryStats stats = runner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats();
return stats.getOperatorSummaries()
.stream()
// Only the scan operators are of interest.
.filter(summary -> summary.getOperatorType().equals("ScanFilterAndProjectOperator"))
.map(OperatorStats::getInputPositions)
.collect(toImmutableSet());
// NOTE(review): the next line follows the return statement and uses names not declared in this
// method; it looks like a removed line left over from the diff rendering - confirm and drop.
assertEquals(rowsRead, ImmutableSet.of(6L, buildSideRowsCount));
}

@Test
Expand Down

0 comments on commit ba93c07

Please sign in to comment.