
Characterize Hive ACID schema evolution #6316

Closed · djsstarburst opened this issue Dec 11, 2020 · 21 comments

@djsstarburst commented Dec 11, 2020

Executive Summary

#6070 and #6280 show that Trino queries of Hive ACID tables that have undergone schema evolution don't produce the same results as Hive itself. PR #6479 changes Trino to match Hive's behavior by default for column rename, add, and drop in the ORC and Parquet formats.

PR #6479 does not address column type changes; those are left for a follow-on PR.

Kinds of Schema Evolution

There are multiple schema evolution cases to consider:

  • Renaming a column.
  • Changing a column type.
  • Adding a column.
  • Removing a column.

This ticket was opened to characterize Hive's behavior in response to schema evolution, and to recommend changes to Presto to match Hive's results.

Here is a summary of what I've learned running schema evolution tests against Hive 3.1.2-5 and Trino before the fixes in #6479.

Summary of Parquet Results Before #6479 Changes

Parquet Column Renaming

The results for non-partitioned Parquet tables are simple:

  • If parameter hive.parquet.use-column-names is false, Trino sees the old values of column data in data files created before the column was renamed. This does not match Hive's behavior.
  • If parameter hive.parquet.use-column-names is true, Trino reads nulls as the column data from data files created before the column was renamed. This matches Hive's behavior.

If parameter hive.partition-use-column-values is true, then hive.parquet.use-column-names must also be true, or an exception is thrown.

These rules are succinctly encoded in this table:

|                                          | hive.parquet.use-column-names = false | hive.parquet.use-column-names = true |
|------------------------------------------|---------------------------------------|--------------------------------------|
| hive.partition-use-column-values = true  | ERROR                                 | Sees nulls in renamed columns        |
| hive.partition-use-column-values = false | Sees old column values                | Sees nulls in renamed columns        |
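
For concreteness, here is a minimal sketch of the rename scenario behind this table, written in the style of the tests quoted later in this thread (the foo table is hypothetical; onPresto, assertThat, and row are the thread's test helpers):

            onPresto().executeQuery("CREATE TABLE foo (column1 INT, column2 BIGINT) WITH (format = 'PARQUET')");
            onPresto().executeQuery("INSERT INTO foo VALUES (11, 100)");
            onPresto().executeQuery("ALTER TABLE foo RENAME COLUMN column1 TO column3");
            // With hive.parquet.use-column-names = true, Trino matches Hive and sees NULL here;
            // with it false, Trino sees the pre-rename value 11, which does not match Hive.
            assertThat(onPresto().executeQuery("SELECT column3 FROM foo")).containsOnly(row((Object) null));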

Adding or Dropping Parquet Columns

If Trino drops the last non-partition column in a populated Parquet table and then adds it back with the same name, a Hive query of the table returns the old data for that column. Trino exactly matches Hive's behavior whether the parameter hive.parquet.use-column-names is true or false.

If Trino drops the last non-partition column in a populated Parquet table and then adds it back with a different name, a Hive query of the table returns nulls for the value of the last column. Trino exactly matches Hive's behavior if hive.parquet.use-column-names is true. If hive.parquet.use-column-names is false, Trino sees the old values for the last column, which does not match Hive's behavior.
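
Here is a sketch of the drop-and-re-add sequence in the same test style (hypothetical table; ALTER TABLE ... DROP COLUMN and ADD COLUMN are Trino syntax):

            onPresto().executeQuery("CREATE TABLE foo (column1 INT, column2 BIGINT) WITH (format = 'PARQUET')");
            onPresto().executeQuery("INSERT INTO foo VALUES (11, 100)");
            onPresto().executeQuery("ALTER TABLE foo DROP COLUMN column2");
            // Re-added under the same name: both Hive and Trino see the old data.
            onPresto().executeQuery("ALTER TABLE foo ADD COLUMN column2 BIGINT");
            assertThat(onHive().executeQuery("SELECT column2 FROM foo")).containsOnly(row(100L));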

Summary of ORC Results Before #6479 Changes

These tests were run using file format ORC, though some used transactional tables and some used non-transactional tables.

The changes required to make Trino match the behavior we see in Hive have been made in PR #6479, which also contains all the tests that demonstrate the behavior of Hive and Trino.

ORC Column Renaming

By default, Trino ORC identifies columns exclusively by column order. If you insert rows, using either Trino or Hive, and then rename a column, using either Trino or Hive, a query of the old data using the new column name will always succeed in Hive. This is true for non-partition columns whether or not the table is transactional, and whether or not the table is partitioned. It's also true after a series of renames of the same non-partition column. Hive accomplishes this by matching the non-partition columns in files it reads based on column order and column type and not on the column name in the data file.

Like Parquet, ORC supports a pair of session config parameters that enable tracking of columns by name: hive.partition-use-column-values and hive.orc.use-column-names. Both default to false. This table specifies the behavior when these parameters have non-default values:

|                                          | hive.orc.use-column-names = false | hive.orc.use-column-names = true |
|------------------------------------------|-----------------------------------|----------------------------------|
| hive.partition-use-column-values = true  | ERROR                             | Sees nulls in renamed columns    |
| hive.partition-use-column-values = false | Sees old column values            | Sees nulls in renamed columns    |

If parameter hive.partition-use-column-values is true, then hive.orc.use-column-names must also be true, or an exception is thrown. If hive.partition-use-column-values is false and hive.orc.use-column-names is true, Trino sees nulls in renamed columns, as the table above shows.

Renaming partition columns is not allowed. An attempt to rename a partition column results in this error from the Hive metastore: Renaming partition columns is not supported.
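
A minimal sketch of the failing statement, assuming a hypothetical partitioned table foo_partitioned:

            // Rejected by the Hive metastore: "Renaming partition columns is not supported"
            onPresto().executeQuery("ALTER TABLE foo_partitioned RENAME COLUMN part_col TO new_part_col");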

ORC Column Type Changes

Whether the table is transactional or non-transactional, Hive ALTER TABLE CHANGE COLUMN old_name new_name new_type will succeed only if the new_type is at least as wide as the old type. For example, a change from SMALLINT to INT is allowed; a change from INT to SMALLINT is not allowed, and the ALTER TABLE statement fails (a full backtrace appears later in this thread).

Trino does not provide a way to change a column's type, but Hive does. As new tests in #6479 show, if a column type is widened using Hive's ALTER TABLE ... CHANGE COLUMN old_name new_name new_type, a subsequent Trino query will fail, saying that the types are incompatible, even though the same query succeeds in Hive.
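
A sketch of that sequence in the thread's test style (hypothetical table; the widening succeeds in Hive, and the final Trino query is the one that fails):

            onHive().executeQuery("CREATE TABLE foo (name STRING, age SMALLINT) STORED AS ORC TBLPROPERTIES ('transactional'='true')");
            onHive().executeQuery("INSERT INTO foo VALUES ('Katy', 57)");
            // Widening SMALLINT to INT is allowed by Hive
            onHive().executeQuery("ALTER TABLE foo CHANGE COLUMN age age INT");
            // Hive reads the old data through the widened type...
            assertThat(onHive().executeQuery("SELECT age FROM foo")).containsOnly(row(57));
            // ...but the subsequent Trino query fails with an incompatible-types error
            onPresto().executeQuery("SELECT age FROM foo");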

Adding and Dropping ORC Columns

Whether the table is transactional or non-transactional, if Trino drops the last non-partition column in a populated table and then adds it back, perhaps with a new name and a new, wider type, a Hive query of the table returns the old data for that column. With the #6479 changes, Trino exactly matches the Hive behavior.

Hive 3.1.2-5 does not support the syntax ALTER TABLE ADD COLUMN ... or ALTER TABLE DROP COLUMN .... Instead it has ALTER TABLE REPLACE COLUMNS (col1 col1_type, ...). However, dropping any column using the ALTER TABLE REPLACE COLUMNS... syntax fails in Hive with this error message: Replacing columns cannot drop columns for table default.test_test_hive_renames_false_NONE_4739xndjz4di. SerDe may be incompatible.. Looking at the Hive 3.1.2-5 codebase, this error will be raised if the table format is ORC and the number of columns in the replace list is less than the number of columns in the table.
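
A sketch of the failing drop attempt, assuming a hypothetical two-column ORC table:

            onHive().executeQuery("CREATE TABLE foo (column1 INT, column2 BIGINT) STORED AS ORC");
            // Listing fewer columns than the table has fails for ORC in Hive 3.1.2-5:
            // "Replacing columns cannot drop columns for table ... SerDe may be incompatible."
            onHive().executeQuery("ALTER TABLE foo REPLACE COLUMNS (column1 INT)");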

Summary of AVRO Results

AVRO does not support parameter partition_use_column_names, and has no analog of parameters like orc_use_column_names. Tests show that AVRO tracks columns exclusively by column name. This means that column values written to a data file before a column rename will all read back as null after the rename.
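
A sketch of the name-based tracking described above, assuming a hypothetical AVRO table:

            onHive().executeQuery("CREATE TABLE foo (name STRING, age INT) STORED AS AVRO");
            onHive().executeQuery("INSERT INTO foo VALUES ('Katy', 57)");
            onHive().executeQuery("ALTER TABLE foo CHANGE COLUMN age new_age INT");
            // AVRO matches columns by name, so pre-rename values read back as NULL
            assertThat(onHive().executeQuery("SELECT new_age FROM foo")).containsOnly(row((Object) null));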

@djsstarburst

The simplest case of schema evolution is column renaming. The last SELECT in the sequence below fails on Presto, because it gets a value of NULL for column3. Hive knows that column3 is the new name for column1, and gets the right value, 11:

            onPresto().executeQuery("CREATE TABLE foo (column1 INT, column2 BIGINT) WITH (transactional = true)");
            onPresto().executeQuery("INSERT INTO foo VALUES (11, 100)");
            onPresto().executeQuery("SELECT * FROM foo", row(11, 100L));
            onPresto().executeQuery("ALTER TABLE foo RENAME COLUMN column1 TO column3");
            onPresto().executeQuery("SELECT column3 FROM foo", row(11));

This test fails on Presto for transactional tables, but passes for non-transactional tables. Hive doesn't rewrite data files when schema evolution takes place, and in the example above, the data files will refer only to "column1". So the schema history needed to correctly interpret column3 must be in the metastore itself. Looking over the metastore, I see that the Schema object has a properties map. I'm guessing that's where the history information is kept.

@sjx782392329 commented Dec 21, 2020

What is the progress of this issue? @djsstarburst

@djsstarburst commented Dec 22, 2020

Hi, @sjx782392329, I've returned this week to looking at this issue.

This test shows that Hive tracks column values across multiple renames, even though in this case Presto is doing the renames:

            onHive().executeQuery("CREATE TABLE foo (old_name STRING, age INT) STORED AS ORC TBLPROPERTIES ('transactional'='true'f)");
            onHive().executeQuery("INSERT INTO foo VALUES ('Katy', 57), ('Joe', 72)");
            verifySelectForPrestoAndHive("SELECT * FROM foo", "true", row("Katy", 57), row("Joe", 72));
            onPresto().executeQuery("ALTER TABLE foo RENAME COLUMN old_name TO new_name");
            log.info("This shows that hive survives a single rename");
            assertThat(onHive().executeQuery("SELECT age FROM foo WHERE new_name = 'Katy'"))
                    .containsOnly(row(57));
            onPresto().executeQuery("ALTER TABLE foo RENAME COLUMN new_name TO newer_name");
            log.info("This shows that hive survives a double rename");
            assertThat(onHive().executeQuery("SELECT age FROM foo WHERE newer_name = 'Katy'"))
                    .containsOnly(row(57));
            onPresto().executeQuery("ALTER TABLE foo RENAME COLUMN newer_name TO old_name");
            log.info("This shows that hive survives a rename back to the original name");
            assertThat(onHive().executeQuery("SELECT age FROM foo WHERE old_name = 'Katy'"))
                    .containsOnly(row(57));
            assertThat(onHive().executeQuery("SELECT * FROM foo")).containsOnly(row("Katy", 57), row("Joe", 72));

I believe I've found the Hive code that deals with column renames at ALTER TABLE RENAME COLUMN time. Hive calls method checkColTypeChangeCompatible at https://github.com/apache/hive/blob/branch-3.1/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java#L928.

There seem to be multiple places in the Hive code base where schema reconciliation could be happening when ORC ACID data files are read, but I'm unsure which places are actually used. I posted this to the ASF #hive Slack room:

Hello, I work on the open-source Presto Hive connector. I've observed that in Hive branch 3.1 after a column rename, Hive is able to query old data files using the new column name, in spite of the fact that the data file footers contain the old column name. We'd like to duplicate Hive's ability to follow column renames in the Presto Hive connector. However, I haven't been able to figure out exactly where in the 3.1 code base reconciliation between the footer schema and the metastore schema takes place. I see that when a column is renamed, Hive calls method checkColTypeChangeCompatible at https://github.com/apache/hive/blob/branch-3.1/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java#L928. But I haven't been able to figure out exactly where the corresponding schema adjustment is made when reading ORC ACID files. Can someone point out where that is happening? Is Hive using the JobConf properties SCHEMA_EVOLUTION_COLUMNS and SCHEMA_EVOLUTION_COLUMNS_TYPES to cause the two schemas to come into agreement? Thanks!

@sjx782392329 commented Dec 22, 2020

In my view, your test does not cover my case. My problem is changing the type of a Hive field: for example, from integer to byte, and then from byte back to integer. Presto v344 cannot read it correctly, but Presto v0.214 can. @djsstarburst
#6070

@djsstarburst

I captured the metastore traffic generated by a SELECT after renaming a column, and viewed it using Wireshark. AFAICT that traffic contains no trace of the old column name after the rename:

select-after-rename.mcap.gz

@djsstarburst commented Dec 22, 2020

RE: #6070

This test program shows that widening an integral column type preserves the data in Hive:

            onHive().executeQuery("CREATE TABLE foo (name STRING, age TINYINT) STORED AS ORC TBLPROPERTIES ('transactional'='true')");
            onHive().executeQuery("INSERT INTO foo VALUES ('Katy', 57), ('Joe', 72)");
            verifySelectForPrestoAndHive("SELECT * FROM foo", "true", row("Katy", 57), row("Joe", 72));
            onHive().executeQuery("ALTER TABLE foo CHANGE COLUMN age new_age SMALLINT");
            log.info("This shows that hive survives a single rename");
            assertThat(onHive().executeQuery("SELECT name FROM foo WHERE new_age = 57"))
                    .containsOnly(row("Katy"));
            onHive().executeQuery("ALTER TABLE foo CHANGE COLUMN new_age newer_age INT");
            log.info("This shows that hive survives a double rename and type change");
            assertThat(onHive().executeQuery("SELECT newer_age FROM foo WHERE name = 'Katy'"))
                    .containsOnly(row(57));
            onHive().executeQuery("ALTER TABLE foo CHANGE COLUMN newer_age age BIGINT");
            log.info("This shows that hive allows rename back to the original name and if the type isn't narrower than the current column type");
            assertThat(onHive().executeQuery("SELECT age FROM foo WHERE name = 'Katy'"))
                    .containsOnly(row(57));
            assertThat(onHive().executeQuery("SELECT * FROM foo")).containsOnly(row("Katy", 57), row("Joe", 72));

However, in this test, the ALTER TABLE fails, because the new column type, TINYINT, is narrower than the original type, INT:

            onHive().executeQuery("CREATE TABLE foo (name STRING, age INT) STORED AS ORC TBLPROPERTIES ('transactional'='true')");
            onHive().executeQuery("INSERT INTO foo VALUES ('Katy', 57), ('Joe', 72)");
            verifySelectForPrestoAndHive("SELECT * FROM foo", "true", row("Katy", 57), row("Joe", 72));
            log.info("This Hive ALTER TABLE fails because the type has been narrowed");
            onHive().executeQuery("ALTER TABLE foo CHANGE COLUMN age new_age TINYINT");

@sjx782392329

I tested this case on my local machine. I can change the field type from integer to tinyint by executing ALTER TABLE student CHANGE COLUMN age age TINYINT. @djsstarburst

@djsstarburst

I tested this case on my local machine. I can change the field type from integer to tinyint by executing ALTER TABLE student CHANGE COLUMN age age TINYINT. @djsstarburst

When I run the test changing column age from INT to TINYINT, I get this backtrace:

tests               | 2020-12-24 22:03:08 INFO: FAILURE     /    io.prestosql.tests.hive.TestHiveTransactionalTable.testHiveColumnTypeChanges (Groups: hive_transactional) took 24.5 seconds
tests               | 2020-12-24 22:03:08 SEVERE: Failure cause:
tests               | io.prestosql.tempto.query.QueryExecutionException: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions :
tests               | new_age
tests               | 	at io.prestosql.tempto.query.JdbcQueryExecutor.execute(JdbcQueryExecutor.java:119)
tests               | 	at io.prestosql.tempto.query.JdbcQueryExecutor.executeQuery(JdbcQueryExecutor.java:84)
tests               | 	at io.prestosql.tests.hive.TestHiveTransactionalTable.lambda$testHiveColumnTypeChanges$21(TestHiveTransactionalTable.java:824)
tests               | 	at io.prestosql.tests.hive.TestHiveTransactionalTable.withTemporaryTable(TestHiveTransactionalTable.java:892)
tests               | 	at io.prestosql.tests.hive.TestHiveTransactionalTable.testHiveColumnTypeChanges(TestHiveTransactionalTable.java:820)
tests               | 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
tests               | 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
tests               | 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
tests               | 	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
tests               | 	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
tests               | 	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
tests               | 	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
tests               | 	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
tests               | 	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
tests               | 	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
tests               | 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
tests               | 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
tests               | 	at java.base/java.lang.Thread.run(Thread.java:834)
tests               | Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Unable to alter table. The following columns have types incompatible with the existing columns in their respective positions :
tests               | new_age
tests               | 	at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:275)
tests               | 	at io.prestosql.tempto.query.JdbcQueryExecutor.executeQueryNoParams(JdbcQueryExecutor.java:128)
tests               | 	at io.prestosql.tempto.query.JdbcQueryExecutor.execute(JdbcQueryExecutor.java:112)
tests               | 	... 17 more
tests               | 	Suppressed: java.lang.Exception: Query: ALTER TABLE test_test_hive_renames_true_NONE_17rq0ilovq70 CHANGE COLUMN age new_age TINYINT
tests               | 		at io.prestosql.tempto.query.JdbcQueryExecutor.executeQueryNoParams(JdbcQueryExecutor.java:136)
tests               | 		... 18 more

@sjx782392329

@djsstarburst I altered the Hive table by executing the command in Hive itself; it seems that you changed the field type using Presto. I think you should use Hive to modify fields instead of Presto.

@findepi commented Dec 28, 2020

The failure shown by @djsstarburst comes from an ALTER done via HiveServer2 ("in Hive").
BTW the set of supported type changes for ALTER varies between Hive versions, so one Hive version can throw "The following columns have types incompatible with the existing columns" while another will be OK with the change.

@sjx782392329 what Hive version are you comparing with?

@sjx782392329

Hi, the Hive version is 1.2.1 @findepi @djsstarburst

@djsstarburst

We're using Hive 3.1.2-5, and as the link above shows, that version contains the error check. The error check is correct in general: narrowing an integral type can cause data loss.

@sjx782392329

@djsstarburst But it works in Presto v0.214. I want to upgrade our version of Presto; SQL previously executed by users will fail, and they will complain about the upgrade.

@findepi commented Dec 29, 2020

I am confused. To the best of my knowledge 0.214 does not support transactional tables, does it?

@sjx782392329

I don't know why this case is related to transactional tables. Presto v0.214 can read the data correctly even though the field type had been changed from integer -> tinyint -> integer.

@findepi commented Dec 29, 2020

Presto v0.214 can read the data correctly even though the field type had been changed from integer -> tinyint -> integer.

An old version's behavior should not be treated as a "requirement" for the future; we wouldn't be able to fix bugs otherwise. The rule here is -- if Hive allows a certain schema evolution, and can read from such a table, Trino should read it as well.
It gets a bit tricky since different Hive versions allow different schema evolutions, and we want to be compatible with Hive 1.x, 2.x and 3.x at the same time -- as long as that is possible.

I don't know why this case is related to transactional tables.

It relates to transactional tables as per this issue's title. I'd suggest opening a separate issue for the problem you're experiencing. Since schema evolution needs to be taken care of by the file reader, please make sure to provide exact repro steps, including the file format in use.

@sjx782392329

With Hive 3.x I can query the table correctly, but Hive 1.2.1 can't read it correctly.

@djsstarburst

I just read through HiveAlterHandler, which implements all of Hive's variations of ALTER TABLE. AFAICT, Hive pays no attention at all to the column names contained in the data files: it looks exclusively at the data file column types, matching them up by column order with the latest column names and types for the table from the Hive metastore.

Trino must do the same thing: ignore the column names from the data files.

@djsstarburst commented Dec 29, 2020

Here is the code in OrcPageSourceFactory that assumes the column names agree between the metastore schema and the schema from the data file footer: https://github.com/trinodb/trino/blob/master/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java#L318-L327. As you can see, it doesn't even try to read any column whose footer name isn't in the set of requested column names:

                if (useOrcColumnNames || isFullAcid) {
                    String columnName = column.getName().toLowerCase(ENGLISH);
                    orcColumn = fileColumnsByName.get(columnName);
                    if (orcColumn != null) {
                        projectedLayout = createProjectedLayout(orcColumn, projectionsByColumnName.get(columnName));
                        columnDomains = effectivePredicateDomains.entrySet().stream()
                                .filter(columnDomain -> columnDomain.getKey().getBaseColumnName().toLowerCase(ENGLISH).equals(columnName))
                                .collect(toImmutableMap(columnDomain -> columnDomain.getKey().getHiveColumnProjectionInfo(), Map.Entry::getValue));
                    }
                }

Near the start of OrcPageSourceFactory.createPageSource, the file column types are extracted from the OrcReader:

            OrcReader reader = optionalOrcReader.get();

            List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
            ...

It seems to me that the way to ensure data durability across column renames, compatible with Hive, is to post-process the List<OrcColumn>, updating the column name of any OrcColumn element whose corresponding metastore column has changed its name but still has a compatible type.
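
A minimal, self-contained sketch of that post-processing idea, assuming simplified stand-ins for OrcColumn and the metastore column handles (FileColumn, TableColumn, and typesCompatible are hypothetical; the real OrcColumn is immutable, so an actual implementation would rebuild the column list rather than mutate it):

    import java.util.ArrayList;
    import java.util.List;

    class OrcColumnRenaming
    {
        // Simplified stand-ins for Trino's OrcColumn and metastore column metadata
        record FileColumn(String name, String type) {}
        record TableColumn(String name, String type) {}

        // Walk both schemas in column order; wherever the file and metastore types
        // are compatible, adopt the metastore's (latest) name so that later
        // name-based lookups find the renamed column.
        static List<FileColumn> applyLatestNames(List<FileColumn> fileColumns, List<TableColumn> tableColumns)
        {
            List<FileColumn> result = new ArrayList<>();
            for (int i = 0; i < fileColumns.size(); i++) {
                FileColumn file = fileColumns.get(i);
                if (i < tableColumns.size() && typesCompatible(file.type(), tableColumns.get(i).type())) {
                    result.add(new FileColumn(tableColumns.get(i).name(), file.type()));
                }
                else {
                    result.add(file); // extra or incompatible columns keep their file names
                }
            }
            return result;
        }

        // Placeholder compatibility check; real rules would also allow widening,
        // e.g. smallint -> int, per the type-change discussion above.
        static boolean typesCompatible(String fileType, String tableType)
        {
            return fileType.equalsIgnoreCase(tableType);
        }
    }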

@djsstarburst commented Dec 30, 2020

I created PR #6479 to update Trino to use the latest column names from the metastore when accessing Hive ORC ACID tables. The changes cause Presto to match Hive behavior after column renaming(s).

@djsstarburst

#6479 has been merged, so this issue can be closed.

findepi closed this as completed Mar 8, 2021