
Enable filter pushdown for Parquet #17161

Merged: 2 commits into prestodb:master from the pushdown branch on Jan 12, 2022
Conversation

yingsu00 (Contributor) commented Jan 6, 2022:

This PR enables the hive.pushdown_filter_enabled property for Parquet and guards it with a new config/session property.
This PR is preparation for the Aria Parquet and Velox Parquet reader work.

== NO RELEASE NOTE ==

This commit introduces the parquet_pushdown_filter_enabled session property
and the parquet.pushdown-filter-enabled config property for the Hive connector.
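
For reference, a minimal sketch of how a connector config property like this is typically bound in prestodb via airlift's @Config annotation; the property name parquet.pushdown-filter-enabled comes from this PR, but the placement in HiveClientConfig and the method names below are illustrative assumptions, not the PR's actual diff:

import com.facebook.airlift.configuration.Config;

public class HiveClientConfig
{
    // Assumed field; defaults to false so Parquet queries are unaffected unless explicitly opted in
    private boolean parquetPushdownFilterEnabled;

    public boolean isParquetPushdownFilterEnabled()
    {
        return parquetPushdownFilterEnabled;
    }

    @Config("parquet.pushdown-filter-enabled")
    public HiveClientConfig setParquetPushdownFilterEnabled(boolean parquetPushdownFilterEnabled)
    {
        this.parquetPushdownFilterEnabled = parquetPushdownFilterEnabled;
        return this;
    }
}

The matching session property would then be registered in HiveSessionProperties and read per query, which is what the isParquetPushdownFilterEnabled(session) call in the diffs below refers to.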
mbasmanova (Contributor) left a comment:

@yingsu00 Thank you for enabling filter pushdown for Velox.

> It is only planner change and doesn't have any effect to query execution.

Is this the case? The planner would push the filter down into the table scan and remove the filter node above it. If the reader is not going to process the filter, the query results will be wrong. Shouldn't we add a check in the reader to fail fast if there is a pushed-down filter that it cannot yet process?

It would be nice to add a test.

@@ -2636,7 +2638,7 @@ private boolean isPushdownFilterEnabled(ConnectorSession session, ConnectorTable
     boolean pushdownFilterEnabled = HiveSessionProperties.isPushdownFilterEnabled(session);
     if (pushdownFilterEnabled) {
         HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(getTableMetadata(session, tableHandle).getProperties());
-        if (hiveStorageFormat == ORC || hiveStorageFormat == DWRF) {
+        if (hiveStorageFormat == ORC || hiveStorageFormat == DWRF || hiveStorageFormat == PARQUET && isParquetPushdownFilterEnabled(session)) {

mbasmanova (Contributor):

Are you missing parentheses?

yingsu00 (Author):

&& has higher precedence than ||, therefore this line is equivalent to

if (hiveStorageFormat == ORC || hiveStorageFormat == DWRF || (hiveStorageFormat == PARQUET && isParquetPushdownFilterEnabled(session)))

I think this is correct. Since PARQUET selective readers are not implemented yet, if hive.pushdown_filter_enabled is set to true in order to run ORC queries fast, Parquet queries would fail. Therefore we need a second session property for Parquet and enable the pushdown only when both conditions are met.
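
A standalone illustration of the precedence rule, not from the PR, just a self-contained check that the two forms evaluate identically:

public class PrecedenceDemo
{
    public static void main(String[] args)
    {
        // Enumerate all combinations to show that a || b && c parses as a || (b && c)
        boolean[] values = {false, true};
        for (boolean a : values) {
            for (boolean b : values) {
                for (boolean c : values) {
                    boolean implicit = a || b && c;
                    boolean explicit = a || (b && c);
                    System.out.printf("a=%b b=%b c=%b -> %b == %b%n", a, b, c, implicit, explicit);
                }
            }
        }
    }
}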

mbasmanova (Contributor):

I see. This is very subtle and hard to read. I think adding parentheses would be clearer.

@@ -522,7 +523,7 @@ private boolean isPushdownFilterSupported(ConnectorSession session, TableHandle
     boolean pushdownFilterEnabled = HiveSessionProperties.isPushdownFilterEnabled(session);
     if (pushdownFilterEnabled) {
         HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(getMetadata(tableHandle).getTableMetadata(session, tableHandle.getConnectorHandle()).getProperties());
-        if (hiveStorageFormat == HiveStorageFormat.ORC || hiveStorageFormat == HiveStorageFormat.DWRF) {
+        if (hiveStorageFormat == HiveStorageFormat.ORC || hiveStorageFormat == HiveStorageFormat.DWRF || hiveStorageFormat == HiveStorageFormat.PARQUET && isParquetPushdownFilterEnabled(session)) {

mbasmanova (Contributor):

Missing parentheses?

yingsu00 (Author) commented Jan 8, 2022:

See above.

mbasmanova (Contributor):

CC: @majetideepak

majetideepak (Collaborator):

If we are adding a check at the Java ParquetReader, we can probably avoid adding hive.parquet.pushdown-filter-enabled and hive.parquet_pushdown_filter_enabled now?

yingsu00 (Author) commented Jan 8, 2022:

> If we are adding a check at the Java ParquetReader, we can probably avoid adding hive.parquet.pushdown-filter-enabled and hive.parquet_pushdown_filter_enabled now?

Unfortunately, I don't think so. Users may run ORC queries and Parquet queries at the same time, and they may turn on hive.pushdown-filter-enabled to get better performance on ORC. If it is turned on and there is no additional guard for Parquet, all Parquet queries would fail.

yingsu00 (Author) commented Jan 8, 2022:

Thanks @mbasmanova for reviewing the PR.

> Is this the case? The planner would push the filter down into the table scan and remove the filter node above it. If the reader is not going to process the filter, the query results will be wrong. Shouldn't we add a check in the reader to fail fast if there is a pushed-down filter that it cannot yet process?

My intention was that the default value of hive.parquet.pushdown_filter_enabled would remain false, but you were right: when it is turned on, we should fail the query early. I updated the PR to fail the query with a NOT_SUPPORTED exception in the newly introduced, empty ParquetSelectivePageSourceFactory.
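
A minimal sketch of that fail-fast behavior; ParquetSelectivePageSourceFactory and the NOT_SUPPORTED error code are named in this PR, but the class body and simplified method signature below are assumptions rather than the PR's exact code:

import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;

public class ParquetSelectivePageSourceFactory
{
    // Simplified signature (assumed): the real factory method would also take the
    // session, split, columns, and the pushed-down filter as arguments.
    public ConnectorPageSource createPageSource()
    {
        // Fail fast: the planner has already removed the FilterNode above the scan,
        // so silently ignoring the pushed-down filter would return wrong results.
        throw new PrestoException(NOT_SUPPORTED, "Parquet selective page source is not supported yet");
    }
}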

> It would be nice to add a test.

testParquetSelectivePageSourceFails was added to TestHiveIntegrationSmokeTest.java.
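
A hedged sketch of what that smoke test could look like inside TestHiveIntegrationSmokeTest; the test name, table DDL (with the table name spelled as in the PR), and session property names appear in this thread, while the builder details and the failure-message regex are assumptions:

@Test
public void testParquetSelectivePageSourceFails()
{
    // Enable both the global pushdown property and the new Parquet-specific guard
    Session session = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "pushdown_filter_enabled", "true")
            .setCatalogSessionProperty("hive", "parquet_pushdown_filter_enabled", "true")
            .build();
    assertUpdate("CREATE TABLE test_parquet_filter_pushdoown (a BIGINT, b BOOLEAN) WITH (format = 'parquet')");
    // Expect the fail-fast NOT_SUPPORTED error from the selective page source (regex is assumed)
    assertQueryFails(session, "SELECT a FROM test_parquet_filter_pushdoown WHERE b", ".*not supported.*");
    assertUpdate("DROP TABLE test_parquet_filter_pushdoown");
}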

yingsu00 force-pushed the pushdown branch 3 times, most recently from d67bd1f to 6c66c0e on January 9, 2022

mbasmanova (Contributor) left a comment:

@yingsu00 Overall looks good. Minor comments below.

@@ -5424,6 +5424,37 @@ public void testPartialAggregatePushdownParquet()
     assertFalse(getQueryRunner().tableExists(session, "test_parquet_table"));
 }

+    @Test
+    public void testParquetSelectivePageSourceFails()

mbasmanova (Contributor):

Is there no Parquet-specific test? Do you plan to add one?

yingsu00 (Author):

@mbasmanova I'm a bit confused. This is a Parquet-specific test, since the table is created in Parquet format:

assertUpdate("CREATE TABLE test_parquet_filter_pushdoown (a BIGINT, b BOOLEAN) WITH (format = 'parquet')");

Could you please explain what kind of test you are referring to? Did you mean adding tests for ORC or DWRF as well?

mbasmanova (Contributor):

TestHiveIntegrationSmokeTest.java is a collection of basic tests for the Hive connector. It cannot possibly hold all the tests for the connector. A good practice is to create separate test suites for different sets of functionality. Hence, I'd expect Parquet-specific tests to be placed in a test suite that is specific to Parquet. Hope that makes sense.

yingsu00 (Author):

@mbasmanova Thanks for explaining. The existing Parquet-specific tests like TestParquetDistributedQueries and TestParquetReader are all integration tests with many queries; it doesn't seem necessary to duplicate them with the new config property enabled and expect every query to fail. Other tests target very specific functionality like definition-level decoding, compression, etc., and I couldn't find an existing suite this test fits into. We will add more Parquet tests later once Aria Parquet is in place.

mbasmanova (Contributor):

> Enable filter pushdown for Parquet. The filter pushdown can be set by the hive.parquet.pushdown-filter-enabled config property and the hive.parquet_pushdown_filter_enabled session property.

The release notes sound like filter pushdown is now available for Parquet, which is not the case. I wonder if we can skip the RN for this PR altogether.

This commit enables parquet filter pushdown if the
hive.parquet_pushdown_filter_enabled session property is enabled.
Queries on Parquet tables will fail early with a NOT_SUPPORTED
exception for now, since the selective Parquet page source is not
supported yet.

yingsu00 (Author):

> The release notes sound like filter pushdown is now available for Parquet, which is not the case. I wonder if we can skip the RN for this PR altogether.

@mbasmanova I was wondering the same, and I removed the release note for now.
Thanks for reviewing! I updated the PR with your comments addressed, except for the Parquet-specific test one, which needs clarification.

mbasmanova (Contributor) left a comment:

@yingsu00 Thank you for adding a session property to push the filter down into the Parquet reader on the planner side and for unblocking the corresponding work on Velox.

yingsu00 merged commit 376552c into prestodb:master on Jan 12, 2022

swapsmagic (Contributor) commented Jan 13, 2022:

We are seeing the test com.facebook.presto.prism.plugin.TestPrismIntegrationSmokeTest.testParquetSelectivePageSourceFails failing in our Jenkins build with the following error:

Error Message

Unpartitioned Hive tables are immutable
Stacktrace
      java.lang.RuntimeException: Unpartitioned Hive tables are immutable
	at com.facebook.presto.tests.AbstractTestingPrestoClient.execute(AbstractTestingPrestoClient.java:124)
	at com.facebook.presto.tests.DistributedQueryRunner.execute(DistributedQueryRunner.java:601)
	at com.facebook.presto.tests.DistributedQueryRunner.execute(DistributedQueryRunner.java:569)
	at com.facebook.presto.tests.QueryAssertions.assertUpdate(QueryAssertions.java:72)
	at com.facebook.presto.tests.AbstractTestQueryFramework.assertUpdate(AbstractTestQueryFramework.java:210)
	at com.facebook.presto.hive.TestHiveIntegrationSmokeTest.testParquetSelectivePageSourceFails(TestHiveIntegrationSmokeTest.java:5431)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.facebook.presto.spi.PrestoException: Unpartitioned Hive tables are immutable
	at com.facebook.presto.hive.HiveWriterFactory.getWriterParametersForExistingUnpartitionedTable(HiveWriterFactory.java:486)
	at com.facebook.presto.hive.HiveWriterFactory.getWriterParameters(HiveWriterFactory.java:439)
	at com.facebook.presto.hive.HiveWriterFactory.createWriter(HiveWriterFactory.java:343)
	at com.facebook.presto.hive.HivePageSink.getWriterIndexes(HivePageSink.java:491)
	at com.facebook.presto.hive.HivePageSink.writePage(HivePageSink.java:360)
	at com.facebook.presto.hive.HivePageSink.doAppend(HivePageSink.java:355)
	at com.facebook.presto.hive.HivePageSink.lambda$appendPage$4(HivePageSink.java:341)
	at com.facebook.presto.hive.authentication.HdfsAuthentication.lambda$doAs$0(HdfsAuthentication.java:24)
	at com.facebook.presto.hive.authentication.NoHdfsAuthentication.doAs(NoHdfsAuthentication.java:23)
	at com.facebook.presto.hive.authentication.HdfsAuthentication.doAs(HdfsAuthentication.java:23)
	at com.facebook.presto.hive.HdfsEnvironment.doAs(HdfsEnvironment.java:86)
	at com.facebook.presto.hive.HivePageSink.appendPage(HivePageSink.java:341)
	at com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSink.appendPage(ClassLoaderSafeConnectorPageSink.java:66)
	at com.facebook.presto.operator.TableWriterOperator.addInput(TableWriterOperator.java:338)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:434)
	at com.facebook.presto.operator.Driver.lambda$processFor$9(Driver.java:307)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:728)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:300)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1079)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:599)
	at com.facebook.presto.$gen.Presto_0_269_SNAPSHOT_376552c__testversion____20220113_180836_3.run(Unknown Source)
	... 3 more

yingsu00 (Author) commented Jan 13, 2022:

@swapsmagic Is this error persistent? It ran fine on my laptop, and all tests in CI passed before I merged the PR. I wonder why other tests don't fail; e.g., testPartialAggregatePushdownORC and testPartialAggregatePushdownParquet both create unpartitioned tables and the insertions succeed.
