Adaptive planning framework in FTE #20276

Merged 6 commits on Mar 12, 2024

@gaurav8297 gaurav8297 commented Jan 4, 2024

Description

Adaptive planning is a part of FTE (fault-tolerant execution) in which the engine can modify the plan at runtime based on exchange-level statistics, for example by reordering joins or mitigating skew. This can significantly improve cost and performance when the plan chosen by the static optimizer isn't the best one because statistics were underestimated or missing.

Included in PR:

  • Optimizer framework for Adaptive Planning.
  • Migrated the first rule for adaptive partitioning to the Adaptive Planning framework.

High Level Design

Available Runtime Stats (at exchange level)

  • Row count
  • The output data size for all partitions

RemoteSourceNode Stats Rule:

  • A new stats rule that takes estimated stats from source stages and updates the rowCount and averageRowSize based on runtime statistics if available.
  • This rule is needed during replanning for stats calculation.
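
As a rough illustration of the rule described above, the sketch below overrides an estimate's row count and average row size with runtime values when they are available. The types `Estimate` and `RuntimeOutputStats` and the method `applyRuntimeStats` are hypothetical stand-ins rather than classes from this PR, and the average row size is assumed here to be the output data size divided by the row count.

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration; these are not Trino classes.
record Estimate(double rowCount, double averageRowSize) {}

record RuntimeOutputStats(long outputRowCount, long outputDataSizeInBytes) {}

final class RemoteSourceStatsSketch
{
    private RemoteSourceStatsSketch() {}

    // Prefer runtime stats when the source stage has reported them;
    // otherwise fall back to the estimate computed for the source stage.
    static Estimate applyRuntimeStats(Estimate estimated, Optional<RuntimeOutputStats> runtime)
    {
        if (runtime.isEmpty()) {
            return estimated;
        }
        RuntimeOutputStats stats = runtime.get();
        long rows = stats.outputRowCount();
        // Assumption: average row size = total output bytes / row count.
        // Keep the estimated row size when there are no rows to average over.
        double averageRowSize = rows == 0
                ? estimated.averageRowSize()
                : (double) stats.outputDataSizeInBytes() / rows;
        return new Estimate(rows, averageRowSize);
    }
}
```

With an estimate of 100 rows and runtime output of 1000 rows in 16000 bytes, the sketch would report 1000 rows at 16 bytes per row; without runtime stats the estimate passes through unchanged.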

Replanning Steps:

  • Replanning is triggered whenever a stage finishes and the FTE scheduler has runtime statistics available for that stage.
  • At replanning, the engine first merges all SubPlans into a single PlanNode, where each RemoteSourceNode for a stage that hasn't finished is replaced with a remote exchange. A RemoteSourceNode for a finished stage remains in the plan as it is.
  • Once we have a merged plan which contains all the unfinished parts, we will reoptimize it using a set of PlanOptimizers.
  • During re-optimization, new exchanges may need to be added because of a change in partitioning strategy, for instance when a rule changes the distribution type of a join from BROADCAST to PARTITIONED.
  • Some remote exchanges may also be removed, for example when the join order changes.
  • Finally, the planner fragments the optimized PlanNode again and generates the SubPlans with new PlanFragmentIds. Re-fragmentation only happens if the old plan and the new plan differ. To check for differences, we rely on the PlanOptimizer#optimizeAndReturnChanges API, which also returns the changes in plan ids.

Note: We do not change the fragment ids which have no changes and are not downstream of the changed plan nodes. This optimization is done to avoid unnecessary stage restarts due to speculative execution.
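
The merge step above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code from this PR: the `*Sketch` types are hypothetical, fragments are identified by plain integers, and every unfinished fragment id is assumed to be present in `fragmentRoots`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified plan model; not Trino's PlanNode hierarchy.
interface PlanSketch {}

record ScanSketch(String table) implements PlanSketch {}

record JoinSketch(PlanSketch left, PlanSketch right) implements PlanSketch {}

record RemoteSourceSketch(int sourceFragmentId) implements PlanSketch {}

record RemoteExchangeSketch(PlanSketch source) implements PlanSketch {}

final class MergeSketch
{
    private MergeSketch() {}

    // Inlines unfinished source fragments behind a remote exchange and
    // leaves remote sources of finished fragments untouched.
    static PlanSketch merge(PlanSketch node, Map<Integer, PlanSketch> fragmentRoots, Set<Integer> finishedFragments)
    {
        if (node instanceof RemoteSourceSketch remote) {
            if (finishedFragments.contains(remote.sourceFragmentId())) {
                return remote;
            }
            PlanSketch child = fragmentRoots.get(remote.sourceFragmentId());
            return new RemoteExchangeSketch(merge(child, fragmentRoots, finishedFragments));
        }
        if (node instanceof JoinSketch join) {
            return new JoinSketch(
                    merge(join.left(), fragmentRoots, finishedFragments),
                    merge(join.right(), fragmentRoots, finishedFragments));
        }
        if (node instanceof RemoteExchangeSketch exchange) {
            return new RemoteExchangeSketch(merge(exchange.source(), fragmentRoots, finishedFragments));
        }
        // Leaf nodes such as scans are returned as they are.
        return node;
    }
}
```

The merged tree is what a set of optimizers would then see: the finished side is opaque (only its runtime stats matter), while the unfinished side is still open to changes such as a different partitioning.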

Explain Analyze:

  • If any adaptive optimization is triggered, the output of EXPLAIN ANALYZE shows both the old and new plans.

An example of adaptive partitioning, where the partition count changes from 50 to 1000.

Fragment 7 [HASH]                                                                                                                        >
     CPU: 22.49m, Scheduled: 49.02m, Blocked 2.87h (Input: 39.99m, Output: 0.00ns), Input: 7499989709 rows (62.86GB); per task: avg.: 5999>
     Output layout: [count_5]                                                                                                             >
     Output partitioning: SINGLE []                                                                                                       >
     Input partition count: 1000                                                                                                          >
     Aggregate[type = PARTIAL]                                                                                                            >
     │   Layout: [count_5:bigint]                                                                                                         >
     │   CPU: 8.63s (0.35%), Scheduled: 18.02s (0.27%), Blocked: 0.00ns (0.00%), Output: 1000 rows (8.79kB)                               >
     │   Input avg.: 5999989.71 rows, Input std.dev.: 22.01%                                                                              >
     │   count_5 := count(*)                                                                                                              >
     └─ InnerJoin[criteria = ("orderkey" = "orderkey_0"), distribution = PARTITIONED]                                                     >
        │   Layout: []                                                                                                                    >
        │   CPU: 19.39m (47.06%), Scheduled: 41.60m (37.94%), Blocked: 1.57h (58.30%), Output: 5999989709 rows (0B)                       >
        │   Left (probe) Input avg.: 5999989.71 rows, Input std.dev.: 22.01%                                                              >
        │   Right (build) Input avg.: 1500000.00 rows, Input std.dev.: 0.08%                                                              >
        │   Distribution: PARTITIONED                                                                                                     >
        ├─ AdaptivePlan[]                                                                                                                 >
        │  │   Layout: [orderkey:bigint]                                                                                                  >
        │  ├─ [Initial Plan] RemoteSource[sourceFragmentIds = [3]]                                                                        >
        │  │      Layout: [orderkey:bigint]                                                                                               >
        │  │      CPU: 6.45m (15.67%), Scheduled: 16.97m (15.48%), Blocked: 2.53m (1.57%), Output: 5999989709 rows (50.29GB)              >
        │  │      Input avg.: 14999974.27 rows, Input std.dev.: 12.86%                                                                    >
        │  └─ [Current Plan] RemoteSource[sourceFragmentIds = [8]]                                                                        >
        │         Layout: [orderkey:bigint]                                                                                               >
        │         CPU: 49.21s (1.99%), Scheduled: 2.31m (2.11%), Blocked: 20.45m (12.67%), Output: 5999989709 rows (50.29GB)              >
        │         Input avg.: 5999989.71 rows, Input std.dev.: 22.01%                                                                     >
        └─ LocalExchange[partitioning = HASH, arguments = ["orderkey_0"]]                                                                 >
           │   Layout: [orderkey_0:bigint]                                                                                                >
           │   CPU: 57.05s (2.31%), Scheduled: 1.93m (1.76%), Blocked: 23.04m (14.28%), Output: 1500000000 rows (12.57GB)                 >
           │   Input avg.: 1500000.00 rows, Input std.dev.: 39.40%                                                                        >
           └─ AdaptivePlan[]                                                                                                              >
              │   Layout: [orderkey_0:bigint]                                                                                             >
              ├─ [Initial Plan] RemoteSource[sourceFragmentIds = [4]]                                                                     >
              │      Layout: [orderkey_0:bigint]                                                                                          >
              │      CPU: 2.14m (5.19%), Scheduled: 5.36m (4.89%), Blocked: 1.75m (1.08%), Output: 1500000000 rows (12.57GB)              >
              │      Input avg.: 7500000.00 rows, Input std.dev.: 17.35%                                                                  >
              └─ [Current Plan] RemoteSource[sourceFragmentIds = [9]]                                                                     >
                     Layout: [orderkey_0:bigint]                                                                                          >
                     CPU: 1.18m (2.87%), Scheduled: 2.72m (2.48%), Blocked: 19.54m (12.11%), Output: 1500000000 rows (12.57GB)            >
                     Input avg.: 1500000.00 rows, Input std.dev.: 39.40%

Fragment 8 [HASH]                                                                                                                        >
     CPU: 6.46m, Scheduled: 16.97m, Blocked 2.53m (Input: 2.53m, Output: 0.00ns), Input: 5999989709 rows (50.29GB); per task: avg.: 119999>
     Output layout: [orderkey]                                                                                                            >
     Output partitioning: HASH [orderkey]                                                                                                 >
     Output partition count: 1000                                                                                                         >
     RemoteSource[sourceFragmentIds = [3]]                                                                                                >
         Layout: [orderkey:bigint]                                                                                                        >
         CPU: 6.45m (15.67%), Scheduled: 16.97m (15.48%), Blocked: 2.53m (1.57%), Output: 5999989709 rows (50.29GB)                       >
         Input avg.: 14999974.27 rows, Input std.dev.: 12.86%                                                                             >
                                                                                         >
 Fragment 3 [SOURCE]                                                                                      >
     CPU: 8.12m, Scheduled: 30.37m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 5999989709 rows (50.29GB); per task: avg.: 2542>
     Amount of input data processed by the workers for this stage might be skewed                                                         >
     Output layout: [orderkey]                                                                                                            >
     Output partitioning: HASH [orderkey]                                                                                                 >
     ScanFilter[table = hive:tpch_sf1000_orc:lineitem, dynamicFilters = {"orderkey" = #df_375}]                                           >
         Layout: [orderkey:bigint]                                                                                                        >
         Estimates: {rows: 5999989709 (50.29GB), cpu: 50.29G, memory: 0B, network: 0B}/{rows: 5999989709 (50.29GB), cpu: 50.29G, memory: 0>
         CPU: 8.12m (19.71%), Scheduled: 30.37m (27.70%), Blocked: 0.00ns (0.00%), Output: 5999989709 rows (50.29GB)                      >
         Input avg.: 2111185.68 rows, Input std.dev.: 22.98%                                                                              >
         orderkey := orderkey:bigint:REGULAR                                                                                              >
         Input: 5999989709 rows (50.29GB), Filtered: 0.00%, Physical input: 4.96GB, Physical input time: 21.81m                           >
         Dynamic filters:                                                                                                                 >
             - df_375, ALL, collection time=276.25ms

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Jan 4, 2024
@gaurav8297 gaurav8297 force-pushed the adaptive_planning_fte branch 2 times, most recently from 2694490 to 27234a6 Compare January 8, 2024 08:03
@gaurav8297 gaurav8297 changed the title Adaptive planning in FTE Adaptive planning framework in FTE Jan 8, 2024
@gaurav8297 gaurav8297 force-pushed the adaptive_planning_fte branch 3 times, most recently from b654a9c to d1c836f Compare January 10, 2024 12:10
@gaurav8297 gaurav8297 marked this pull request as ready for review January 10, 2024 12:11
@sopel39 sopel39 requested a review from losipiuk January 10, 2024 14:45
Member

@losipiuk losipiuk left a comment

I put some comments but I am a bit lost on main logic

Comment on lines 77 to 80
public boolean isStageFinished(PlanFragmentId planFragmentId)
{
    return runtimeOutputStats.containsKey(planFragmentId);
}
Member

Is this accurate? We can get output stats for stages which are still running and perform adaptive optimization based on those.
I have not gotten to that commit yet, but it looks like you can put stuff in runtimeOutputStats for stages which are still running, with stats estimated via ESTIMATED_BY_PROGRESS.

Member

discussed offline, add comment

.filter(planFragmentId -> stageExecutions.containsKey(getStageId(planFragmentId)))
.collect(toImmutableSet()));
if (isReadyForExecutionResult.isReadyForExecution()) {
return adaptivePlanner.optimize(plan, createStageRuntimeInfoProvider());
Member

Why return here after we replan a single stage? What about the others?

return root;
}

PlanNode adaptivePlan = addAdaptivePlanNode(idAllocator, initialPlan, optimizedPlan);
Member

I have a hard time understanding this. Can you explain why we need an extra plan node here?
Maybe in a code comment?

@losipiuk
Member

I put some comments but I am a bit lost on main logic

It is a bit clearer after I read the PR description (I should have done that at the beginning, I guess). I will get back to reading in a while.

@gaurav8297 gaurav8297 force-pushed the adaptive_planning_fte branch 8 times, most recently from 72fecaa to 74ab6f2 Compare January 28, 2024 22:11
@gaurav8297
Member Author

Taking a look at failed CI

@sopel39
Member

sopel39 commented Jan 29, 2024

@gaurav8297 Could you extract the prefix commits as separate PRs? They should land much quicker.

@gaurav8297 gaurav8297 force-pushed the adaptive_planning_fte branch 2 times, most recently from 6859c62 to 8b6513b Compare January 30, 2024 18:33
@gaurav8297 gaurav8297 force-pushed the adaptive_planning_fte branch 2 times, most recently from fbb8325 to b0d1c4d Compare January 31, 2024 08:34
@gaurav8297
Member Author

CI Issue: #16315

TypeProvider types,
Set<PlanNodeId> changedPlanNodes)
{
if (changedPlanNodes.contains(optimizedPlanNode.getId())) {
Member

Why are we testing changedPlanNodes for optimizedPlanNode.getId()? Should it be initialPlan.getId(), right?

Member Author

It should be optimizedPlanNode since it is possible that new nodes (new planNodeIds) are added to the optimized plan, and they are not part of initialPlan. However, we need to add an adaptive plan node above that.

Member Author

@gaurav8297 gaurav8297 Mar 5, 2024

We can add a test for this case?
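
To make the point about checking the optimized node's id concrete, here is a minimal sketch. It is not the code from this PR: `SketchNode`, `AdaptiveWrapSketch`, `wrapChanged`, and the string ids are hypothetical, and it assumes that an unchanged node has the same number of children in the initial and optimized plans.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical simplified plan node: an id, a label, and child nodes.
record SketchNode(String id, String label, List<SketchNode> children) {}

final class AdaptiveWrapSketch
{
    private AdaptiveWrapSketch() {}

    // Walks the initial and optimized plans in parallel and places an
    // adaptive node above the topmost changed node of each changed subtree.
    static SketchNode wrapChanged(SketchNode initial, SketchNode optimized, Set<String> changedIds)
    {
        if (changedIds.contains(optimized.id())) {
            // The id is looked up on the optimized side: a node newly added by
            // re-optimization has an id that does not exist in the initial plan.
            return new SketchNode("adaptive-" + optimized.id(), "AdaptivePlan", List.of(initial, optimized));
        }
        // Assumption: an unchanged node has the same child count in both plans.
        List<SketchNode> children = new ArrayList<>();
        for (int i = 0; i < optimized.children().size(); i++) {
            children.add(wrapChanged(initial.children().get(i), optimized.children().get(i), changedIds));
        }
        return new SketchNode(optimized.id(), optimized.label(), List.copyOf(children));
    }
}
```

If the lookup used the initial node's id instead, a freshly introduced node would never match the changed set, and no adaptive node would be placed above it. The resulting shape mirrors the EXPLAIN ANALYZE example in the description, where one AdaptivePlan node holds both the initial and the current RemoteSource.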


import static java.util.Objects.requireNonNull;

public class AdaptivePlanNode
Member

There can be many Visitors in the code which would not support this one.
E.g. UnaliasSymbolReferences.Visitor - but it is just a random find. How are we sure no important ones are missing?

Member Author

@gaurav8297 gaurav8297 Mar 5, 2024

UnaliasSymbolReferences -> I don't think we need to handle AdaptivePlanNode in this visitor since it is used during static planning. The same goes for other similar visitors. However, visitors that are used at execution time do need to handle AdaptivePlanNode. @losipiuk

@@ -508,13 +559,28 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) {
isWorkerCoordinatorBoundary(context.get(), childrenProperties.build()) ? getRetryPolicy(session) : RetryPolicy.NONE);
}

- private SubPlan buildSubPlan(PlanNode node, FragmentProperties properties, RewriteContext<FragmentProperties> context)
+ private SubPlan buildSubPlan(PlanNode node, FragmentIdentifier fragmentIdentifier, FragmentProperties properties, RewriteContext<FragmentProperties> context)
Member

This dichotomy of FragmentIdentifier and PlanFragmentId is confusing.

Member

Also, why do we need to rebuild the fragment for "unchanged" ones? Would it be more logical to cache the whole unchanged SubPlan and return it here instead of rebuilding?

Member Author

added caching
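
One way such caching could look is sketched below. This is only an illustration of the idea discussed in this thread, not the change that was actually made: `SketchSubPlan`, `SubPlanCacheSketch`, and the `changed` flag are hypothetical, and fragment ids are modeled as plain integers.

```java
import java.util.Map;

// Hypothetical stand-in for a SubPlan: a fragment id plus a description of its root.
record SketchSubPlan(int fragmentId, String root) {}

final class SubPlanCacheSketch
{
    private final Map<Integer, SketchSubPlan> previous;
    private int nextFragmentId;

    SubPlanCacheSketch(Map<Integer, SketchSubPlan> previous, int firstNewFragmentId)
    {
        this.previous = Map.copyOf(previous);
        this.nextFragmentId = firstNewFragmentId;
    }

    // Returns the cached SubPlan, with its old fragment id, when the fragment
    // is unchanged; otherwise builds a new SubPlan under a fresh fragment id.
    SketchSubPlan buildSubPlan(int oldFragmentId, String root, boolean changed)
    {
        SketchSubPlan cached = previous.get(oldFragmentId);
        if (!changed && cached != null) {
            return cached;
        }
        return new SketchSubPlan(nextFragmentId++, root);
    }
}
```

Reusing the cached object keeps the old fragment id for unchanged fragments, which matches the note in the description about not changing fragment ids that have no changes.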

This rule is only applicable for FTE where we
have exchange runtime statistics. We will use
this rule in adaptive re-planning.
We will require this while re-planning if there
are more exchanges added to the existing stage plan.
@gaurav8297 gaurav8297 force-pushed the adaptive_planning_fte branch 2 times, most recently from bea3729 to 0c4fdb1 Compare March 11, 2024 03:50
@losipiuk
Member

LGTM. Thanks

@losipiuk losipiuk merged commit f679565 into trinodb:master Mar 12, 2024
97 of 98 checks passed
@github-actions github-actions bot added this to the 441 milestone Mar 12, 2024
@colebow
Member

colebow commented Mar 13, 2024

Does this need a release note? Or is it not user-impacting at the moment because it is just a framework?

@losipiuk
Member

losipiuk commented Mar 14, 2024

It does not, I think. It refactors a feature which was hand-coded to use the new framework added by this PR, but from the user's perspective it is mostly not visible.
