
Allow blocking page source until dynamic filters are ready #3414

Merged
merged 3 commits into trinodb:master from df-block-probe-side on Aug 26, 2020

Conversation

rzeyde-varada
Contributor

By default, we wait up to 1s for the build-side filters to be ready, to allow a more efficient probe-side scan.
If the filters are not ready, we fall back to the previous behaviour (use the current filters without blocking the probe side).

This feature can be configured using:

SET SESSION dynamic_filtering_probe_side_blocking_timeout='10s';

To disable, use:

SET SESSION dynamic_filtering_probe_side_blocking_timeout='0s';
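The blocking-with-fallback behaviour described above can be sketched as a timeout-bounded wait on a future. This is a minimal illustration, not the actual implementation: the class and method names are hypothetical, and the predicate is simplified to a String where the engine would use TupleDomain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ProbeSideBlockingSketch
{
    // Wait up to timeoutMillis for the build-side filter to be collected.
    // If it is not ready in time, fall back to scanning without it.
    static String waitForDynamicFilter(CompletableFuture<String> buildSideFilter, long timeoutMillis)
    {
        try {
            return buildSideFilter.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            return "ALL"; // previous behaviour: use the current (unconstrained) filter
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Note that a timeout of `'0s'` turns the wait into a no-op (an incomplete future times out immediately), which matches the "disable" setting above.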

@cla-bot cla-bot bot added the cla-signed label Apr 11, 2020
@rzeyde-varada
Contributor Author

Using this feature, we can add (more) deterministic integration tests for dynamic filtering, e.g.:
https://github.com/prestosql/presto/blob/cb6ca89eca386cffb7c84933df5b22a786a03ee2/presto-memory/src/test/java/io/prestosql/plugin/memory/TestMemorySmoke.java#L95-L108

Member

@martint martint left a comment
Some initial comments

Member

@sopel39 sopel39 left a comment

I like the idea. However, I'm thinking that it should be the connector that decides whether to wait for a dynamic filter, and for how long. For example, some connectors might be fine producing data immediately, since they have many splits to process (if a few splits get processed without dynamic filters, it shouldn't be a big issue). Other connectors might have a fixed and small number of splits (e.g. JDBC connectors), so it's preferable to wait for the dynamic filter even much longer than the 1-second global default. It could make a huge difference whether a JDBC connector issues its query with or without a dynamic filter.
WDYT @martint?

@sopel39
Member

sopel39 commented May 12, 2020

To be landed after: #1072

@rzeyde-varada
Contributor Author

However, I'm thinking that it should be the connector that decides whether to wait for a dynamic filter, and for how long.

SGTM - I wasn't sure about adding a new connector API, so I tried to make the minimal engine changes required by this feature.
I would be happy to adapt the PR to allow the connector to decide when to block and for how long.

@sopel39
Member

sopel39 commented May 13, 2020

I would be happy to adapt the PR to allow the connector to decide when to block and for how long

This requires making DynamicFilter part of the SPI. We can still fall back to the old method with a Supplier-based dynamic filter.
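A hypothetical sketch of what such a DynamicFilter SPI shape could look like, based on the discussion above. The names and the String predicate are illustrative only; the real SPI would work with TupleDomain&lt;ColumnHandle&gt;, and the Supplier-based factory shows the non-blocking fallback path mentioned here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public interface DynamicFilterSketch
{
    CompletableFuture<?> isBlocked();  // completes when more filter domains arrive

    String getCurrentPredicate();      // best-effort predicate collected so far

    boolean isComplete();              // all build-side domains have been collected

    // Fallback: wrap a plain Supplier-style dynamic filter that never blocks.
    static DynamicFilterSketch fromSupplier(Supplier<String> supplier)
    {
        return new DynamicFilterSketch()
        {
            @Override
            public CompletableFuture<?> isBlocked()
            {
                return CompletableFuture.completedFuture(null); // never blocks
            }

            @Override
            public String getCurrentPredicate()
            {
                return supplier.get();
            }

            @Override
            public boolean isComplete()
            {
                return false; // a plain supplier gives no completion signal
            }
        };
    }
}
```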

@rzeyde-varada
Contributor Author

Rebased over the latest master to resolve a merge conflict.

@rzeyde-varada
Contributor Author

Rebased over the latest master to resolve a merge conflict.

@findepi
Member

findepi commented Jun 19, 2020

Other connectors might have a fixed and small number of splits (e.g. JDBC connectors), so it's preferable to wait for the dynamic filter even much longer than the 1-second global default. It could make a huge difference whether a JDBC connector issues its query with or without a dynamic filter.

I was not following this, so it could have been answered already; please forgive my question in that case.

JDBC connectors do not consume the dynamic filter in any way. They just provide a RecordCursor and let the engine handle the heavy lifting.
How would a DF help a JDBC connector?

Second, are we positive this will not impact low-latency queries? Ultimately, sleeping for 1s seems to be a heuristic, and there seems to be a potential for regression.

@rzeyde-varada rzeyde-varada force-pushed the df-block-probe-side branch 3 times, most recently from 0e5e831 to 0e587ab Compare July 5, 2020 07:44
@rzeyde-varada
Contributor Author

Updated the PR to use DynamicFilter in ConnectorPageSourceProvider.
Please re-review :)

Session session = Session.builder(getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
.setSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, "true")
Member

remove

Contributor Author

I have rewritten the test to use a multi-join in order to verify the blocking behaviour:
https://github.com/prestosql/presto/pull/3414/files#diff-6102dfcf993241d1413b4fc63e4ca5a9R128
WDYT?

.setSystemProperty(PUSH_PARTIAL_AGGREGATION_THROUGH_JOIN, "true")
.build();
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(session, "SELECT MAX(quantity) FROM lineitem JOIN orders " +
Member

Add an aggregation explicitly below the join within the query, e.g.:

SELECT * FROM (SELECT MAX(quantity), orderkey FROM lineitem GROUP BY orderkey) JOIN orders on ...

Contributor Author

Not sure I understand the suggestion - if I use the following query:

SELECT * 
FROM (SELECT MAX(quantity), orderkey FROM lineitem GROUP BY orderkey) l
JOIN orders 
ON   l.orderkey = orders.orderkey;

It results in a distributed join (due to a repartition operator between the TableScan of lineitem and the MAX aggregation):

 Fragment 1 [HASH]                                                                                                                                                                                          
     Output layout: [max, orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment_10]                                                                            
     Output partitioning: SINGLE []                                                                                                                                                                         
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                          
     InnerJoin[("orderkey" = "orderkey_9")][$hashvalue_59, $hashvalue_60]                                                                                                                                   
     │   Layout: [orderkey:bigint, max:double, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_10:var
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                                            
     │   Distribution: PARTITIONED                                                                                                                                                                          
     │   dynamicFilterAssignments = {orderkey_9 -> df_283}                                                                                                                                                  
     ├─ Project[]                                                                                                                                                                                           
     │  │   Layout: [orderkey:bigint, max:double, $hashvalue_59:bigint]                                                                                                                                     
     │  │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                                         
     │  │   $hashvalue_59 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey"), 0))                                                                                                       
     │  └─ Aggregate[orderkey]                                                                                                                                                                              
     │     │   Layout: [orderkey:bigint, max:double]                                                                                                                                                        
     │     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                                      
     │     │   max := max("quantity")                                                                                                                                                                       
     │     └─ LocalExchange[HASH][$hashvalue] ("orderkey")                                                                                                                                                  
     │        │   Layout: [orderkey:bigint, quantity:double, $hashvalue:bigint]                                                                                                                             
     │        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}                                                                                                                                  
     │        └─ RemoteSource[2]                                                                                                                                                                            
     │               Layout: [orderkey:bigint, quantity:double, $hashvalue_57:bigint]                                                                                                                       
     └─ LocalExchange[HASH][$hashvalue_60] ("orderkey_9")                                                                                                                                                   
        │   Layout: [orderkey_9:bigint, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_10:varchar(79
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}                                                                                                                                        
        └─ RemoteSource[3]                                                                                                                                                                                  
               Layout: [orderkey_9:bigint, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_10:varchar

But then local dynamic filtering doesn't take place, because it requires a broadcast join...

.map(page -> applyFilter(page, domains))
.collect(toList()));

return new FixedPageSource(pages)
Member

Instead, add a wrapping page source, e.g:

DynamicFilteringPageSource

that would have FixedPageSource as a delegate field
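The suggested wrapper could look roughly like this. It is a toy sketch, not the actual SPI: a "page" is simplified to a List&lt;Integer&gt;, the predicate stands in for TupleDomain, and all names are illustrative. It also shows the short-circuit for an all-accepting predicate suggested later in this review.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class DynamicFilteringPageSourceSketch
{
    private final Iterator<List<Integer>> delegate;  // stands in for FixedPageSource
    private final Predicate<Integer> predicate;      // stands in for the collected TupleDomain
    private final boolean predicateIsAll;            // stands in for predicate.isAll()

    public DynamicFilteringPageSourceSketch(Iterator<List<Integer>> delegate, Predicate<Integer> predicate, boolean predicateIsAll)
    {
        this.delegate = delegate;
        this.predicate = predicate;
        this.predicateIsAll = predicateIsAll;
    }

    public List<Integer> getNextPage()
    {
        if (!delegate.hasNext()) {
            return null;
        }
        List<Integer> page = delegate.next();
        if (predicateIsAll) {
            return page; // skip filtering when the predicate accepts everything
        }
        return page.stream()
                .filter(predicate)
                .collect(Collectors.toList());
    }
}
```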

Contributor Author

/**
* May contain domains for dynamic filters for different table scans
* (e.g. in case of co-located joins).
*/
@GuardedBy("this")
private Map<Symbol, Domain> dynamicFilterDomainsResult = new HashMap<>();

// Each future blocks until its dynamic filter is collected.
@GuardedBy("this")
private Map<DynamicFilterId, SettableFuture<Void>> futures = new HashMap<>();
Member

This becomes very similar to DynamicFilterService from #4224.
Can we untangle these classes a bit?

  • LocalDynamicFiltersCollector should hold Map<DynamicFilterId, SettableFuture<Domain>> dynamicFilters

  • There should be a method DynamicFilter LocalDynamicFiltersCollector#getDynamicFilter(List<DynamicFilters.Descriptor> dynamicFilters, Map<Symbol, ColumnHandle> columnHandles), similar to the one in DynamicFilterService

  • LocalDynamicFiltersCollector#addDynamicFilter(Map<Symbol, Domain> dynamicFilterDomains) would become addDynamicFilter(Map<DynamicFilterId, Domain> dynamicFilterDomains)

This would simplify LocalDynamicFilterConsumer, as it wouldn't need to perform the dynamic filter id -> symbol translation in io.prestosql.sql.planner.LocalDynamicFilterConsumer#convertTupleDomainForLocalFilters, and some code could be reused between DynamicFilterService and LocalDynamicFiltersCollector.
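The collector shape proposed here can be sketched as follows. This is a rough, hypothetical illustration: Domain is stood in by a String, SettableFuture by CompletableFuture, and the method names only mirror the proposal, not the final code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class LocalDynamicFiltersCollectorSketch
{
    // Map<DynamicFilterId, SettableFuture<Domain>> in the proposal
    private final Map<String, CompletableFuture<String>> dynamicFilters = new HashMap<>();

    public synchronized void register(String filterId)
    {
        dynamicFilters.put(filterId, new CompletableFuture<>());
    }

    // addDynamicFilter(Map<DynamicFilterId, Domain> dynamicFilterDomains) in the proposal:
    // completing a future unblocks any scan waiting on that filter id.
    public synchronized void addDynamicFilter(Map<String, String> domains)
    {
        domains.forEach((id, domain) -> dynamicFilters.get(id).complete(domain));
    }

    // Only the filters that are both registered locally and consumed by a given
    // table scan are relevant for it (others may be collected in a different stage).
    public synchronized Set<String> relevantFilters(Set<String> consumedIds)
    {
        Set<String> ids = new HashSet<>(dynamicFilters.keySet());
        ids.retainAll(consumedIds);
        return ids;
    }
}
```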

Contributor Author

I did a slightly different refactoring, allowing a somewhat more efficient implementation of DynamicFilter by caching the resulting predicate and the blocked future:
https://github.com/prestosql/presto/pull/3414/files#diff-fc014adc6614290fe6f8a48d7b0f6739
WDYT?

@rzeyde-varada
Contributor Author

Many thanks for the review, will fix the issues and update the PR.

@rzeyde-varada rzeyde-varada force-pushed the df-block-probe-side branch 3 times, most recently from c9e984b to ae4a9e6 Compare July 12, 2020 12:06
@rzeyde-varada
Contributor Author

Updated the PR to ae4a9e6 - please re-review :)

@rzeyde-varada
Contributor Author

Ping :)

Member

@sopel39 sopel39 left a comment

Some comments, up to LocalDynamicFiltersCollector.java.

@rzeyde-varada
Contributor Author

Many thanks for the review, will fix soon.

Member

@sopel39 sopel39 left a comment

some comments. Looks good overall.

@rzeyde-varada
Contributor Author

I have addressed the comments above, and rebased the PR over the latest master (to resolve a merge conflict).

Member

@sopel39 sopel39 left a comment

lgtm, minor comments + tests for large build side

@rzeyde-varada
Contributor Author

Cherry-picked the commits from #4946, fixed the comments above and rebased.

@sopel39
Member

sopel39 commented Aug 25, 2020

There is a conflict, please rebase. I will then merge tomorrow.

@rzeyde-varada
Contributor Author

Rebased over latest master to resolve the conflict - many thanks!

// Iterate over dynamic filters that are collected (correspond to one of the futures), and required for filtering (correspond to one of the descriptors).
// It is possible that some dynamic filters are collected in a different stage - and will not be available here.
// It is also possible that not all local dynamic filters are needed for this specific table scan.
Set<DynamicFilterId> filterIds = Sets.intersection(symbolsMap.keySet(), futures.keySet());
Member

Let's convert that into a stream:

symbolsMap.keySet().stream()
  .filter(futures.keySet()::contains)
  .map(...)

@@ -1998,6 +1999,13 @@ private PhysicalOperation createLookupJoin(
Optional<Symbol> buildHashSymbol,
LocalExecutionPlanContext context)
{
// Register dynamic filters, allowing the scan operators to wait for the collection completion.
// Skip dynamic filters that are not used locally (e.g. in case of distributed joins).
Set<DynamicFilterId> localDynamicFilters = Sets.intersection(
Member

Let's convert that into a stream:

Set<DynamicFilterId> localDynamicFilters = node.getDynamicFilters().keySet().stream()
  .filter(getConsumedDynamicFilterIds(probeNode)::contains)
  .collect(toImmutableSet());

@GuardedBy("this")
private int futuresLeft;

public TableSpecificDynamicFilter(List<ListenableFuture<TupleDomain<ColumnHandle>>> predicateFutures)
Member

make the constructor private

return null;
}
Page page = delegate.getNextPage();
if (page != null) {
Member

Skip filtering when predicate.isAll().

We allow the connector to block according to dynamic filter collection
state. Some connectors would prefer to wait a bit, to benefit from
more selective scanning (using the collected dynamic filters).
@rzeyde-varada
Contributor Author

Thanks!
Fixed the comments and force-pushed d5bb8c8.

@rzeyde-varada rzeyde-varada changed the title Block probe-side scan until build-side dynamic filtering is ready Allow blocking page source until dynamic filters are ready Aug 26, 2020
@sopel39 sopel39 merged commit 7fa21f6 into trinodb:master Aug 26, 2020
@sopel39
Member

sopel39 commented Aug 26, 2020

merged, thanks!

@sopel39 sopel39 mentioned this pull request Aug 26, 2020
9 tasks
@rzeyde-varada
Contributor Author

Much appreciated :)

@rzeyde-varada rzeyde-varada deleted the df-block-probe-side branch August 26, 2020 12:53
@martint martint added this to the 341 milestone Sep 9, 2020