From 613d2c5caf59502bbd69749942ac22c2ccd67ce7 Mon Sep 17 00:00:00 2001 From: Kyle Zhike Chen Date: Tue, 20 Sep 2022 10:48:02 +0800 Subject: [PATCH] [HUDI-4326] add updateTableSerDeInfo for HiveSyncTool (#5920) - This pull request fixes [SUPPORT] Hudi spark datasource error after migrating from 0.8 to 0.11 #5861* - The issue is caused by the table SerDeInfo being lost after changing the table to a spark data source table. * Co-authored-by: Sagar Sumit --- .../org/apache/hudi/hive/HiveSyncTool.java | 3 ++ .../hudi/hive/HoodieHiveSyncClient.java | 44 +++++++++++++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 3 ++ .../sync/common/HoodieMetaSyncOperations.java | 6 +++ 4 files changed, 56 insertions(+) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index adfe52f920d1..ce3114b92e00 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -290,6 +290,9 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea // Sync the table properties if the schema has changed if (config.getString(HIVE_TABLE_PROPERTIES) != null || config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { syncClient.updateTableProperties(tableName, tableProperties); + HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase()); + String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat); + syncClient.updateTableSerDeInfo(tableName, serDeFormatClassName, serdeProperties); LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties); } schemaChanged = true; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index d5a85adcbacc..a740c93d65af 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.log4j.LogManager; @@ -128,6 +130,41 @@ public void updateTableProperties(String tableName, Map tablePro } } + /** + * Update the SerDe properties for the table. + */ + @Override + public void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) { + if (serdeProperties == null || serdeProperties.isEmpty()) { + return; + } + try { + Table table = client.getTable(databaseName, tableName); + serdeProperties.put("serialization.format", "1"); + StorageDescriptor storageDescriptor = table.getSd(); + SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo(); + if (serdeInfo != null && serdeInfo.getParametersSize() == serdeProperties.size()) { + Map<String, String> parameters = serdeInfo.getParameters(); + boolean same = true; + for (String key : serdeProperties.keySet()) { + if (!parameters.containsKey(key) || !parameters.get(key).equals(serdeProperties.get(key))) { + same = false; + break; + } + } + if (same) { + LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update"); + return; + } + } + storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties)); + client.alter_table(databaseName, tableName, table); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to update table serde info for table: " + + tableName, e); + } + 
} + @Override public void updateTableSchema(String tableName, MessageType newSchema) { ddlExecutor.updateTableDefinition(tableName, newSchema); @@ -316,4 +353,11 @@ public void updateTableComments(String tableName, List fromMetastor } } + Table getTable(String tableName) { + try { + return client.getTable(databaseName, tableName); + } catch (TException e) { + throw new HoodieHiveSyncException(String.format("Database: %s, Table: %s does not exist", databaseName, tableName), e); + } + } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 1d454786859e..7b58f5c3f37f 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -159,6 +159,9 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); + assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(), + "SerDe info not updated or does not match"); assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getStorageSchema().getColumns().size() + 1, "Hive Schema should match the table schema + partition field"); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java index 1c16dd13edaa..5afcf80a877e 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java 
@@ -173,6 +173,12 @@ default void updateTableProperties(String tableName, Map tablePr } + /** + * Update the table SerDeInfo in metastore. + */ + default void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) { + } + /** * Get the timestamp of last replication. */