[HUDI-4326] add updateTableSerDeInfo for HiveSyncTool (apache#5920)
- This pull request fixes "[SUPPORT] Hudi spark datasource error after migrate from 0.8 to 0.11" (apache#5861).
- The issue is caused by the table's SerDeInfo going missing after the table is changed to a Spark data source table.

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
2 people authored and yuzhaojing committed Sep 29, 2022
1 parent 28624be commit 613d2c5
Showing 4 changed files with 56 additions and 0 deletions.
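Aside (not part of the diff): a minimal sketch of how the regression can be observed with the standard Hive metastore API. Before this fix, a table synced as a Spark data source table could come back with missing SerDe info; after it, a Parquet-backed table should report the Parquet SerDe. The database and table names below are placeholders.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class CheckSerDeInfo {
  public static void main(String[] args) throws Exception {
    // Connect to the metastore configured on the classpath.
    HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    Table table = client.getTable("default", "hudi_table"); // placeholder names
    // With the fix, a Parquet-backed Hudi table should report the Parquet SerDe here.
    System.out.println(table.getSd().getSerdeInfo());
    client.close();
  }
}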
@@ -290,6 +290,9 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
    // Sync the table properties if the schema has changed
    if (config.getString(HIVE_TABLE_PROPERTIES) != null || config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) {
      syncClient.updateTableProperties(tableName, tableProperties);
      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
      String serDeFormatClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
      syncClient.updateTableSerDeInfo(tableName, serDeFormatClassName, serdeProperties);
      LOG.info("Sync table properties for " + tableName + ", table properties are: " + tableProperties);
    }
    schemaChanged = true;
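Aside (not part of the diff): the resolution step above maps the configured base file format to a Hive SerDe class via HoodieInputFormatUtils.getSerDeClassName. A minimal sketch for the Parquet case, whose expected output matches the assertion in the test further down; import paths are assumed from the Hudi codebase.

import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

public class ResolveSerDeClass {
  public static void main(String[] args) {
    String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
    // Expected (per the test assertion below):
    // org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
    System.out.println(serDeClassName);
  }
}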
@@ -34,6 +34,8 @@

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.log4j.LogManager;
@@ -128,6 +130,41 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
    }
  }

  /**
   * Update the table's SerDe class and SerDe properties in the metastore.
   */
  @Override
  public void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) {
    if (serdeProperties == null || serdeProperties.isEmpty()) {
      return;
    }
    try {
      Table table = client.getTable(databaseName, tableName);
      serdeProperties.put("serialization.format", "1");
      StorageDescriptor storageDescriptor = table.getSd();
      SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
      if (serdeInfo != null && serdeInfo.getParametersSize() == serdeProperties.size()) {
        Map<String, String> parameters = serdeInfo.getParameters();
        boolean same = true;
        // Use || (short-circuit) so a missing key does not NPE on the get() below.
        for (String key : serdeProperties.keySet()) {
          if (!parameters.containsKey(key) || !parameters.get(key).equals(serdeProperties.get(key))) {
            same = false;
            break;
          }
        }
        if (same) {
          LOG.debug("Table " + tableName + " serdeProperties already up to date, skip update");
          return;
        }
      }
      storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
      client.alter_table(databaseName, tableName, table);
    } catch (Exception e) {
      throw new HoodieHiveSyncException("Failed to update table serde info for table: "
          + tableName, e);
    }
  }

  @Override
  public void updateTableSchema(String tableName, MessageType newSchema) {
    ddlExecutor.updateTableDefinition(tableName, newSchema);
@@ -316,4 +353,11 @@ public void updateTableComments(String tableName, List<FieldSchema> fromMetastor
    }
  }

  Table getTable(String tableName) {
    try {
      return client.getTable(databaseName, tableName);
    } catch (TException e) {
      throw new HoodieHiveSyncException(String.format("Database: %s, Table: %s does not exist", databaseName, tableName), e);
    }
  }
}
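Aside (not part of the diff): because updateTableSerDeInfo first checks that the parameter maps have equal size, its key-by-key loop is equivalent to Map.equals. A self-contained sketch of that check:

import java.util.HashMap;
import java.util.Map;

public class SerDePropertiesCompare {
  // Mirrors the "already up to date" check in updateTableSerDeInfo above.
  static boolean same(Map<String, String> parameters, Map<String, String> serdeProperties) {
    if (parameters.size() != serdeProperties.size()) {
      return false;
    }
    for (String key : serdeProperties.keySet()) {
      if (!parameters.containsKey(key) || !parameters.get(key).equals(serdeProperties.get(key))) {
        return false;
      }
    }
    return true; // equivalent to parameters.equals(serdeProperties)
  }

  public static void main(String[] args) {
    Map<String, String> existing = new HashMap<>();
    existing.put("serialization.format", "1");
    Map<String, String> incoming = new HashMap<>(existing);
    System.out.println(same(existing, incoming)); // true -> alter_table is skipped
  }
}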
@@ -159,6 +159,9 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)

    assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
        "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
    assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
        hiveClient.getTable(HiveTestUtil.TABLE_NAME).getSd().getSerdeInfo().getSerializationLib(),
        "SerDe info not updated or does not match");
    assertEquals(hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(),
        hiveClient.getStorageSchema().getColumns().size() + 1,
        "Hive Schema should match the table schema + partition field");
@@ -173,6 +173,12 @@ default void updateTableProperties(String tableName, Map<String, String> tablePropr

  }

  /**
   * Update the table SerDeInfo in the metastore.
   */
  default void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) {
  }

  /**
   * Get the timestamp of last replication.
   */
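Aside (not part of the diff): the interface method above ships with a no-op default body, so existing implementations of the sync interface keep compiling without changes. A trimmed, hypothetical illustration of that compatibility choice:

import java.util.Collections;
import java.util.Map;

public class DefaultMethodCompat {
  // Trimmed stand-in for the sync interface above (hypothetical name; only the
  // relevant method is reproduced).
  interface MetaSyncOperations {
    default void updateTableSerDeInfo(String tableName, String serdeClass, Map<String, String> serdeProperties) {
      // no-op, as in the diff above
    }
  }

  // An implementation written before this PR still compiles: it inherits the no-op default.
  static class LegacySyncClient implements MetaSyncOperations {
  }

  public static void main(String[] args) {
    new LegacySyncClient().updateTableSerDeInfo("t1", "someSerDeClass", Collections.emptyMap());
    System.out.println("inherited no-op default invoked");
  }
}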
