
Initial Support of Adaptive Optimization with Presto Unlimited #14675

Merged: 2 commits, merged into prestodb:master on Jul 15, 2020

Conversation

@pguofb (Contributor) commented Jun 18, 2020

Background: Presto Unlimited materializes exchange outputs into temporary tables (see #12387), which creates an opportunity to invoke the CBO at runtime on later stages, based on the temporary table statistics generated by earlier stages. This yields more reliable optimizations for complex queries, whose later stages often have less accurate estimated statistics.

Specifically, this PR achieves the following goals.

  1. Enable table and column statistics to be collected for temporary tables, and enable the statistics to be correctly fetched and processed by the stats calculators.
  2. Create an iterative optimization rule that leverages temporary table statistics to compute statistics for the probe and build sides of a Join node, and swaps them when the build side is larger (a minimal sketch follows this list).
  3. Make the CBO invokable at LegacySqlQueryScheduler, optimizing the plan right before scheduling and actual execution.
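
To make goal 2 concrete, here is a minimal sketch of a statistics-driven join-swapping rule. This is not the PR's code: flipJoinSides is a hypothetical helper, and the exact package locations and StatsProvider signatures may differ slightly from the codebase at the time.

import com.facebook.presto.cost.StatsProvider;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.plan.JoinNode;

import static com.facebook.presto.sql.planner.plan.Patterns.join;

public class JoinSwapSketch
        implements Rule<JoinNode>
{
    @Override
    public Pattern<JoinNode> getPattern()
    {
        return join();
    }

    @Override
    public Result apply(JoinNode node, Captures captures, Context context)
    {
        StatsProvider stats = context.getStatsProvider();
        // With materialized exchanges, the join inputs are temporary-table scans, so these
        // estimates come from statistics collected at runtime rather than planning-time guesses.
        double probeSizeInBytes = stats.getStats(node.getLeft())
                .getOutputSizeInBytes(node.getLeft().getOutputVariables());
        double buildSizeInBytes = stats.getStats(node.getRight())
                .getOutputSizeInBytes(node.getRight().getOutputVariables());

        // Fire only when both sides have usable statistics and the build side is larger
        if (Double.isNaN(probeSizeInBytes) || Double.isNaN(buildSizeInBytes) || buildSizeInBytes <= probeSizeInBytes) {
            return Result.empty();
        }
        // flipJoinSides must mirror the join criteria, output order, and hash variables;
        // the PR's rule additionally adjusts local exchanges on both sides
        return Result.ofPlanNode(flipJoinSides(node));
    }

    private static JoinNode flipJoinSides(JoinNode node)
    {
        throw new UnsupportedOperationException("illustrative placeholder");
    }
}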

Basic testing:

  • In the TestHiveIntegrationSmokeTest::testMaterializedPartitioning test suite, the rule correctly captures two queries (1 and 2) whose build side is larger than the probe side, and invoking the CBO did not affect query result correctness.
  • More unit tests will be added.

== NO RELEASE NOTE ==

@rschlussel (Contributor) left a comment:

Still reviewing the last three commits, but if the first two commits are ready sooner, we can merge them first.


Map<PlanFragment, PlanFragment> oldToNewFragment = stream(forTree(StreamingSubPlan::getChildren).depthFirstPreOrder(section.getPlan()))
        // filter leaf stages
        .filter(plan -> plan.getChildren().isEmpty())
Contributor: Ideally, all this logic to filter relevant plans should go into the optimizer rule.

@pguofb (author): Yeah, I thought about moving all the filtering logic inside the rule, but my concern is that it does not seem possible to easily tell from the iterative optimizer's output whether the returned plan actually changed. If we cannot tell, then for each ready section we would waste time rebuilding the whole section, stage executions, schedulers, etc., even when the optimizer rule did not fire at all.
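
For illustration, here is the kind of check that concern implies, assuming (as one of the final commits describes) that IterativeOptimizer returns the very same plan object when no rule fires; the surrounding names (runtimeOptimizer, fragment, section) are hypothetical:

// Because the optimizer returns the original PlanNode instance when nothing changed,
// reference equality is enough to detect that no rule fired
PlanNode optimizedRoot = runtimeOptimizer.optimize(
        fragment.getRoot(), session, types, variableAllocator, idAllocator, warningCollector);
if (optimizedRoot == fragment.getRoot()) {
    // plan unchanged: keep the existing section, stage executions, and schedulers
    return section;
}
// plan changed: rebuild the fragment and the section's stage executions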

@@ -91,6 +91,12 @@ public static HiveBasicStatistics reduce(HiveBasicStatistics first, HiveBasicSta

public static Map<String, HiveColumnStatistics> merge(Map<String, HiveColumnStatistics> first, Map<String, HiveColumnStatistics> second)
{
    // Merge statistics correctly at temporary table finish insertion: when "first" has exactly the
    // same columns as "second" but all-empty statistics, "first" is the placeholder left at temporary
    // table creation, and it is safe to directly return "second".
    if (first.values().stream().allMatch(statistics -> statistics.equals(HiveColumnStatistics.empty()))
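
The condition above is truncated in the diff view; a plausible completion of the described check looks like the following (an assumed reconstruction, not the exact diff; mergeColumnStatistics is a hypothetical name for the pre-existing column-by-column merge):

public static Map<String, HiveColumnStatistics> merge(Map<String, HiveColumnStatistics> first, Map<String, HiveColumnStatistics> second)
{
    // "first" is the all-empty placeholder written at temporary table creation:
    // same column set as "second", every entry equal to HiveColumnStatistics.empty()
    if (first.keySet().equals(second.keySet())
            && first.values().stream().allMatch(statistics -> statistics.equals(HiveColumnStatistics.empty()))) {
        return second;
    }
    return mergeColumnStatistics(first, second); // hypothetical: the existing merge logic
}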
Contributor: @arhimondr can you check this logic?

Member: How about instead of adding this logic we set all statistics to 0 when creating a temporary table?

@arhimondr (Member) left a comment:

First two commits:

  Fix StatsUtil function ...
  Enable statistics aggregation in temp

Some comments.

Please make sure the commit messages comply with our commit message style guidelines:
https://github.com/prestodb/presto/wiki/Review-and-Commit-guidelines#commit-formatting-and-pull-requests
https://chris.beams.io/posts/git-commit/

More specifically, the commit summary should be ~50 characters; you can put a more descriptive message in the body. For example:

  Enable statistics collection for temporary tables

  Enable column level statistics collection when writing intermediate data
  to temporary table for materialized exchanges. Collected statistics will
  be used to automatically change join order in runtime.

@arhimondr (Member), to @rschlussel and @pguofb:
At a high level, it feels like collecting the stats for each and every column is a little overkill. Given that the planner prunes all unnecessary columns, it is certain that all remaining columns will be used. Ideally, we should simply collect the overall size of all the inputs and use that as the input to the CBO algorithm. It will not give the best results when there are more nodes before the JoinNode, but historically we've seen that the estimates are not very reliable anyway. Thus I'm not confident it makes sense to pay the cost of collecting the detailed stats, which are much more expensive: usually there will be no additional nodes before the join, and if there are, we are not very confident in the estimates anyway.

Anyway, changing this goes beyond the scope of this project, but it feels like something we should consider doing in the future.

@pguofb pguofb force-pushed the milestone1 branch 4 times, most recently from 71d7387 to 369b6dd Compare June 25, 2020 19:45
@arhimondr (Member) left a comment:
Enable statistics aggregation for temporary table.

LGTM

@aweisberg (Contributor):
Would this benefit from integration tests for SqlQueryScheduler and LegacySqlQueryScheduler?

@pguofb (author) commented Jul 1, 2020:

> Would this benefit from integration tests for SqlQueryScheduler and LegacySqlQueryScheduler?

I suppose integration tests could further verify the correctness of invoking the runtime CBO. Ideally, we might also want to try some production queries that are currently ill-optimized to see whether the CBO can fix the join sides at runtime.

@arhimondr (Member) left a comment:

Some comments

@@ -591,7 +706,7 @@ public BasicStageExecutionStats getBasicStageStats()
 public StageInfo getStageInfo()
 {
     ListMultimap<StageId, SqlStageExecution> stageExecutions = getStageExecutions();
-    return buildStageInfo(plan, stageExecutions);
+    return buildStageInfo(plan.get(), stageExecutions);
@arhimondr (Member):
This is likely to break statistics

CC: @rschlussel

Contributor: @pguofb is looking at that next. Right now the info in the live plan will be incorrect, but the final plan in the QueryCompletedEvent and the stage info will be correct.
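
For context, the plan.get() in the diff, together with the compareAndSet visible later in the constructor, indicates the scheduler now holds the plan in an AtomicReference so the adapted plan can be published at runtime; a minimal sketch (updatePlan is the function mentioned later in this thread):

import java.util.concurrent.atomic.AtomicReference;

// the scheduler holds the current SubPlan in an AtomicReference so runtime optimization
// can atomically publish the adapted plan; readers such as getStageInfo() call plan.get()
private final AtomicReference<SubPlan> plan = new AtomicReference<>();

private void updatePlan(SubPlan adaptedPlan)
{
    plan.set(adaptedPlan);
}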

@pguofb pguofb force-pushed the milestone1 branch 2 times, most recently from e3b6cd8 to d03265a Compare July 6, 2020 15:07
@wenleix (Contributor) commented Jul 6, 2020:

Nit about commit header: No period at the end :). See the seven rules: https://chris.beams.io/posts/git-commit/#seven-rules 😃

@wenleix wenleix changed the title Invoke CBO in LegacySqlQueryScheduler based on temporary table statistics Initial Support of Adaptive Optimization with Presto Unlimited Jul 6, 2020
@wenleix (Contributor) left a comment:
"Add a session property for runtime optimizer". LGTM. Note "adaptive optimizer" is probably a more sexy name than "runtime optimizer". 😃

@wenleix (Contributor) left a comment:

"Invoke CBO at SqlQueryScheduler for Join Swapping". Generally looks good to me. I will take a separate look into RuntimeReorderJoinSides later. But don't be blocked on my pass if you think it's ready to merge.

Just to confirm my understanding: when adaptive optimization kicked in legacy scheduler, the logic of creating new sections actually leverage what implements in the new scheduler -- this makes sense since section execution abstraction is only introduced in new scheduler, and it will be much more difficult to do adaptive execution in legacy scheduler.

Now here is an interesting note, when legacy scheduler works together with adaptive execution, the behavior will be a mix of "legacy" and "new" scheduler right? i.e. the code path follows legacy scheduler when adaptive execution hasn't kicked in. But once adaptive execution kicked, the code logic is more like the section retry in the new scheduler. 😃

cc @rschlussel

    outputBuffers = createDiscardingOutputBuffers();
    locationsConsumer = (fragmentId, tasks, noMoreExchangeLocations) -> {};
}
SectionExecution sectionExecution = sectionExecutionFactory.createSectionExecutions(
@wenleix (Contributor):
@rschlussel : Is SectionExecutionFactory introduced for the new sql query scheduler? So essentially we are calling something in the new scheduler from the legacy scheduler? ;)

Contributor: SectionExecutionFactory is already used in the LegacySqlQueryScheduler (the logic was originally extracted from there).

        remoteTaskFactory,
        splitSourceFactory,
        0);
addStateChangeListeners(sectionExecution);
Contributor: Do we need to remove the state change listeners for old sections?

@pguofb (author): Yeah, I thought about this. When we create new stage executions and replace the old ones in this.stageExecutions, the old ones will have nothing referencing them and will eventually be garbage collected without ever being triggered. Therefore, I'm not too worried about explicitly removing listeners. Besides, the state machine class does not provide methods to explicitly remove listeners, so I think it is OK to trust the GC to do its job :)

Map<StageId, StageExecutionAndScheduler> updatedStageExecutions = sectionExecution.getSectionStages().stream()
        .collect(toImmutableMap(execution -> execution.getStageExecution().getStageExecutionId().getStageId(), identity()));
synchronized (this) {
    stageExecutions.putAll(updatedStageExecutions);
Contributor: Ditto: do we need to remove old stage executions from stageExecutions?

@pguofb (author): Likewise: stageExecutions is a map from StageId to StageExecutionAndScheduler. The rewritten stages have the same IDs as the old ones (a bit different from retry, because we optimize them right before we schedule and execute them), so we actually replace the old entries rather than simply insert a batch of new ones.

@wenleix (Contributor) commented Jul 6, 2020:

Two high-level questions:

  1. In the query completion event, will the adapted plan be reported, or the original plan? (It looks like the original plan will be reported?)
  2. Do we plan to have a field in the query completion event to indicate whether the query got adapted? This would help understand the impact in production, verifier runs, etc.

@pguofb (author) commented Jul 6, 2020, replying to the two questions above:

Actually, according to my project plan, the next step (after enabling runtime join swapping) is to fix how the statistics show up in the UI, and I'm currently starting to look at that part.

  1. So far, I checked that the Stage Performance UI (zooming into per-stage details) and the JSON page show the correct (swapped) statistics. But the Live Plan page (showing the overall SubPlan) is not updated to the swapped one, even though we update this.plan every time we adapt a stage via the updatePlan function.

  2. We have not decided on a concrete plan for modifying the query completion event, but ideally we definitely want to reflect whether runtime adaptation happened.

@pguofb (author) commented Jul 10, 2020:

The latest commit fixes the broken statistics in the Live Plan. Now both the Query Completion Event and the Live Plan shown during runtime (rendered by periodically issuing an HTTP GET to QueryResource.getQueryInfo to fetch the latest queryInfo) will reflect the correct plan information and statistics.

@@ -190,6 +196,7 @@ private LegacySqlQueryScheduler(
     this.queryStateMachine = requireNonNull(queryStateMachine, "queryStateMachine is null");
     this.plan.compareAndSet(null, requireNonNull(plan, "plan is null"));
     this.session = requireNonNull(session, "session is null");
+    this.metadata = requireNonNull(metadata, "metadata is null");
Contributor: nit: just get the FunctionManager from the metadata, since that's all you need (same below).

@rschlussel (Contributor):
@wenleix @arhimondr did you have any other comments or concerns?

@wenleix (Contributor) commented Jul 10, 2020:

@pguofb:

> So far, I checked that the Stage Performance UI (zooming into per-stage details) and the JSON page show the correct (swapped) statistics. But the Live Plan page (showing the overall SubPlan) is not updated to the swapped one, even though we update this.plan every time we adapt a stage via the updatePlan function.

Sounds good. What about the plan JSON in the query completion event? :)

Update: sorry, just saw your latest comment (#14675 (comment)). Nice work!

@wenleix (Contributor) commented Jul 10, 2020:

Thanks @rschlussel and @pguofb. I have no other comments :)

@arhimondr (Member) left a comment:

LGTM % nits

private final SectionExecutionFactory sectionExecutionFactory;
private final RemoteTaskFactory remoteTaskFactory;
private final SplitSourceFactory splitSourceFactory;
private static final Logger log = Logger.get(LegacySqlQueryScheduler.class);
@arhimondr (Member): nit: usually we define log as the very first field in the class and separate it from the other fields.

public class RuntimeReorderJoinSides
        implements Rule<JoinNode>
{
    private final Logger log = Logger.get(RuntimeReorderJoinSides.class);
Member: Make this private static final.
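
Applied to the snippet above, the suggested declaration would read:

public class RuntimeReorderJoinSides
        implements Rule<JoinNode>
{
    // one logger per class suffices; no per-instance state is needed
    private static final Logger log = Logger.get(RuntimeReorderJoinSides.class);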

        swapped.getRightHashVariable(),
        swapped.getDistributionType());

log.debug("Probe size: " + leftOutputSizeInBytes + " is smaller than Build size: " + rightOutputSizeInBytes + " => invoke runtime join swapping on JoinNode ID: " + newJoinNode.getId());
Member: Use the pattern Probe size: %s is smaller than build size: %s => ...
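
With Airlift's Logger, which applies String.format-style arguments only when debug logging is enabled, the suggestion would look like:

log.debug("Probe size: %s is smaller than build size: %s => invoke runtime join swapping on JoinNode ID: %s",
        leftOutputSizeInBytes, rightOutputSizeInBytes, newJoinNode.getId());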

@pguofb pguofb force-pushed the milestone1 branch 3 times, most recently from 45f9a57 to 8752111 Compare July 15, 2020 14:43
@rschlussel (Contributor):

Looks great! Can you squash the last 4 commits together?

-SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, idAllocator, stateMachine.getWarningCollector());
+// the variableAllocator is finally passed to SqlQueryScheduler for runtime cost-based optimizations
+variableAllocator.set(new PlanVariableAllocator(plan.getTypes().allVariables()));
+SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, idAllocator, variableAllocator.get(), stateMachine.getWarningCollector());
Contributor: No need to pass the variableAllocator here - createSubPlans creates a variableAllocator in the exact same way.

@pguofb (author): Yes, createSubPlans internally creates a variableAllocator in this way, but we want this variableAllocator to be exposed after creating the subplans so it can then be used by the runtime optimizers. Therefore, we moved the creation out here, pass it in as an argument, and can later feed it into SqlQueryScheduler.

@pguofb pguofb force-pushed the milestone1 branch 2 times, most recently from cad2c36 to e60d5a3 Compare July 15, 2020 16:36
- Pass in the CBO and make it invokable at [Legacy/]SqlQueryScheduler
  during runtime.
- Rename the get() method of PlanOptimizers to
  getPlanningTimeOptimizers() to distinguish it from getRuntimeOptimizers().
- Adjust the IterativeOptimizer optimize() function to return the original
  plan when the plan is not changed, instead of always calling
  memo.extract() to rebuild a new one.
- Create a join swapping rule (RuntimeReorderJoinSides) based on the
  probe- and build-side statistics, and adjust the local exchange when
  necessary (add at the build side, remove at the probe side).
- Rebuild the section, re-generate the stageExecutionAndSchedulers of the
  section, and adjust the overall subplan when the join is swapped, to
  reflect the correct statistics in the web UI and QueryCompletionEvent.
@rschlussel rschlussel merged commit 0890b3d into prestodb:master Jul 15, 2020