[HUDI-3981] Flink engine support for comprehensive schema evolution (a…
trushev authored and vamshigv committed Dec 1, 2022
1 parent 1f5ff3c commit a89eab8
Showing 17 changed files with 1,347 additions and 36 deletions.
@@ -26,12 +26,15 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -57,6 +60,9 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
if (config.getSchemaEvolutionEnable()) {
setLatestInternalSchema(config, metaClient);
}
return HoodieFlinkTable.create(config, context, metaClient);
}

@@ -102,4 +108,11 @@ public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
return Option.empty();
}
}

private static void setLatestInternalSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
if (internalSchema.isPresent()) {
config.setInternalSchemaString(SerDeHelper.toJson(internalSchema.get()));
}
}
}
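
For context, a minimal sketch of what the hook above amounts to on the writer path (the class and method names here are illustrative, not part of this commit): when comprehensive schema evolution is enabled, the latest InternalSchema is resolved from commit metadata and serialized into the write config.

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.SerDeHelper;

public final class SchemaEvolutionConfigSketch {

  // Mirrors the new hook: only when comprehensive schema evolution is enabled does the
  // writer resolve the latest InternalSchema from commit metadata and push its JSON form
  // into the write config for downstream components to pick up.
  static void primeInternalSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
    if (!config.getSchemaEvolutionEnable()) {
      return; // feature is off, leave the config untouched
    }
    Option<InternalSchema> latest =
        new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
    if (latest.isPresent()) { // absent on tables that never evolved their schema
      config.setInternalSchemaString(SerDeHelper.toJson(latest.get()));
    }
  }
}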
@@ -18,6 +18,7 @@

package org.apache.hudi.configuration;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -211,6 +212,13 @@ public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Con
return HoodieCDCSupplementalLoggingMode.parse(mode);
}

/**
* Returns whether comprehensive schema evolution is enabled.
*/
public static boolean isSchemaEvolutionEnabled(Configuration conf) {
return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false);
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
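
For illustration, a small hedged snippet (hypothetical class name) of how a Flink pipeline would carry the flag that this helper reads; the option is off by default, matching the false fallback above.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.config.HoodieCommonConfig;

public final class SchemaEvolutionFlagSketch {

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt the pipeline into comprehensive schema evolution; it is off by default,
    // which matches the `false` fallback used by the helper above.
    conf.setBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true);
    // Read the flag back the same way the helper does.
    boolean enabled = conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false);
    System.out.println("schema evolution enabled: " + enabled);
  }
}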
@@ -43,6 +43,7 @@
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
@@ -124,6 +125,7 @@ public class HoodieTableSource implements
private final String defaultPartName;
private final Configuration conf;
private final FileIndex fileIndex;
private final InternalSchemaManager internalSchemaManager;

private int[] requiredPos;
private long limit;
@@ -136,7 +138,7 @@ public HoodieTableSource(
List<String> partitionKeys,
String defaultPartName,
Configuration conf) {
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null, null);
}

public HoodieTableSource(
@@ -149,7 +151,8 @@ public HoodieTableSource(
@Nullable List<Map<String, String>> requiredPartitions,
@Nullable int[] requiredPos,
@Nullable Long limit,
@Nullable HoodieTableMetaClient metaClient) {
@Nullable HoodieTableMetaClient metaClient,
@Nullable InternalSchemaManager internalSchemaManager) {
this.schema = schema;
this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
this.path = path;
@@ -167,6 +170,9 @@
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
this.internalSchemaManager = internalSchemaManager == null
? InternalSchemaManager.get(this.conf, this.metaClient)
: internalSchemaManager;
}

@Override
@@ -216,7 +222,7 @@ public ChangelogMode getChangelogMode() {
@Override
public DynamicTableSource copy() {
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient, internalSchemaManager);
}

@Override
@@ -469,6 +475,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
.limit(this.limit)
.emitDelete(emitDelete)
.internalSchemaManager(internalSchemaManager)
.build();
}

@@ -492,7 +499,8 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
getParquetConf(this.conf, this.hadoopConf),
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
this.internalSchemaManager
);
}

@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.format;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.util.RowDataCastProjection;
import org.apache.hudi.util.RowDataProjection;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;

/**
* CastMap is responsible for converting Flink types when full schema evolution is enabled.
* Supported cast conversions:
* Integer => Long, Float, Double, Decimal, String
* Long => Float, Double, Decimal, String
* Float => Double, Decimal, String
* Double => Decimal, String
* Decimal => Decimal, String
* String => Decimal, Date
* Date => String
*/
public final class CastMap implements Serializable {

private static final long serialVersionUID = 1L;

// Maps position to corresponding cast
private final Map<Integer, Cast> castMap = new HashMap<>();

private DataType[] fileFieldTypes;

public Option<RowDataProjection> toRowDataProjection(int[] selectedFields) {
if (castMap.isEmpty()) {
return Option.empty();
}
LogicalType[] requiredType = new LogicalType[selectedFields.length];
for (int i = 0; i < selectedFields.length; i++) {
requiredType[i] = fileFieldTypes[selectedFields[i]].getLogicalType();
}
return Option.of(new RowDataCastProjection(requiredType, this));
}

public Object castIfNeeded(int pos, Object val) {
Cast cast = castMap.get(pos);
if (cast == null) {
return val;
}
return cast.convert(val);
}

public DataType[] getFileFieldTypes() {
return fileFieldTypes;
}

public void setFileFieldTypes(DataType[] fileFieldTypes) {
this.fileFieldTypes = fileFieldTypes;
}

@VisibleForTesting
void add(int pos, LogicalType fromType, LogicalType toType) {
Function<Object, Object> conversion = getConversion(fromType, toType);
if (conversion == null) {
throw new IllegalArgumentException(String.format("Cannot create cast %s => %s at pos %s", fromType, toType, pos));
}
add(pos, new Cast(fromType, toType, conversion));
}

private @Nullable Function<Object, Object> getConversion(LogicalType fromType, LogicalType toType) {
LogicalTypeRoot from = fromType.getTypeRoot();
LogicalTypeRoot to = toType.getTypeRoot();
switch (to) {
case BIGINT: {
if (from == INTEGER) {
return val -> ((Number) val).longValue();
}
break;
}
case FLOAT: {
if (from == INTEGER || from == BIGINT) {
return val -> ((Number) val).floatValue();
}
break;
}
case DOUBLE: {
if (from == INTEGER || from == BIGINT) {
return val -> ((Number) val).doubleValue();
}
if (from == FLOAT) {
return val -> Double.parseDouble(val.toString());
}
break;
}
case DECIMAL: {
if (from == INTEGER || from == BIGINT || from == DOUBLE) {
return val -> toDecimalData((Number) val, toType);
}
if (from == FLOAT) {
return val -> toDecimalData(Double.parseDouble(val.toString()), toType);
}
if (from == VARCHAR) {
return val -> toDecimalData(Double.parseDouble(val.toString()), toType);
}
if (from == DECIMAL) {
return val -> toDecimalData(((DecimalData) val).toBigDecimal(), toType);
}
break;
}
case VARCHAR: {
if (from == INTEGER
|| from == BIGINT
|| from == FLOAT
|| from == DOUBLE
|| from == DECIMAL) {
return val -> new BinaryStringData(String.valueOf(val));
}
if (from == DATE) {
return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString());
}
break;
}
case DATE: {
if (from == VARCHAR) {
return val -> (int) LocalDate.parse(val.toString()).toEpochDay();
}
break;
}
default:
}
return null;
}

private void add(int pos, Cast cast) {
castMap.put(pos, cast);
}

private DecimalData toDecimalData(Number val, LogicalType decimalType) {
BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue());
return toDecimalData(valAsDecimal, decimalType);
}

private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) {
return DecimalData.fromBigDecimal(
valAsDecimal,
((DecimalType) decimalType).getPrecision(),
((DecimalType) decimalType).getScale());
}

/**
* Fields {@link Cast#from} and {@link Cast#to} are redundant because {@link Cast#convert(Object)} determines the conversion.
* However, they make {@link CastMap} easier to debug, since {@link Cast#toString()} prints the types.
*/
private static final class Cast implements Serializable {

private static final long serialVersionUID = 1L;

private final LogicalType from;
private final LogicalType to;
private final Function<Object, Object> conversion;

Cast(LogicalType from, LogicalType to, Function<Object, Object> conversion) {
this.from = from;
this.to = to;
this.conversion = conversion;
}

Object convert(Object val) {
return conversion.apply(val);
}

@Override
public String toString() {
return from + " => " + to;
}
}

@Override
public String toString() {
return castMap.entrySet().stream()
.map(e -> e.getKey() + ": " + e.getValue())
.collect(Collectors.joining(", ", "{", "}"));
}
}
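
For illustration, a hedged, test-style usage sketch of the new class (hypothetical class name, placed in the same package because add(int, LogicalType, LogicalType) is @VisibleForTesting): a field written as Integer by an older file is widened to the Long the evolved query schema expects.

package org.apache.hudi.table.format;

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;

public final class CastMapUsageSketch {

  public static void main(String[] args) {
    CastMap castMap = new CastMap();
    // Field at position 0 was written as Integer but is read as Long after evolution.
    castMap.add(0, new IntType(), new BigIntType());
    Object widened = castMap.castIfNeeded(0, 42);      // -> 42L
    Object untouched = castMap.castIfNeeded(1, "abc"); // no cast registered at position 1
    System.out.println(widened + " (" + widened.getClass().getSimpleName() + "), " + untouched);
  }
}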
