
Set optimizer.force-single-node-output to false by default #13217

Merged
merged 2 commits into trinodb:master on Jul 20, 2022

Conversation

arhimondr
Contributor

Description

With this setting enabled, the optimizer inserts an additional exchange
to make sure the coordinator always consumes query results from a single
task. However, the exchange client used by the coordinator can consume
results from as many tasks and stages as needed. The setting was
introduced in 2017 by 6143485 as a transitional setting for migrating
away from introducing the additional exchange, but it has never been set
to false by default.
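
For context, this is how the setting can be toggled; the config property name is taken from this PR, while the session property name below is an assumption based on the usual naming convention:

# etc/config.properties, on the coordinator
optimizer.force-single-node-output=false

-- per-session override (assumed session property name)
SET SESSION force_single_node_output = false;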

Is this change a fix, improvement, new feature, refactoring, or other?

Improvement

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

Core engine

How would you describe this change to a non-technical end user or system administrator?

Not user visible

Related issues, pull requests, and links

6143485

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
(x) Release notes entries required with the following suggested text:

# Core
* Set optimizer.force-single-node-output to false by default

@cla-bot cla-bot bot added the cla-signed label Jul 18, 2022
@arhimondr arhimondr force-pushed the disable-force-single-node-output branch from b1d550f to 87056bb Compare July 18, 2022 23:16
@arhimondr arhimondr requested review from losipiuk and dain July 18, 2022 23:29
Stage execution could remain in the SCHEDULING state while already
running some tasks, and not transition further until more tasks were
scheduled.

With this setting enabled, the optimizer inserts an additional exchange
to make sure the coordinator always consumes query results from a single
task. However, the exchange client used by the coordinator can consume
results from as many tasks and stages as needed. The setting was
introduced in 2017 by trinodb@6143485 as a transitional setting for
migrating away from introducing the additional exchange, but it has never
been set to false by default.
@arhimondr arhimondr force-pushed the disable-force-single-node-output branch from 87056bb to 32d1f29 Compare July 19, 2022 02:18
@sopel39
Member

sopel39 commented Jul 19, 2022

Will it cause the coordinator to perform actual computations (e.g., final aggregations)? If so, then IMO we should keep it as is.

@findepi findepi requested a review from sopel39 July 19, 2022 11:02
@findepi
Member

findepi commented Jul 19, 2022

Will it cause the coordinator to perform actual computations (e.g., final aggregations)? If so, then IMO we should keep it as is.

I agree.

@arhimondr can you verify that after the change, no computations (aggregation, non-identity projection, filter, etc.) get scheduled on the coordinator?

@arhimondr
Contributor Author

@sopel39 @findepi When optimizer.force-single-node-output is set to true, the optimizer ensures that query results are always consumed from a single node. For example:

trino:tiny> EXPLAIN (TYPE DISTRIBUTED) SELECT * FROM nation;
                                           Query Plan                                            
-------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                                             
     Output layout: [nationkey, name, regionkey, comment]                                        
     Output partitioning: SINGLE []                                                              
     Output[columnNames = [nationkey, name, regionkey, comment]]                                 
     │   Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]    
     │   Estimates: {rows: 25 (2.67kB), cpu: 2.67k, memory: 0B, network: 2.67kB}                 
     └─ RemoteSource[sourceFragmentIds = [1]]                                                    
            Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)] 
            Estimates:                                                                           
                                                                                                 
 Fragment 1 [SOURCE]                                                                             
     Output layout: [nationkey, name, regionkey, comment]                                        
     Output partitioning: SINGLE []                                                              
     TableScan[table = tpch:tiny:nation]                                                         
         Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]    
         Estimates: {rows: 25 (2.67kB), cpu: 2.67k, memory: 0B, network: 0B}                     
         nationkey := tpch:nationkey                                                             
         regionkey := tpch:regionkey                                                             
         name := tpch:name                                                                       
         comment := tpch:comment                                                                 
                                                                                                 
                                                                                                 
(1 row)

However, the coordinator can consume query results from any number of nodes with no issues. Here is how the plan looks for the identical query when optimizer.force-single-node-output is set to false:

trino:tiny> EXPLAIN (TYPE DISTRIBUTED) SELECT * FROM nation;
                                           Query Plan                                            
-------------------------------------------------------------------------------------------------
 Fragment 0 [SOURCE]                                                                             
     Output layout: [nationkey, name, regionkey, comment]                                        
     Output partitioning: SINGLE []                                                              
     Output[columnNames = [nationkey, name, regionkey, comment]]                                 
     │   Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]    
     │   Estimates: {rows: 25 (2.67kB), cpu: 2.67k, memory: 0B, network: 0B}                     
     └─ TableScan[table = tpch:tiny:nation]                                                      
            Layout: [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)] 
            Estimates: {rows: 25 (2.67kB), cpu: 2.67k, memory: 0B, network: 0B}                  
            nationkey := tpch:nationkey                                                          
            regionkey := tpch:regionkey                                                          
            name := tpch:name                                                                    
            comment := tpch:comment                                                              
                                                                                                 
                                                                                                 
(1 row)

For queries whose topmost stage has SINGLE distribution (such as an aggregation), the plan remains unchanged. Here is the plan when optimizer.force-single-node-output is set to true:

trino:tiny> EXPLAIN (TYPE DISTRIBUTED) SELECT count(*) FROM nation;
                               Query Plan                               
------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                    
     Output layout: [count]                                             
     Output partitioning: SINGLE []                                     
     Output[columnNames = [_col0]]                                      
     │   Layout: [count:bigint]                                         
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}        
     │   _col0 := count                                                 
     └─ Aggregate[type = FINAL]                                         
        │   Layout: [count:bigint]                                      
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}     
        │   count := count("count_0")                                   
        └─ LocalExchange[partitioning = SINGLE]                         
           │   Layout: [count_0:bigint]                                 
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}  
           └─ RemoteSource[sourceFragmentIds = [1]]                     
                  Layout: [count_0:bigint]                              
                  Estimates:                                            
                                                                        
 Fragment 1 [SOURCE]                                                    
     Output layout: [count_0]                                           
     Output partitioning: SINGLE []                                     
     Aggregate[type = PARTIAL]                                          
     │   Layout: [count_0:bigint]                                       
     │   Estimates:                                                     
     │   count_0 := count(*)                                            
     └─ TableScan[table = tpch:tiny:nation]                             
            Layout: []                                                  
            Estimates: {rows: 25 (0B), cpu: 0, memory: 0B, network: 0B} 
                                                                        
                                                                        
(1 row)

Here is the plan when optimizer.force-single-node-output is set to false:

trino:tiny> EXPLAIN (TYPE DISTRIBUTED) SELECT count(*) FROM nation;
                               Query Plan                               
------------------------------------------------------------------------
 Fragment 0 [SINGLE]                                                    
     Output layout: [count]                                             
     Output partitioning: SINGLE []                                     
     Output[columnNames = [_col0]]                                      
     │   Layout: [count:bigint]                                         
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}        
     │   _col0 := count                                                 
     └─ Aggregate[type = FINAL]                                         
        │   Layout: [count:bigint]                                      
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}     
        │   count := count("count_0")                                   
        └─ LocalExchange[partitioning = SINGLE]                         
           │   Layout: [count_0:bigint]                                 
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}  
           └─ RemoteSource[sourceFragmentIds = [1]]                     
                  Layout: [count_0:bigint]                              
                  Estimates:                                            
                                                                        
 Fragment 1 [SOURCE]                                                    
     Output layout: [count_0]                                           
     Output partitioning: SINGLE []                                     
     Aggregate[type = PARTIAL]                                          
     │   Layout: [count_0:bigint]                                       
     │   Estimates:                                                     
     │   count_0 := count(*)                                            
     └─ TableScan[table = tpch:tiny:nation]                             
            Layout: []                                                  
            Estimates: {rows: 25 (0B), cpu: 0, memory: 0B, network: 0B} 
                                                                        
                                                                        
(1 row)

@arhimondr arhimondr requested a review from findepi July 20, 2022 00:37
@arhimondr
Contributor Author

The "Transition query to RUNNING state when at least one task is running" commit is needed because the bug was not visible previously: there was always a stage with exactly a single task.

@@ -1042,6 +1040,9 @@ public void schedule()
ImmutableMultimap.of());
stageExecution.schedulingComplete();
remoteTask.ifPresent(task -> coordinatorTaskManager.addSourceTaskFailureListener(task.getTaskId(), failureReporter));
if (queryStateMachine.getQueryState() == STARTING && remoteTask.isPresent()) {
Member

why just STARTING?

Contributor Author

The order of states is STARTING -> RUNNING -> FINISHING -> FINISHED | FAILED. If the query is already in a later state, there is no need to transition.

Member

Technically you do not need if (queryStateMachine.getQueryState() == STARTING) at all, as queryStateMachine.transitionToRunning does the check internally (it does queryState.setIf(RUNNING, currentState -> currentState.ordinal() < RUNNING.ordinal());).
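
For illustration, here is a minimal self-contained sketch of the setIf idiom quoted above (names and structure are simplified and hypothetical, not the actual Trino StateMachine API):

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

enum QueryState { STARTING, RUNNING, FINISHING, FINISHED, FAILED }

class SketchStateMachine {
    private final AtomicReference<QueryState> state = new AtomicReference<>(QueryState.STARTING);

    // Atomically sets newState, but only if the current state satisfies the predicate.
    boolean setIf(QueryState newState, Predicate<QueryState> predicate) {
        while (true) {
            QueryState current = state.get();
            if (!predicate.test(current)) {
                return false; // already at or past newState: no transition happens
            }
            if (state.compareAndSet(current, newState)) {
                return true;
            }
            // lost a race with a concurrent transition: re-read and retry
        }
    }

    boolean transitionToRunning() {
        // Only states ordered before RUNNING (here, just STARTING) can transition,
        // so a caller-side getQueryState() == STARTING check is redundant.
        return setIf(QueryState.RUNNING, current -> current.ordinal() < QueryState.RUNNING.ordinal());
    }
}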

Member

@losipiuk losipiuk left a comment

The second commit is OK. I need some explanation on the first one.

@@ -1577,6 +1575,10 @@ public void schedule()
ScheduleResult result = stageSchedulers.get(stageExecution.getStageId())
.schedule();

if (stateMachine.getState() == DistributedStagesSchedulerState.PLANNED && stageExecution.getAllTasks().size() > 0) {
Member

Same here: the if (stateMachine.getState() == DistributedStagesSchedulerState.PLANNED) check is not needed.

Contributor Author

I wanted to avoid calling stageExecution.getAllTasks() every time, as it has to acquire the list of tasks under a lock, and I made the similar code in the coordinator scheduler consistent with this. It likely won't be a problem, but at the same time I wonder whether the check introduces enough confusion to justify the extra CPU cycles of dropping it.
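
To illustrate the trade-off (a sketch only, not the actual scheduler code):

// Sketch of the guard discussed above. The first condition is a cheap state read
// and short-circuits the getAllTasks() call, which copies the task list under a lock.
if (stateMachine.getState() == DistributedStagesSchedulerState.PLANNED
        && stageExecution.getAllTasks().size() > 0) {
    // transition the scheduler out of PLANNED once at least one task exists
}
// Dropping the state check would still be correct, since the transition itself
// is guarded, but getAllTasks() would then run on every schedule() invocation.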

Member

I see. Keep it, then.

@arhimondr arhimondr merged commit ea28b6a into trinodb:master Jul 20, 2022
@arhimondr arhimondr deleted the disable-force-single-node-output branch July 20, 2022 20:39
@github-actions github-actions bot added this to the 391 milestone Jul 20, 2022