Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix skipping in diverse scalar stream readers #12721

Closed
wants to merge 21 commits into from

Conversation

oerling
Copy link

@oerling oerling commented Apr 24, 2019

Resets the toSkip variable after skipping to the next row in the input
qualifying set. The error does not manifest if all rows are selected.

Adds a test. Note that the test is not supported by H2 and requires comparing Presto with and without Aria scan.

mbasmanova and others added 16 commits April 17, 2019 14:04
- Extracted pushdown subscripts logic into its own rule, PushDownSubscripts,
from PruneUnreferencedOutputs.
- Replaced Subfield.PathElement with a hierarchy of classes: empty base class
PathElement + NestedField, IntegerSubscript, StringSubscript sub-classes.
NestedField represents subfield of a struct: c.a, IntegerSubscript - element
of an array or map with integer keys: c[2], StringSubscript - element of a
map with varchar keys c["a"].
Rename PushdownSubscripts rule into PushdownSubfields. Change its logic to create
project node on top of scan using new filter_by_subfield_paths function instead
of modifying column handles in table scan node via calls to
ColumnHandle.createSubfieldPruningColumnHandle.

Remove ColumnHandle.createSubfieldPruningColumnHandle API.
Add an optimizer rule to pushdown subfield pruning into
connectors. The new rule uses the new metadata API
pushdownSubfieldPruning.
OrcRecordReader.getNextPage tracks the average bytes per rows of
initial qualifying set. The batch size is adapted to stay within
expected soft limits. If a hard limit is exceeded an exception is
thrown and the streams are reset to the beginning of the current row
group. This behavior only works when the streams are checkpointed,
hence ValueStreamSources are not used even if the stripe consisted of
a single row group.

This replaces the StreamReader result truncation mechanism.
@oerling oerling requested a review from mbasmanova April 24, 2019 03:24
@mbasmanova mbasmanova added the aria Presto Aria performance improvements label Apr 24, 2019
@mbasmanova mbasmanova self-assigned this Apr 24, 2019
@mbasmanova mbasmanova changed the title Fix Slice dictionary reader with row group level dictionaries Fix skipping in diverse scalar stream readers Apr 24, 2019
@mbasmanova
Copy link
Contributor

@oerling I'm seeing a test failure:

[ERROR] Tests run: 5662, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1,255.582 s <<< FAILURE! - in TestSuite
[ERROR] testRemoveRemoteSource(com.facebook.presto.operator.TestExchangeClient)  Time elapsed: 0.012 s  <<< FAILURE!
java.lang.AssertionError: expected [4] but found [2]
	at org.testng.Assert.fail(Assert.java:94)
	at org.testng.Assert.failNotEquals(Assert.java:513)
	at org.testng.Assert.assertEqualsImpl(Assert.java:135)
	at org.testng.Assert.assertEquals(Assert.java:116)
	at org.testng.Assert.assertEquals(Assert.java:389)
	at org.testng.Assert.assertEquals(Assert.java:399)
	at com.facebook.presto.operator.TestExchangeClient.assertPageEquals(TestExchangeClient.java:416)
	at com.facebook.presto.operator.TestExchangeClient.testRemoveRemoteSource(TestExchangeClient.java:379)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Does it work locally? I'm going to re-start the build in case this error is transient.

@mbasmanova
Copy link
Contributor

@elonazoulay Elon, would you take a look at this change? I feel that we should be able to build a unit test that catches this type of error.

@oerling
Copy link
Author

oerling commented Apr 24, 2019 via email

1. If results are returned from a Slice dictionary column, there must be a Page break if the underlying dictionary changes.

2. The in-dictionary flags were read twice and in a sequence that does not correspond to the input qualifying set.

Add consistent resize and initial size setting for reused and dynamically growing arrays.
@mbasmanova
Copy link
Contributor

@oerling

This is timing related flakiness.

Indeed. Looks like so. The test passed on retry.

Moves common functions between direct and dictionary long stream readers to a common superclass.
Copy link
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oerling

Adds a test. Note that the test is not supported by H2 and requires comparing Presto with and without Aria scan.

Aria scan cannot have a dependency on default (soon to be legacy) scan. Hence, we need to write tests some other way.

For this particular case, a unit test would be most appropriate, although H2 works just fine as well:

    private static final String LINEITEM_ARIA_H2_CTE = "WITH lineitem_aria AS (SELECT\n" +
            "    orderkey,\n" +
            "    partkey,\n" +
            "    suppkey,\n" +
            "    linenumber,\n" +
            "    quantity,\n" +
            "    extendedprice,\n" +
            "    shipmode,\n" +
            "    shipdate,\n" +
            "    comment,\n" +
            "    returnflag = 'R' AS is_returned,\n" +
            "    CAST(quantity + 1 AS REAL) AS float_quantity,\n" +
            "    CAST(discount AS decimal(5, 2)) AS short_decimal_discount,\n" +
            "    CAST(discount AS decimal(38, 2)) AS long_decimal_discount,\n" +
            "    CAST(CASEWHEN(shipmode = 'SHIP', 1, CASEWHEN(shipmode = 'REG AIR', 2, CASEWHEN(shipmode = 'AIR', 3, CASEWHEN(shipmode = 'FOB', 4, CASEWHEN(shipmode = 'MAIL', 5, CASEWHEN(shipmode = 'RAIL', 6, CASEWHEN(shipmode = 'TRUCK', 7, null))))))) AS tinyint) AS tinyint_shipmode,\n" +
            "    dateadd(SECOND, suppkey, cast(shipdate as timestamp)) AS timestamp_shipdate\n" +
            "FROM lineitem)\n";

        String query = "SELECT partkey, shipmode, comment, long_decimal_discount, short_decimal_discount, " +
                "is_returned, float_quantity, timestamp_shipdate, tinyint_shipmode " +
                "FROM lineitem_aria WHERE linenumber = 4";

        assertQuery(ariaSession(), query, LINEITEM_ARIA_H2_CTE + query);

The above test reproduces the issue as expected:

Caused by: com.facebook.presto.orc.OrcCorruptionException: Malformed ORC file. Reading BigInteger past EOF [file:/Users/mbasmanova/aria/test_data3/hive_data/tpch/lineitem_aria/20190314_055037_00008_zuqv6_afaded0a-76a8-435e-a41c-dd582fa33ca3]
	at com.facebook.presto.orc.stream.DecimalInputStream.skip(DecimalInputStream.java:165)
	at com.facebook.presto.orc.reader.DecimalStreamReader.scan(DecimalStreamReader.java:309)
	at com.facebook.presto.orc.ColumnGroupReader.advance(ColumnGroupReader.java:413)
	at com.facebook.presto.orc.OrcRecordReader.getNextPage(OrcRecordReader.java:828)
	at com.facebook.presto.hive.orc.OrcPageSource.getNextPage(OrcPageSource.java:150)
	... 13 more

For queries that need to access complex types (arrays, maps, structs), we could compare Hive with Tpch, e.g.

    private static final String LINEITEM_ARIA_SQL = "SELECT\n" +
            "    orderkey,\n" +
            "    partkey,\n" +
            "    suppkey,\n" +
            "    linenumber,\n" +
            "    quantity,\n" +
            "    extendedprice,\n" +
            "    shipmode,\n" +
            "    shipdate,\n" +
            "    comment,\n" +
            "    returnflag = 'R' AS is_returned,\n" +
            "    CAST(quantity + 1 AS REAL) AS float_quantity,\n" +
            "    CAST(discount AS decimal(5, 2)) AS short_decimal_discount,\n" +
            "    CAST(discount AS decimal(38, 2)) AS long_decimal_discount,\n" +
            "    CAST(array_position(array['SHIP','REG AIR','AIR','FOB','MAIL','RAIL','TRUCK'], shipmode) AS tinyint) AS tinyint_shipmode,\n" +
            "    date_add('second', suppkey, cast(shipdate as timestamp)) AS timestamp_shipdate,\n" +
            "    MAP(\n" +
            "        ARRAY[1, 2, 3],\n" +
            "        ARRAY[orderkey, partkey, suppkey]\n" +
            "    ) AS order_part_supp_map,\n" +
            "    ARRAY[ARRAY[orderkey, partkey, suppkey]] AS order_part_supp_array\n" +
            "FROM tpch.tiny.lineitem";

    private static final String LINEITEM_ARIA_CTE = format("WITH lineitem_aria AS (%s)\n", LINEITEM_ARIA_SQL);

        Session tpchSession = Session.builder(getQueryRunner().getDefaultSession())
                .setCatalog("tpch")
                .setSchema("tiny")
                .build();

        assertQuery(ariaSession(), query, tpchSession, LINEITEM_ARIA_CTE + query, "SELECT 1");

@oerling
Copy link
Author

oerling commented Apr 26, 2019 via email

@mbasmanova
Copy link
Contributor

@oerling Orri,

I just want a formulation that does not involve 50 line SQL statements as string literals.

The amount of hard-coded SQL is the same in either approach. We need to describe the dataset once anyway. Once it is described the test is just a single assert + query.

Evaluates filters and filter functions dependent on constant columns at setup time.
Supports filter functions on combinations of constant and non-constant columns.
Adds constant columns in OrcRecordReader.resultPage().
Fills missing fields of structs with nulls.
Prunes the initial qualifying set in OrcRecordReader with any non-deterministic constant filters, evaluating these for every row.

Adds a assertQuery signature for comparing Presto execution with
different session settings.
@oerling
Copy link
Author

oerling commented Apr 26, 2019 via email

Resets the toSkip variable after skipping to the next row in the input
qualifying set. The error does not manifest if all rows are selected.

Sets the expected output size according to the input qualifying set.

Adds a test. Note that the test is not supported by H2 and requires comparing Presto with and without Aria scan.
@@ -292,6 +294,10 @@ public void testFilters()

assertQuery(ariaSession(), "SELECT shipdate FROM lineitem_aria WHERE long_decimal_discount < decimal '0.3'",
"SELECT shipdate FROM lineitem where discount < 0.3");
// Select different scalars after a filter.
assertQuery(ariaSession(), "select partkey, shipmode, comment,\n" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertQuery(ariaSession(), "select partkey, shipmode, comment,\n" +
assertQuery(ariaSession(), "SELECT partkey, shipmode, comment, " +

boolean anyConstantFilterFunctions = false;
ImmutableList.Builder<FilterFunction> nonDeterministicBuilder = ImmutableList.builder();
for (FilterFunction filter : filterFunctions) {
if (qualifyingSet == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we initialize this in the constructor?

public Page getNextPage()
throws IOException
{
if (constantFilterIsFalse) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if we could make explicit in the code the invariant that pushdownFilterAndProjection is always called before getNextPage.

@mbasmanova
Copy link
Contributor

Closing stale PR on behalf of @oerling.

@mbasmanova mbasmanova closed this Jul 10, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
aria Presto Aria performance improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants