
Limit and topn pushdown for Phoenix. #7490

Merged · 2 commits merged into trinodb:master on Apr 8, 2021

Conversation

lhofhansl
Member

@lhofhansl lhofhansl commented Apr 2, 2021

This adds topn pushdown for the Phoenix connector.
It turns out limit pushdown never worked, this PR fixes that as well.

The current code worked only by "luck". When calculating the splits, the Phoenix connector also generates the relevant HBase scans to execute as part of each split. However, those scans were created from the original query, not the rewritten one, so they were missing some of the annotations required by the rewritten query. It happened to work, but the limit was never applied.
When I first pushed topN down, the results were nonsensical since the scans and the expected results disagreed on the shape. The change now uses the same code to generate the split scans and the expected results during execution.

I only made the phoenix5 change; if the tests pass and we agree on the implementation, I'll make the same change for the phoenix module.

@vincentpoon @findepi

@findepi
Member

findepi commented Apr 2, 2021

It turns out limit pushdown never worked, this PR fixes that as well.

Why wasn't it caught by io.trino.testing.AbstractTestQueries#testLimit as run with TestPhoenixConnectorTest?

@lhofhansl
Member Author

I assume it's because the result was still functionally correct: the limit was not applied at the Phoenix level, but Trino would still apply it (isLimitGuaranteed returns false for the Phoenix connector).

@hashhar hashhar self-requested a review April 2, 2021 18:35
@lhofhansl
Member Author

lhofhansl commented Apr 2, 2021

Huh? Looks like it is trying to allocate over 390GB of memory in Phoenix?!

TPCH:ORDERS,\x06\x00\x00\x00\x00\x00\x00\x00\x00,1617387855993.1e70bc7f8da6b4824febfd877f685e9c.: java.sql.SQLException: ERROR 999 (50M01): Unable to allocate enough memory. Requested memory of 397284474695 bytes is larger than global pool of 483183820 bytes.

    at org.apache.phoenix.memory.GlobalMemoryManager.allocateBytes(GlobalMemoryManager.java:73)
    at org.apache.phoenix.memory.GlobalMemoryManager.allocate(GlobalMemoryManager.java:98)
    at org.apache.phoenix.memory.GlobalMemoryManager.allocate(GlobalMemoryManager.java:104)
    at org.apache.phoenix.iterate.NonAggregateRegionScannerFactory.getTopNScanner(NonAggregateRegionScannerFactory.java:350)
    at org.apache.phoenix.iterate.NonAggregateRegionScannerFactory.getRegionScanner(NonAggregateRegionScannerFactory.java:186)
    at org.apache.phoenix.coprocessor.ScanRegionObserver.doPostScannerOpen(ScanRegionObserver.java:194)
    at org.apache.phoenix.coprocessor.BaseScannerRegionObserver$RegionScannerHolder.overrideDelegate(BaseScannerRegionObserver.java:273)
    ... 8 more

Is that a negative test?

Update: Phoenix estimates the needed memory as (limit + offset) * row size; it does not actually allocate that memory, it just guards against overallocation.
While that can be improved in Phoenix, I do not think it is a reason not to push topN down.
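To make the estimate concrete, the arithmetic above reproduces the exact figure from the error message (a minimal sketch; the class and method names are mine, and the per-row size is inferred from the log, not taken from Phoenix source):

```java
public class PhoenixMemoryEstimate
{
    // Phoenix guards topN scans by pre-reserving (limit + offset) * estimated row size
    // from a global memory pool (by default a fraction of the region server heap).
    static long requestedBytes(long limit, long offset, long estimatedRowSize)
    {
        return (limit + offset) * estimatedRowSize;
    }

    public static void main(String[] args)
    {
        long limit = Integer.MAX_VALUE;  // the LIMIT from testLimitMax
        long estimatedRowSize = 185;     // assumed estimate, inferred from the error message
        long requested = requestedBytes(limit, 0, estimatedRowSize);
        long globalPool = 483_183_820L;  // pool size from the error message
        System.out.println(requested);              // 397284474695, exactly the figure in the log
        System.out.println(requested > globalPool); // true -> ERROR 999 (50M01)
    }
}
```

Working backwards, 397284474695 / 2147483647 = 185 bytes per row, which is why even a trivial query fails once the LIMIT is Integer.MAX_VALUE.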

@lhofhansl
Member Author

lhofhansl commented Apr 2, 2021

It's testLimitMax:

    @Test
    public void testLimitMax()
    {
        // max int
        assertQuery("SELECT orderkey FROM orders LIMIT " + Integer.MAX_VALUE);
        assertQuery("SELECT orderkey FROM orders ORDER BY orderkey LIMIT " + Integer.MAX_VALUE);

        // max long; a connector may attempt a pushdown while remote system may not accept such high limit values
        assertQuery("SELECT nationkey FROM nation LIMIT " + Long.MAX_VALUE, "SELECT nationkey FROM nation");
        // Currently this is not supported but once it's supported, it should be tested with connectors as well
        assertQueryFails("SELECT nationkey FROM nation ORDER BY nationkey LIMIT " + Long.MAX_VALUE, "ORDER BY LIMIT > 2147483647 is not supported");
    }

Should I override that test with a comment in TestPhoenixConnectorTest? ... Or put a reasonable limit in the connector beyond which we do not attempt to push the topN down?
(I can cause the same problem directly in Phoenix with a large limit.)

@lhofhansl
Member Author

See also https://issues.apache.org/jira/browse/PHOENIX-6436. I feel I do not want to fix this at the Trino level, so I would just override the test.

@findepi
Member

findepi commented Apr 2, 2021

See also https://issues.apache.org/jira/browse/PHOENIX-6436. I feel I do not want to fix this at the Trino level, so I would just override the test.

That's only hiding a problem... if a problem exists.

Is the ERROR 999 (50M01): Unable to allocate enough memory. Requested memory of 397284474695 bytes is larger than global pool of 483183820 bytes. an unconditional exception, or something that we can prevent with some kind of a hint, toggle, session config (on the Phoenix side), or whatever?

Otherwise, what's the safe LIMIT or TopN value that can be pushed down safely into Phoenix?

@lhofhansl
Member Author

Otherwise, what's the safe LIMIT or TopN value that can be pushed down safely into Phoenix?

That's the "thing"... It depends on the hardware. The highest that value can be set to is 100% of the available heap (again, it does not actually allocate that RAM; it just guards against abuse with a memory chunk manager, but it will block other tasks that also try to reserve memory chunks). The default is 15% of the heap.

Phoenix is not very smart about this. A query might lead to thousands of scans, each of which is handed the same overall limit, with a final merge pass. To make that test pass, the limit would have to be about 2,000,000.

We could avoid pushing down topn beyond that limit and let Trino do the work in that case, but that's not right either.

I tried checking that in supportsTopN(...), but when that method is called the passed JdbcTableHandle does not yet have the limit applied to it. So I'd have to change the TopNFunction itself.

something that we can prevent with some kind of a hint, toggle, session config (on Phoenix side), or whatever?

Unfortunately the logic is currently hardcoded in Phoenix. This is a problem in Phoenix... I'm still not convinced that we want to solve it in Trino.

@lhofhansl
Member Author

lhofhansl commented Apr 2, 2021

It's not hard to cap the topN limit at 1m or 2m and have Trino not push it into Phoenix beyond that.
We can certainly do that and remove the cap once this is fixed in Phoenix. That might be the best compromise.

Assume you run the HBase region servers with 32GB of heap (that's common). The default 15% gives a maximum allocation of 4.8GB.
A limit of 1,000,000 would then allow row sizes of about 5k before Phoenix runs into the memory chunk limit.
A limit of 2,000,000 would trigger it at row sizes of about 2.4k.

Either is good enough, IMHO.
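The headroom arithmetic above can be sketched as follows (a minimal sketch; class and method names are mine, and the 15% fraction is the default mentioned earlier):

```java
public class PoolHeadroom
{
    // Size of the Phoenix global memory pool given a heap size and pool fraction
    static long poolBytes(long heapBytes, double poolFraction)
    {
        return (long) (heapBytes * poolFraction);
    }

    public static void main(String[] args)
    {
        long heap = 32L * 1024 * 1024 * 1024; // 32 GB region server heap
        long pool = poolBytes(heap, 0.15);    // default 15% -> ~4.8 GB
        System.out.println(pool / 1_000_000); // ~5153 bytes of row-size headroom at LIMIT 1,000,000
        System.out.println(pool / 2_000_000); // ~2576 bytes of headroom at LIMIT 2,000,000
    }
}
```

So a 2m cap leaves roughly 2.4k of per-row headroom on a typical 32GB region server, which covers most reasonable row sizes.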

@findepi Let me know whether you want to proceed in that direction.

One more update: if there is an index that matches the sort order, there is no such limit in Phoenix, since it just streams out the result in that case; but I cannot tell from the connector whether such an index exists.

@lhofhansl
Member Author

Latest change does that. The maximum limit is hardcoded to 2m. I could make it configurable, but that seems overly complicated for a user to understand.

Still not sure whether we should not just let Phoenix decide whether it can run the query or not.
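A minimal sketch of the clamp described above (the constant and method names are hypothetical; the actual patch may structure this differently):

```java
public class TopNPushdownGuard
{
    // Hypothetical cap: beyond this, Phoenix's pre-reservation of
    // (limit + offset) * row size risks exhausting its global memory pool,
    // and Trino's own topN is faster anyway.
    static final long MAX_PUSHED_DOWN_TOPN = 2_000_000;

    static boolean shouldPushDown(long topNLimit)
    {
        return topNLimit <= MAX_PUSHED_DOWN_TOPN;
    }

    public static void main(String[] args)
    {
        System.out.println(shouldPushDown(10));                 // true: small topN goes to Phoenix
        System.out.println(shouldPushDown(Integer.MAX_VALUE));  // false: Trino sorts instead
    }
}
```

With this guard, small topNs are evaluated server-side in Phoenix while oversized ones simply fall back to Trino's distributed sort, rather than failing with ERROR 999.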

@lhofhansl
Member Author

lhofhansl commented Apr 3, 2021

Tests pass. But I don't like this very much. I would rather have some queries fail outright and get streaming of any size when there's a proper index (in fact, that way I can get ORDER BY pushdown by just passing a very large LIMIT).

The user can always disable topn push-down via the topn_pushdown_enabled session variable.

Update: I provided a fix for Phoenix that will presumably be in 5.1.2.

@lhofhansl
Member Author

Apologies for the barrage of comments here... Even with the size accounting fixed in Phoenix, it turns out that Phoenix always performs a merge sort on the client, i.e. the Trino worker, and for larger limits Trino's own method is just faster.

So I think the patch as is is good. For small values we get the benefit of Phoenix doing the work, and for larger values we defer to Trino. The right threshold depends a bit on the setup, but from my testing it is somewhere between 1m and 10m rows, so the 2m in this patch is a good enough heuristic.

I'll stop here and wait for comment :)

@lhofhansl
Member Author

Friendly ping.

@vincentpoon
Member

vincentpoon commented Apr 5, 2021

@lhofhansl
Member Author

lhofhansl commented Apr 5, 2021

@vincentpoon Thanks for taking a look.

Do we need to change the TestPhoenixConnectorTest to support the topN pushdown?

Oh, that's weird. Is there another bug? The test clearly failed when it pushed down topN. I'll double check.
Update: Made that change; let's wait for a test run.

What's your opinion on only pushing topN down when the limit is <= 2,000,000? (Interestingly, because of the client-side merge sort, even when the sort order matches an index, the server is slower than letting Trino do the sort beyond a certain size.)

@lhofhansl
Member Author

lhofhansl commented Apr 5, 2021

Looking at test failures... For the Phoenix connector isTopNLimitGuaranteed() returns false, so the topN is indeed not fully pushed down. It looks like it's currently the only connector where this is the case.

What now? It seems I should revert that last change suggested by @vincentpoon .

This is the generated plan:

Output[orderkey]
 │   Layout: [orderkey:bigint]
 └─ TopN[10 by (orderkey ASC_NULLS_LAST)]
    │   Layout: [orderkey:bigint]
    └─ LocalExchange[SINGLE] ()
       │   Layout: [orderkey:bigint]
       └─ RemoteExchange[GATHER]
          │   Layout: [orderkey:bigint]
          └─ TopNPartial[10 by (orderkey ASC_NULLS_LAST)]
             │   Layout: [orderkey:bigint]
             └─ TableScan[phoenix:tpch.orders TPCH.ORDERS sortOrder=[io.trino.plugin.jdbc.JdbcSortItem@ca279a6b] limit=10 columns=[ORDERKEY:bigint:BIGINT]]
                    Layout: [orderkey:bigint]
                    orderkey := ORDERKEY:bigint:BIGINT

Hence, we could also override the test and then match Output -> * -> TableScanNode -> SortNode, to verify that the sorting is pushed down.

@vincentpoon
Member

Ok, I wasn't aware that testTopNPushdown just tests whether the topN is fully pushed down. So I think your original patch is fine.

What's your opinion on only pushing topN down when the limit is <= 2,000,000? (Interestingly, because of the client-side merge sort, even when the sort order matches an index, the server is slower than letting Trino do the sort beyond a certain size.)

Wondering how you're testing this. e.g. Is your Trino cluster larger than your HBase cluster? (more workers vs region servers).
Is your query/index covered? (is fetching the data col what's slowing things down?)
Otherwise, Trino's topN shouldn't be faster than Phoenix querying a covered index.
For the case without an index, you might be getting more parallelism in sorting if you have more Trino workers than region servers?

Member

@hashhar hashhar left a comment


Looks good overall, modulo some nits about the comments.

Are the buildSql changes necessary or are they a cleanup? They could be a separate commit.
EDIT: Never mind, I saw the PR description. It should be a separate commit then: "Fix limit pushdown".

@lhofhansl
Member Author

Wondering how you're testing this. e.g. Is your Trino cluster larger than your HBase cluster? (more workers vs region servers).
Is your query/index covered? (is fetching the data col what's slowing things down?)

I isolated this on a single machine. Since the post-scan client merge happens in the Phoenix client (i.e. on the Trino worker), that seemed like a good way to test this.

When the limit is larger (and there are a lot of values to be passed), it makes sense that Trino could be faster: Phoenix would have to ship all the rows to the Phoenix client and then do the merge, which seems fairly inefficient for large sets (past a few million rows). I'm also debugging that as I find time, but as is, Trino seems faster.

When Phoenix can limit the work, most notably when there's an index with the right order and the set is smallish, then Phoenix is (much) faster.

@lhofhansl
Member Author

lhofhansl commented Apr 7, 2021

Whoops I messed something up in git. That looks different from my local history.

LIMIT pushdown did not work because the HBase scans are created during the
split phase based on the original query, before it is transformed.
With this commit, splits are created from the transformed query.
Member

@hashhar hashhar left a comment


Looks good to me.

Comment on lines 296 to 299
// TODO: Remove when this is improved in Phoenix
// Phoenix performs a client side merge sort.
// For large sets this is slower than passing
// the unsorted data to Trino and sort it there.
Member


I would expect this comment to talk about memory implications of changing (or not having) the limit, or a link to Phoenix issue describing that.

Also, if the Trino-side sorting is faster, I am not sure the TODO is actionable. If it is not, please just remove the "TODO" and please retain the rest of the info.

Comment on lines 297 to 300
// TODO: Remove when this is improved in Phoenix
// Phoenix performs a client side merge sort.
// For large sets this is slower than passing
// the unsorted data to Trino and sort it there.
Member


obviously same here

@lhofhansl
Member Author

lhofhansl commented Apr 7, 2021

@findepi The situation is not that simple in Phoenix unfortunately. With Phoenix 5.1.1 (current version) queries with large LIMITs will just fail due to the overestimation.

Upon running the Phoenix side through the profiler, I found that Phoenix is only slower when using a local index that does not cover all the columns requested in the query.
Note: in Phoenix you can request extra columns to be copied into an index, so that some queries do not need to go back to the main table at all. Local indexes allow queries that cannot be answered from the index alone to proceed, in return for a somewhat expensive local merge, so they are recommended only for selective queries (which topN queries over large sets are not).

In other cases sorting in Phoenix is faster. Apologies for the confusion - I had to get clarity myself, too.
The other cases are: No index, indexes that cover the query (both local and global). The trade-offs of using uncovered indexes in Phoenix are known, so no special treatment needed.

So I'll revise my statement: for Phoenix 5.1.1 we need to put a maximum on the LIMIT, as this PR does. Once Phoenix 5.1.2 is out, we should push all topN down to Phoenix.

Again, my apologies for the back-and-forth. Should I update the comment in the code?
Edit: Updated the comment (again).

@lhofhansl
Member Author

lhofhansl commented Apr 8, 2021

BTW, we obviously get much better performance when we can return true from isTopNLimitGuaranteed.
Since Trino does not allow ORDER BY LIMIT > Integer.MAX_VALUE anyway, that method can return true once Phoenix 5.1.2 is released.

I'll file a reminder issue for when Phoenix 5.1.2 is released.

Member

@vincentpoon vincentpoon left a comment


lgtm

@findepi
Member

findepi commented Apr 8, 2021

I reran CI since the Phoenix tests failed to start.

columns,
ImmutableMap.of(),
split);
return new QueryBuilder(this).prepareStatement(session, connection, preparedQuery);
Member


Does this change mean we can return true from isLimitGuaranteed in Phoenix?

Member Author


No. If you pass Phoenix a limit > Integer.MAX_VALUE it will still completely ignore it, so isLimitGuaranteed has to return false.

Member


But that's JdbcMetadata's responsibility, not PhoenixClient's, right? PhoenixClient doesn't ignore anything.

@findepi findepi merged commit a7d0f54 into trinodb:master Apr 8, 2021
@findepi findepi added this to the 355 milestone Apr 8, 2021
@findepi findepi added the enhancement New feature or request label Apr 8, 2021
@findepi
Member

findepi commented Apr 8, 2021

Depending on the answer to #7490 (comment), please follow up.

@findepi findepi mentioned this pull request Apr 8, 2021
7 tasks
@lhofhansl
Member Author

Thanks @findepi @vincentpoon @hashhar . I'll followup when Phoenix 5.1.2 is out.

@whutpencil
Contributor

@lhofhansl I have a simple query as follows: select * from table_a where ROW = 'xxx' limit 1. The table holds about 1.1T of data, and the query takes more than 2 seconds through Trino, while querying Phoenix directly takes about 0.1 seconds. Will this patch improve that?
