[HUDI-7826] Make column nullable when setNullForMissingColumns is true

the-other-tim-brown authored Jun 11, 2024
1 parent 2a56065 commit 51ef709
Showing 7 changed files with 77 additions and 18 deletions.
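The change threads a new makeMissingFieldsNullable flag, driven by HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS, through AvroSchemaEvolutionUtils.reconcileSchema. A minimal sketch of the new three-argument overload in action — the record and field names are invented for illustration:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;

    public class ReconcileSketch {
      public static void main(String[] args) {
        // Table schema has a required column "b" that the incoming batch omits.
        Schema tableSchema = SchemaBuilder.record("t").fields()
            .requiredInt("a")
            .requiredInt("b")
            .endRecord();
        Schema incomingSchema = SchemaBuilder.record("t").fields()
            .requiredInt("a")
            .endRecord();

        // With the flag set to true, "b" survives reconciliation but becomes
        // nullable, so writers can fill it with null; with false, "b" is left
        // exactly as the table schema defines it (the old behavior).
        Schema reconciled =
            AvroSchemaEvolutionUtils.reconcileSchema(incomingSchema, tableSchema, true);
        System.out.println(reconciled.toString(true));
      }
    }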
@@ -305,7 +305,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
}
-InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
+InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
@@ -174,7 +174,7 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly add columns, and position reorder(spark sql may change cols order)
-InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get());
+InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
@@ -18,14 +18,19 @@

package org.apache.hudi.internal.schema.utils;

+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.action.TableChangesHelper;

import org.apache.avro.Schema;

import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

@@ -36,6 +41,8 @@
* Utility methods to support evolve old avro schema based on a given schema.
*/
public class AvroSchemaEvolutionUtils {
+private static final Set<String> META_FIELD_NAMES = Arrays.stream(HoodieRecord.HoodieMetadataField.values())
+    .map(HoodieRecord.HoodieMetadataField::getFieldName).collect(Collectors.toSet());

/**
* Support reconcile from a new avroSchema.
@@ -50,11 +57,13 @@ public class AvroSchemaEvolutionUtils {
* for example: incoming schema: int a, int b, int d; oldTableSchema int a, int b, int c, int d
* we must guarantee the column c is missing semantic, instead of delete semantic.
*
- * @param incomingSchema implicitly evolution of avro when hoodie write operation
- * @param oldTableSchema old internalSchema
+ * @param incomingSchema            implicitly evolution of avro when hoodie write operation
+ * @param oldTableSchema            old internalSchema
+ * @param makeMissingFieldsNullable if true, fields missing from the incoming schema when compared to the oldTableSchema will become
+ *                                  nullable in the result. Otherwise, no updates will be made to those fields.
* @return reconcile Schema
*/
-public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) {
+public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema, boolean makeMissingFieldsNullable) {
/* If incoming schema is null, we fall back on table schema. */
if (incomingSchema.getType() == Schema.Type.NULL) {
return oldTableSchema;
@@ -114,11 +123,28 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche
typeChange.updateColumnType(col, inComingInternalSchema.findType(col));
});

+if (makeMissingFieldsNullable) {
+  // mark columns missing from incoming schema as nullable
+  Set<String> visited = new HashSet<>();
+  diffFromOldSchema.stream()
+      // ignore meta fields
+      .filter(col -> !META_FIELD_NAMES.contains(col))
+      .sorted()
+      .forEach(col -> {
+        // if parent is marked as nullable, only update the parent and not all the missing children field
+        String parent = TableChangesHelper.getParentName(col);
+        if (!visited.contains(parent)) {
+          typeChange.updateColumnNullability(col, true);
+        }
+        visited.add(col);
+      });
+}
+
return SchemaChangeUtils.applyTableChanges2Schema(internalSchemaAfterAddColumns, typeChange);
}

-public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema) {
-  return convert(reconcileSchema(incomingSchema, convert(oldTableSchema)), oldTableSchema.getFullName());
+public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema, boolean makeMissingFieldsNullable) {
+  return convert(reconcileSchema(incomingSchema, convert(oldTableSchema), makeMissingFieldsNullable), oldTableSchema.getFullName());
}
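In the block added above, the columns that exist only in the table schema are sorted so a missing parent struct is visited before its children; once the parent itself is marked nullable, the children are skipped, and the _hoodie_* meta columns are filtered out up front via META_FIELD_NAMES. A self-contained sketch of that traversal — the column names are invented, and parentName stands in for TableChangesHelper.getParentName (assumed to return the dotted prefix):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ParentFirstMarking {
      // Assumed behavior of TableChangesHelper.getParentName: everything
      // before the last dot, or "" for a top-level column.
      static String parentName(String col) {
        int idx = col.lastIndexOf('.');
        return idx < 0 ? "" : col.substring(0, idx);
      }

      public static void main(String[] args) {
        // Hypothetical missing columns: top-level b, struct c and its children.
        List<String> missing = Arrays.asList("c.d", "c", "c.e", "b");
        Set<String> visited = new HashSet<>();
        missing.stream().sorted().forEach(col -> {
          if (!visited.contains(parentName(col))) {
            System.out.println("mark nullable: " + col); // prints b, then c
          }
          visited.add(col);
        });
      }
    }

Sorting is what makes the parent check sufficient: "c" always precedes "c.d" and "c.e" lexicographically, so each child finds its parent already in the visited set.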

@@ -41,6 +41,10 @@ public static Schema.Field createArrayField(String name, Schema schema) {
return new Schema.Field(name, Schema.createArray(schema), null, null);
}

+public static Schema.Field createNullableArrayField(String name, Schema schema) {
+  return new Schema.Field(name, Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.createArray(schema)), null, Schema.Field.NULL_VALUE);
+}
+
public static Schema.Field createMapField(String name, Schema.Type type) {
return createMapField(name, Schema.create(type));
}
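The new test helper above builds the field as a union with null first plus a null default, which is the shape Avro expects for an optional field. A quick hedged check of what it produces for an int array, assuming the Avro 1.9+ Schema.Field constructor used in the helper:

    import org.apache.avro.Schema;

    public class NullableArrayFieldDemo {
      public static void main(String[] args) {
        Schema union = Schema.createUnion(
            Schema.create(Schema.Type.NULL),
            Schema.createArray(Schema.create(Schema.Type.INT)));
        Schema.Field field =
            new Schema.Field("field3", union, null, Schema.Field.NULL_VALUE);
        // Prints: ["null",{"type":"array","items":"int"}]
        System.out.println(field.schema());
      }
    }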
@@ -480,7 +480,7 @@ public void testEvolutionSchemaFromNewAvroSchema() {
);
evolvedRecord = (Types.RecordType)InternalSchemaBuilder.getBuilder().refreshNewId(evolvedRecord, new AtomicInteger(0));
Schema evolvedAvroSchema = AvroInternalSchemaConverter.convert(evolvedRecord, "test1");
-InternalSchema result = AvroSchemaEvolutionUtils.reconcileSchema(evolvedAvroSchema, oldSchema);
+InternalSchema result = AvroSchemaEvolutionUtils.reconcileSchema(evolvedAvroSchema, oldSchema, false);
Types.RecordType checkedRecord = Types.RecordType.get(
Types.Field.get(0, false, "id", Types.IntType.get()),
Types.Field.get(1, true, "data", Types.StringType.get()),
@@ -535,7 +535,7 @@ public void testReconcileSchema() {
+ "{\"name\":\"d2\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}");

Schema simpleReconcileSchema = AvroInternalSchemaConverter.convert(AvroSchemaEvolutionUtils
-    .reconcileSchema(incomingSchema, AvroInternalSchemaConverter.convert(schema)), "schemaNameFallback");
+    .reconcileSchema(incomingSchema, AvroInternalSchemaConverter.convert(schema), false), "schemaNameFallback");
Assertions.assertEquals(simpleCheckSchema, simpleReconcileSchema);
}
}
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_SCHEMA, SQL_MERGE_INTO
import org.apache.hudi.avro.AvroSchemaUtils.{checkSchemaCompatible, checkValidEvolution, isCompatibleProjectionOf, isSchemaCompatible}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
-import org.apache.hudi.common.config.{HoodieConfig, TypedProperties}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, TypedProperties}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
@@ -32,7 +32,6 @@ import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileSchemaRequirements
-
import org.apache.avro.Schema
import org.slf4j.LoggerFactory

@@ -133,7 +132,7 @@ object HoodieSchemaUtils {
if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && !allowAutoEvolutionColumnDrop) {
// Default behaviour
val reconciledSchema = if (setNullForMissingColumns) {
-  AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema)
+  AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema, setNullForMissingColumns)
} else {
canonicalizedSourceSchema
}
@@ -163,7 +162,9 @@
internalSchemaOpt match {
case Some(internalSchema) =>
// Apply schema evolution, by auto-merging write schema and read schema
-val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
+val setNullForMissingColumns = opts.getOrElse(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
+  HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.defaultValue()).toBoolean
+val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema, setNullForMissingColumns)
val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().asScala.filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
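The Scala change above resolves the flag from the write options, falling back to the config default. A hedged Java equivalent of that lookup — the options map here is illustrative, while key() and defaultValue() are the ConfigProperty accessors already used in the diff:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hudi.common.config.HoodieCommonConfig;

    public class ResolveFlag {
      public static void main(String[] args) {
        Map<String, String> opts = new HashMap<>();
        opts.put(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(), "true");

        // Same fallback chain as the Scala code: explicit option, else default.
        boolean setNullForMissingColumns = Boolean.parseBoolean(
            opts.getOrDefault(
                HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
                String.valueOf(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.defaultValue())));
        System.out.println(setNullForMissingColumns); // true
      }
    }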
@@ -38,6 +38,7 @@
import static org.apache.hudi.avro.AvroSchemaTestUtils.createArrayField;
import static org.apache.hudi.avro.AvroSchemaTestUtils.createMapField;
import static org.apache.hudi.avro.AvroSchemaTestUtils.createNestedField;
+import static org.apache.hudi.avro.AvroSchemaTestUtils.createNullableArrayField;
import static org.apache.hudi.avro.AvroSchemaTestUtils.createNullablePrimitiveField;
import static org.apache.hudi.avro.AvroSchemaTestUtils.createPrimitiveField;
import static org.apache.hudi.avro.AvroSchemaTestUtils.createRecord;
@@ -199,7 +200,12 @@ void testMissingColumn(boolean allowDroppedColumns) {
createPrimitiveField("field1", Schema.Type.INT),
createPrimitiveField("field3", Schema.Type.INT));
try {
-assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns));
+Schema actual = deduceWriterSchema(end, start, allowDroppedColumns);
+Schema expected = createRecord("missingSimpleField",
+    createPrimitiveField("field1", Schema.Type.INT),
+    createNullablePrimitiveField("field2", Schema.Type.INT),
+    createPrimitiveField("field3", Schema.Type.INT));
+assertEquals(expected, actual);
assertTrue(allowDroppedColumns);
} catch (MissingSchemaFieldException e) {
assertFalse(allowDroppedColumns);
@@ -220,7 +226,16 @@ void testMissingColumn(boolean allowDroppedColumns) {
createPrimitiveField("field2", Schema.Type.INT),
createPrimitiveField("field4", Schema.Type.INT));
try {
-assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns));
+Schema actual = deduceWriterSchema(end, start, allowDroppedColumns);
+Schema expected = createRecord("missingComplexField",
+    createPrimitiveField("field1", Schema.Type.INT),
+    createPrimitiveField("field2", Schema.Type.INT),
+    createNullableArrayField("field3", createRecord("nestedRecord",
+        createPrimitiveField("nestedField1", Schema.Type.INT),
+        createPrimitiveField("nestedField2", Schema.Type.INT),
+        createPrimitiveField("nestedField3", Schema.Type.INT))),
+    createPrimitiveField("field4", Schema.Type.INT));
+assertEquals(expected, actual);
assertTrue(allowDroppedColumns);
} catch (MissingSchemaFieldException e) {
assertFalse(allowDroppedColumns);
@@ -235,7 +250,16 @@ void testMissingColumn(boolean allowDroppedColumns) {
createPrimitiveField("nestedField3", Schema.Type.INT))),
createPrimitiveField("field4", Schema.Type.INT));
try {
-assertEquals(start, deduceWriterSchema(end, start, allowDroppedColumns));
+Schema actual = deduceWriterSchema(end, start, allowDroppedColumns);
+Schema expected = createRecord("missingComplexField",
+    createPrimitiveField("field1", Schema.Type.INT),
+    createNullablePrimitiveField("field2", Schema.Type.INT),
+    createArrayField("field3", createRecord("nestedRecord",
+        createNullablePrimitiveField("nestedField1", Schema.Type.INT),
+        createPrimitiveField("nestedField2", Schema.Type.INT),
+        createPrimitiveField("nestedField3", Schema.Type.INT))),
+    createPrimitiveField("field4", Schema.Type.INT));
+assertEquals(expected, actual);
assertTrue(allowDroppedColumns);
} catch (MissingSchemaFieldException e) {
assertFalse(allowDroppedColumns);
@@ -254,7 +278,11 @@ void testFieldReordering() {
Schema end = createRecord("reorderFields",
createPrimitiveField("field3", Schema.Type.INT),
createPrimitiveField("field1", Schema.Type.INT));
-assertEquals(start, deduceWriterSchema(end, start, true));
+Schema expected = createRecord("reorderFields",
+    createPrimitiveField("field1", Schema.Type.INT),
+    createNullablePrimitiveField("field2", Schema.Type.INT),
+    createPrimitiveField("field3", Schema.Type.INT));
+assertEquals(expected, deduceWriterSchema(end, start, true));

// nested field ordering changes and new field is added
start = createRecord("reorderNestedFields",
@@ -276,7 +304,7 @@ void testFieldReordering() {
createPrimitiveField("nestedField4", Schema.Type.INT))),
createPrimitiveField("field4", Schema.Type.INT));

-Schema expected = createRecord("reorderNestedFields",
+expected = createRecord("reorderNestedFields",
createPrimitiveField("field1", Schema.Type.INT),
createPrimitiveField("field2", Schema.Type.INT),
createArrayField("field3", createRecord("reorderNestedFields.field3",