Remove eliminating sort optimization #19387

Merged (1 commit) on Oct 19, 2023

takezoe
Member

@takezoe takezoe commented Oct 12, 2023

Description

ORDER BY doesn't work properly in the following case because the optimizer eliminates the Sort before the LocalExchange is added.

trino> SELECT * FROM (
    ->   SELECT l_quantity, row_number() OVER (ORDER BY l_quantity) as row_number 
    ->   FROM tpch.sf1.lineitem
    -> ) WHERE mod(row_number, 1000000) = 0 ORDER BY l_quantity;
 l_quantity | row_number
------------+------------
      26.00 |    3000000
      50.00 |    6000000
      17.00 |    2000000
       9.00 |    1000000
      42.00 |    5000000
      34.00 |    4000000
(6 rows)

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# Section
* Fix some things. ({issue}`issuenumber`)

Fixes #19399

@findepi
Member

findepi commented Oct 13, 2023

@takezoe good find!

i created an issue based on this PR description, for visibility
#19399

ActualProperties
        .builderFrom(child.getProperties())
        // Exclude order sensitive properties
        .local(child.getProperties().getLocalProperties().stream().filter(p -> !p.isOrderSensitive()).toList())
Member

Filter should not exclude any properties. I think the problem is due to a local exchange that's being added after the WindowNode, which breaks the ordering guarantees on a global level, even if it preserves the ordering within each stream:

 Fragment 0 [SINGLE]
     Output layout: [quantity, row_number_0]
     Output partitioning: SINGLE []
     Output[columnNames = [quantity, row_number]]
     │   Layout: [quantity:double, row_number_0:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   row_number := row_number_0
     └─ Filter[filterPredicate = (mod("row_number_0", BIGINT '1000000') = BIGINT '0')]
        │   Layout: [quantity:double, row_number_0:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        └─ LocalExchange[partitioning = ROUND_ROBIN]         <<<<<<<<<<<<<<<
           │   Layout: [quantity:double, row_number_0:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ Window[orderBy = [quantity ASC NULLS LAST]]
              │   Layout: [quantity:double, row_number_0:bigint]
              │   row_number_0 := row_number() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
              └─ LocalExchange[partitioning = SINGLE]
                 │   Layout: [quantity:double]
                 │   Estimates: {rows: 6001215 (51.51MB), cpu: 0, memory: 0B, network: 0B}
                 └─ RemoteSource[sourceFragmentIds = [1]]
                        Layout: [quantity:double]

For example, here's another query that has the same issue and does not involve any filters:

SELECT row_number * 10 FROM (
    SELECT quantity, row_number() OVER (ORDER BY quantity) as row_number
    FROM tpch.sf1.lineitem
)
ORDER BY quantity;
 Fragment 0 [SINGLE]
     Output layout: [expr]
     Output partitioning: SINGLE []
     Output[columnNames = [_col0]]
     │   Layout: [expr:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   _col0 := expr
     └─ Project[]
        │   Layout: [expr:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        │   expr := ("row_number_0" * BIGINT '10')
        └─ LocalExchange[partitioning = ROUND_ROBIN]       <<<<<<<<<<<<<<<
           │   Layout: [quantity:double, row_number_0:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ Window[orderBy = [quantity ASC NULLS LAST]]
              │   Layout: [quantity:double, row_number_0:bigint]
              │   row_number_0 := row_number() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
              └─ LocalExchange[partitioning = SINGLE]
                 │   Layout: [quantity:double]
                 │   Estimates: {rows: 6001215 (51.51MB), cpu: 0, memory: 0B, network: 0B}
                 └─ RemoteSource[sourceFragmentIds = [1]]
                        Layout: [quantity:double]

I didn't look too closely, but I think it may be caused by AddExchanges.visitSort removing the SortNode, and then AddLocalExchanges (incorrectly) adding a local exchange after the WindowNode.
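To make the point about per-stream versus global ordering concrete, here is a minimal, self-contained sketch. It is not Trino code: `RoundRobinOrderingDemo`, `roundRobin`, `concat`, and `isSorted` are hypothetical names, and concatenating the streams is just one possible order in which a downstream consumer might see the rows.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (an assumption, not Trino's exchange implementation):
// rows are dealt to N streams in round-robin order.
final class RoundRobinOrderingDemo {
    static List<List<Integer>> roundRobin(List<Integer> rows, int streams) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < streams; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            out.get(i % streams).add(rows.get(i));
        }
        return out;
    }

    // One possible downstream arrival order: stream 0 fully, then stream 1, and so on.
    static List<Integer> concat(List<List<Integer>> streams) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> stream : streams) {
            out.addAll(stream);
        }
        return out;
    }

    static boolean isSorted(List<Integer> rows) {
        for (int i = 1; i < rows.size(); i++) {
            if (rows.get(i - 1) > rows.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Integer>> streams = roundRobin(List.of(1, 2, 3, 4, 5, 6), 2);
        System.out.println(streams);                   // [[1, 3, 5], [2, 4, 6]]
        System.out.println(isSorted(streams.get(0)));  // true: each stream stays ordered
        System.out.println(isSorted(concat(streams))); // false: the combined output is not
    }
}
```

Each stream keeps its rows in order, but nothing above the exchange merges the streams back by the sort key, so the combined result is no longer ordered.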

Member Author

Thank you. I didn't understand the issue properly. I will try to revise the fix.

Member Author

@takezoe takezoe Oct 14, 2023

In this case,

SELECT * FROM (
  SELECT l_quantity, row_number() OVER (ORDER BY l_quantity) as row_number 
  FROM tpch.sf1.lineitem
) WHERE mod(row_number, 1000000) = 0 ORDER BY l_quantity;

FilterNode requires unpartitioned parallel streams so LocalExchange is added here:

Optional<List<Symbol>> requiredPartitionColumns = requiredProperties.getPartitioningColumns();
if (requiredPartitionColumns.isEmpty()) {
    // unpartitioned parallel streams required
    ExchangeNode exchangeNode = partitionedExchange(
            idAllocator.getNextId(),
            LOCAL,
            planWithProperties.getNode(),
            new PartitioningScheme(Partitioning.create(FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()), planWithProperties.getNode().getOutputSymbols()));
    return deriveProperties(exchangeNode, planWithProperties.getProperties());
}

In this case,

SELECT row_number * 10 FROM (
    SELECT quantity, row_number() OVER (ORDER BY quantity) as row_number
    FROM tpch.sf1.lineitem
)
ORDER BY quantity;

AddExchanges.visitProject() adds LocalExchange here:

return planAndEnforceChildren(
        node,
        parentPreferences.withoutPreference().withDefaultParallelism(session),
        parentPreferences.withDefaultParallelism(session));

I wonder if there are other cases and how we can fix all of them.

Member

Take a look at AddExchanges.visitSort(). It's doing the following:

 if (child.getProperties().isSingleNode()) {
     // current plan so far is single node, so local properties are effectively global properties
     // skip the SortNode if the local properties guarantee ordering on Sort keys
     // TODO: This should be extracted as a separate optimizer once the planner is able to reason about the ordering of each operator
     List<LocalProperty<Symbol>> desiredProperties = node.getOrderingScheme().toLocalProperties();

     if (LocalProperties.match(child.getProperties().getLocalProperties(), desiredProperties).stream()
             .noneMatch(Optional::isPresent)) {
         return child;
     }
 }

This bit of code is making the assumption that it's safe to remove the Sort because there are no exchanges yet. However, it's forgetting the fact that the data needs to be sorted, so later when the exchange is added this expectation is broken.

I think the simplest solution is to get rid of that block of code altogether, as it's based on incorrect assumptions -- that the plan will continue to be "single node" after the SortNode is removed.
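The ordering problem between the two passes can be sketched with a toy model. Everything here is hypothetical and heavily simplified: a plan is just a list of operator names from top to bottom, "Window" stands for a node whose output is already ordered on the sort keys, and `eliminateRedundantSort` / `addLocalExchanges` only imitate the roles of the removed block and of the later exchange-adding pass.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not the Trino planner. Plans are listed top (output side) to bottom (source side).
final class SortEliminationSketch {
    // Imitates the removed block: drop "Sort" when something below already yields the ordering.
    static List<String> eliminateRedundantSort(List<String> plan) {
        List<String> result = new ArrayList<>(plan);
        int sort = result.indexOf("Sort");
        if (sort >= 0 && result.subList(sort + 1, result.size()).contains("Window")) {
            result.remove(sort); // remove(int index)
        }
        return result;
    }

    // Imitates a later pass that parallelizes Filter by placing a round-robin exchange beneath it.
    static List<String> addLocalExchanges(List<String> plan) {
        List<String> result = new ArrayList<>(plan);
        int filter = result.indexOf("Filter");
        if (filter >= 0) {
            result.add(filter + 1, "RoundRobinExchange");
        }
        return result;
    }

    // Walking down from the top, the first order-affecting node decides whether the output is ordered.
    static boolean outputOrdered(List<String> plan) {
        for (String node : plan) {
            if (node.equals("Sort") || node.equals("Window")) {
                return true;
            }
            if (node.equals("RoundRobinExchange")) {
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> plan = List.of("Sort", "Filter", "Window");
        List<String> withElimination = addLocalExchanges(eliminateRedundantSort(plan));
        List<String> withoutElimination = addLocalExchanges(plan);
        System.out.println(withElimination + " ordered=" + outputOrdered(withElimination));
        System.out.println(withoutElimination + " ordered=" + outputOrdered(withoutElimination));
    }
}
```

In this model the Sort is removed while the plan still looks single-stream, and the exchange added afterwards has nothing above it to restore the order. Keeping the Sort leaves the output ordered regardless of what the later pass inserts.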

Member Author

I see. And if we keep this optimization, it needs to be applied after all exchanges are added.

Member

Yes, either that or the plan needs to retain information about required properties, but that’s a much larger and invasive change.
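As a rough illustration of the first option (running the elimination only once all exchanges are in place), a late pass could check that nothing between the Sort and the ordering source disturbs row order. This is a hypothetical sketch, not an existing Trino rule: the `Node` enum, `preservesOrder`, and `eliminateSortAfterExchanges` are made-up names, and treating Filter and Project as order-preserving is an assumption of the toy model.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a sort-elimination pass that runs on the final plan (top to bottom).
final class LateSortEliminationSketch {
    enum Node { SORT, FILTER, PROJECT, ROUND_ROBIN_EXCHANGE, ORDERED_WINDOW }

    // Assumption of this sketch: these operators keep the row order of a single input stream.
    static boolean preservesOrder(Node node) {
        return node == Node.FILTER || node == Node.PROJECT;
    }

    static List<Node> eliminateSortAfterExchanges(List<Node> plan) {
        int sort = plan.indexOf(Node.SORT);
        if (sort < 0) {
            return plan;
        }
        for (int i = sort + 1; i < plan.size(); i++) {
            Node node = plan.get(i);
            if (node == Node.ORDERED_WINDOW) {
                // Only order-preserving nodes lie between the Sort and the ordered source.
                List<Node> result = new ArrayList<>(plan);
                result.remove(sort);
                return result;
            }
            if (!preservesOrder(node)) {
                // For example a round-robin exchange: the Sort is still needed.
                return plan;
            }
        }
        return plan;
    }
}
```

With a plan such as `[SORT, FILTER, ROUND_ROBIN_EXCHANGE, ORDERED_WINDOW]` the Sort is kept, while `[SORT, PROJECT, ORDERED_WINDOW]` loses it.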

Member Author

Yeah, we would need to do almost the same thing as AddExchanges, but only to eliminate duplicated sorts...

For now, I just removed sort elimination from AddExchanges.

@takezoe takezoe changed the title from "Do not eliminate sort if filter exists before sort" to "Remove eliminating sort optimization as it could produce wrongly ordered result" on Oct 16, 2023
Comment on lines 58 to 71
//    @Test
//    public void testEliminateSorts()
//    {
//        @Language("SQL") String sql = "SELECT quantity, row_number() OVER (ORDER BY quantity) FROM lineitem ORDER BY quantity";
//
//        PlanMatchPattern pattern =
//                output(
//                        window(windowMatcherBuilder -> windowMatcherBuilder
//                                        .specification(windowSpec)
//                                        .addFunction(functionCall("row_number", Optional.empty(), ImmutableList.of())),
//                                anyTree(LINEITEM_TABLESCAN_Q)));
//
//        assertUnitPlan(sql, pattern);
//    }
Member

Remove this commented-out code. If and when we reintroduce that behavior, we can add a test as appropriate.

Member Author

@takezoe takezoe Oct 19, 2023

Oops, I forgot to remove these lines after confirming that the test passes. 🙇‍♂️

@martint
Member

martint commented Oct 18, 2023

The commit message title is too long. Also, please add some details in the commit as to why we're removing it. You can use my comment from above:

This bit of code is making the assumption that it's safe to remove the Sort because there are no exchanges yet. However, it's forgetting the fact that the data needs to be sorted, so later when the exchange is added this expectation is broken.

@takezoe takezoe changed the title from "Remove eliminating sort optimization as it could produce wrongly ordered result" to "Remove eliminating sort optimization" on Oct 19, 2023
This bit of code is making the assumption that it's safe to remove the Sort because there are no exchanges yet. However, it's forgetting the fact that the data needs to be sorted, so later when the exchange is added this expectation is broken.
@martint martint merged commit d9fbf8a into trinodb:master Oct 19, 2023
89 checks passed
@github-actions github-actions bot added this to the 430 milestone Oct 19, 2023
@findepi
Member

findepi commented Oct 23, 2023

@takezoe thanks for fixing this.

it's awesome that the PR is updating the tests. From what I can see this is only test removal.
Would it be possible to add a regression test for this? or is this not testable?

Perhaps a test query like in the PR description could work.
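For reference, the property such a regression test would need to assert is simply that the returned rows are ordered by the sort key. The sketch below is not the test from the follow-up PR; `OrderingCheckSketch` and `isOrdered` are hypothetical names, and the sample values are the `l_quantity` column from the output in the PR description.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: checks that consecutive rows never decrease under the comparator.
final class OrderingCheckSketch {
    static <T> boolean isOrdered(List<T> rows, Comparator<? super T> comparator) {
        for (int i = 1; i < rows.size(); i++) {
            if (comparator.compare(rows.get(i - 1), rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // l_quantity values in the order reported in the PR description
        List<Double> reported = List.of(26.0, 50.0, 17.0, 9.0, 42.0, 34.0);
        // The same values in the order ORDER BY l_quantity should produce
        List<Double> expected = List.of(9.0, 17.0, 26.0, 34.0, 42.0, 50.0);

        System.out.println(isOrdered(reported, Comparator.naturalOrder())); // false
        System.out.println(isOrdered(expected, Comparator.naturalOrder())); // true
    }
}
```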

@takezoe takezoe deleted the not-eliminate-sort branch October 23, 2023 15:51
@takezoe
Member Author

takezoe commented Oct 23, 2023

@findepi Thank you for the suggestion! Actually, there was a test case in the original version of my PR, but it seems that I accidentally deleted it while revising this PR: 710ec45

I will send another PR to resurrect it.

@takezoe
Member Author

takezoe commented Oct 23, 2023

Created PR for regression test: #19496

@findepi
Member

findepi commented Oct 24, 2023

@takezoe thank you!

Development

Successfully merging this pull request may close these issues.

Unordered results returned when query has window function with same order by
4 participants