Skip to content

Commit

Permalink
[HUDI-5083]Fixed a bug when schema evolution (apache#7045)
Browse files Browse the repository at this point in the history
  • Loading branch information
shenshengli authored and satishkotha committed Dec 11, 2022
1 parent 2cd4b53 commit 01806ae
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Expand Up @@ -767,14 +767,14 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
Schema.Field field = fields.get(i);
String fieldName = field.name();
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null) {
if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) {
Schema.Field oldField = oldSchema.getField(field.name());
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
String fieldFullName = createFullName(fieldNames);
String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
// deal with rename
if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) {
if (oldSchema.getField(fieldNameFromOldSchema) != null) {
// find rename
Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
Expand Down
Expand Up @@ -385,6 +385,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}

test("Test Alter Table multiple times") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
if (HoodieSparkUtils.gteqSpark3_1) {
spark.sql("set hoodie.schema.on.read.enable=true")
spark.sql(
s"""
|create table $tableName (
| id int,
| col1 string,
| col2 string,
| ts long
|) using hudi
| location '$tablePath'
| options (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
""".stripMargin)
spark.sql(s"show create table ${tableName}").show(false)
spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")

// Rename to a previously existing column name + insert
spark.sql(s"alter table ${tableName} drop column col1")
spark.sql(s"alter table ${tableName} rename column col2 to col1")

spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
checkAnswer(spark.sql(s"select col1 from ${tableName} order by id").collect())(
Seq("bbb"), Seq("aaa")
)
}
}
}
}

test("Test Alter Table complex") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down

0 comments on commit 01806ae

Please sign in to comment.