[HUDI-4171] Fixing Non partitioned with virtual keys in read path (apache#5747)

- When the non-partitioned key generator is used with virtual keys, the read path could break, since a partition path field may not exist.
nsivabalan authored and yihua committed Jun 7, 2022
1 parent 9f646d4 commit e3ce7a1
Showing 7 changed files with 87 additions and 22 deletions.
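For context, the failing combination is a table written with virtual keys (no Hoodie meta fields materialized) and the non-partitioned key generator, so no partition path field exists in the table config. A minimal sketch of that configuration, using the two config keys that appear in the diff below (the surrounding class is illustrative only):

import java.util.Properties;

public class NonPartitionedVirtualKeysSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Virtual keys: _hoodie_record_key, _hoodie_partition_path, etc. are not written to data files.
    props.setProperty("hoodie.populate.meta.fields", "false");
    // Non-partitioned key generator: the table defines no partition path field at all.
    props.setProperty("hoodie.datasource.write.keygenerator.class",
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
    // Before this fix, the Hadoop read path assumed a partition path field was always
    // present whenever virtual keys were enabled, and broke on such tables.
    System.out.println(props);
  }
}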
@@ -103,10 +103,19 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-                                           HoodieFileFormat baseFileFormat)
+                                           HoodieFileFormat baseFileFormat) throws IOException {
+    return init(hadoopConf, basePath, tableType, baseFileFormat, false, null, true);
+  }
+
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           HoodieFileFormat baseFileFormat, boolean setKeyGen, String keyGenerator, boolean populateMetaFields)
       throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
+    if (setKeyGen) {
+      properties.setProperty("hoodie.datasource.write.keygenerator.class", keyGenerator);
+    }
+    properties.setProperty("hoodie.populate.meta.fields", Boolean.toString(populateMetaFields));
     return init(hadoopConf, basePath, tableType, properties);
   }
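A usage sketch of the new overload (the base path is hypothetical; the other arguments mirror how the new test below calls it):

// Initialize a COPY_ON_WRITE table with the non-partitioned key generator and virtual keys.
HoodieTableMetaClient metaClient = HoodieTestUtils.init(
    HoodieTestUtils.getDefaultHadoopConf(),
    "/tmp/hoodie-nonpartitioned-table",                    // hypothetical base path
    HoodieTableType.COPY_ON_WRITE,
    HoodieFileFormat.PARQUET,
    true,                                                  // setKeyGen
    "org.apache.hudi.keygen.NonpartitionedKeyGenerator",   // keyGenerator
    false);                                                // populateMetaFields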

@@ -43,6 +43,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
@@ -275,16 +276,16 @@ protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTabl
     if (tableConfig.populateMetaFields()) {
       return Option.empty();
     }
-
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
     try {
       Schema schema = tableSchemaResolver.getTableAvroSchema();
+      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
       return Option.of(
           new HoodieVirtualKeyInfo(
               tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
               schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              schema.getField(tableConfig.getPartitionFieldProp()).pos()));
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
     } catch (Exception exception) {
       throw new HoodieException("Fetching table schema failed with exception ", exception);
     }
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.util.Option;
+
 import java.io.Serializable;
 
 /**
@@ -26,11 +28,11 @@
 public class HoodieVirtualKeyInfo implements Serializable {
 
   private final String recordKeyField;
-  private final String partitionPathField;
+  private final Option<String> partitionPathField;
   private final int recordKeyFieldIndex;
-  private final int partitionPathFieldIndex;
+  private final Option<Integer> partitionPathFieldIndex;
 
-  public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) {
+  public HoodieVirtualKeyInfo(String recordKeyField, Option<String> partitionPathField, int recordKeyFieldIndex, Option<Integer> partitionPathFieldIndex) {
     this.recordKeyField = recordKeyField;
     this.partitionPathField = partitionPathField;
     this.recordKeyFieldIndex = recordKeyFieldIndex;
@@ -41,25 +43,25 @@ public String getRecordKeyField() {
     return recordKeyField;
   }
 
-  public String getPartitionPathField() {
+  public Option<String> getPartitionPathField() {
     return partitionPathField;
   }
 
   public int getRecordKeyFieldIndex() {
     return recordKeyFieldIndex;
   }
 
-  public int getPartitionPathFieldIndex() {
+  public Option<Integer> getPartitionPathFieldIndex() {
     return partitionPathFieldIndex;
   }
 
   @Override
   public String toString() {
     return "HoodieVirtualKeyInfo{"
         + "recordKeyField='" + recordKeyField + '\''
-        + ", partitionPathField='" + partitionPathField + '\''
+        + ", partitionPathField='" + (partitionPathField.isPresent() ? partitionPathField.get() : "null") + '\''
         + ", recordKeyFieldIndex=" + recordKeyFieldIndex
-        + ", partitionPathFieldIndex=" + partitionPathFieldIndex
+        + ", partitionPathFieldIndex=" + (partitionPathFieldIndex.isPresent() ? partitionPathFieldIndex.get() : "-1")
         + '}';
   }
 }
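With the Option-typed members above, the two shapes of virtual key info look like this (a sketch; the field names and positions are illustrative, not from the commit):

// Partitioned table with virtual keys: both partition path members are present.
HoodieVirtualKeyInfo partitioned =
    new HoodieVirtualKeyInfo("uuid", Option.of("partition_path"), 0, Option.of(1));

// Non-partitioned table with virtual keys: the partition path members are empty,
// which is exactly the case the read path previously could not represent.
HoodieVirtualKeyInfo nonPartitioned =
    new HoodieVirtualKeyInfo("uuid", Option.empty(), 0, Option.empty());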
@@ -107,9 +107,12 @@ default void writeToOutput(DataOutput out) throws IOException {
     } else {
       InputSplitUtils.writeBoolean(true, out);
       InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
-      InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out);
       InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
-      InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
+      InputSplitUtils.writeBoolean(virtualKeyInfoOpt.get().getPartitionPathField().isPresent(), out);
+      if (virtualKeyInfoOpt.get().getPartitionPathField().isPresent()) {
+        InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField().get(), out);
+        InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex().get()), out);
+      }
     }
   }

@@ -130,9 +133,10 @@ default void readFromInput(DataInput in) throws IOException {
     boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
     if (hoodieVirtualKeyPresent) {
       String recordKeyField = InputSplitUtils.readString(in);
-      String partitionPathField = InputSplitUtils.readString(in);
       int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
-      int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
+      boolean isPartitionPathFieldPresent = InputSplitUtils.readBoolean(in);
+      Option<String> partitionPathField = isPartitionPathFieldPresent ? Option.of(InputSplitUtils.readString(in)) : Option.empty();
+      Option<Integer> partitionPathIndex = isPartitionPathFieldPresent ? Option.of(Integer.parseInt(InputSplitUtils.readString(in))) : Option.empty();
       setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
     }
   }
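The split serialization now guards the partition path members behind a presence flag. A self-contained sketch of the same wire format using plain java.io streams, assuming writeUTF/readUTF as stand-ins for Hudi's internal InputSplitUtils string helpers and java.util.Optional in place of Hudi's Option:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;

public class VirtualKeyWireFormatSketch {

  // Mirrors the new writeToOutput ordering: a presence flag first, the partition
  // path field and its index only when the flag is true.
  static void write(DataOutput out, Optional<String> partitionField, Optional<Integer> fieldIndex) throws IOException {
    out.writeBoolean(partitionField.isPresent());
    if (partitionField.isPresent()) {
      out.writeUTF(partitionField.get());
      out.writeUTF(String.valueOf(fieldIndex.get()));
    }
  }

  // Mirrors the new readFromInput: read the flag, then conditionally the two fields.
  static void read(DataInput in) throws IOException {
    boolean present = in.readBoolean();
    Optional<String> field = present ? Optional.of(in.readUTF()) : Optional.empty();
    Optional<Integer> index = present ? Optional.of(Integer.parseInt(in.readUTF())) : Optional.empty();
    System.out.println("partitionPathField=" + field + ", partitionPathFieldIndex=" + index);
  }

  public static void main(String[] args) throws IOException {
    // Round-trip the non-partitioned case: only the false flag crosses the wire.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    write(new DataOutputStream(buffer), Optional.empty(), Optional.empty());
    read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
  }
}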
@@ -87,7 +87,9 @@ public static void addRequiredProjectionFields(Configuration configuration, Opti
     } else {
       HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
       addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
-      addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
+      if (hoodieVirtualKey.getPartitionPathField().isPresent()) {
+        addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get());
+      }
     }
   }
 
@@ -99,7 +101,8 @@ public static boolean requiredProjectionFieldsExistInConf(Configuration configur
           && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
     } else {
       return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
-          && readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
+          && (hoodieVirtualKeyInfo.get().getPartitionPathField().isPresent() ? readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField().get())
+          : true);
     }
   }
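The new condition reads: the record key column must always be projected, and the partition path column only when the table defines one. An equivalent, slightly flatter formulation (a sketch with a shortened local name, not the committed code):

boolean recordKeyProjected = readColNames.contains(virtualKeyInfo.getRecordKeyField());
boolean partitionPathProjected = !virtualKeyInfo.getPartitionPathField().isPresent()
    || readColNames.contains(virtualKeyInfo.getPartitionPathField().get());
return recordKeyProjected && partitionPathProjected;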

@@ -27,19 +27,25 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -55,6 +61,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -167,6 +174,26 @@ public void testInputFormatLoad() throws IOException {
     assertEquals(10, files.length);
   }
 
+  @Test
+  public void testInputFormatLoadForNonPartitionedAndVirtualKeyedTable() throws IOException {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+    File partitionDir = InputFormatTestUtil.prepareCustomizedTable(basePath, baseFileFormat, 10, "100", true, false,
+        true, schema);
+    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
+        schema.toString(), HoodieTimeline.COMMIT_ACTION);
+    FileCreateUtils.createCommit(basePath.toString(), "100", Option.of(commitMetadata));
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+    assertEquals(10, inputSplits.length);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+  }
+
   @Test
   public void testInputFormatLoadWithEmptyTable() throws IOException {
     // initial hoodie table
@@ -71,16 +71,35 @@ public class InputFormatTestUtil {
   private static String TEST_WRITE_TOKEN = "1-0-1";
 
   public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
-      String commitNumber)
+      String commitNumber) throws IOException {
+    return prepareCustomizedTable(basePath, baseFileFormat, numberOfFiles, commitNumber, false, true, false, null);
+  }
+
+  public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
+      String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema)
       throws IOException {
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
-        baseFileFormat);
+    if (useNonPartitionedKeyGen) {
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+          baseFileFormat, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", populateMetaFields);
+    } else {
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+          baseFileFormat);
+    }
 
     java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
     setupPartition(basePath, partitionPath);
 
-    return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
-        commitNumber);
+    if (injectData) {
+      try {
+        createSimpleData(schema, partitionPath, numberOfFiles, 100, commitNumber);
+        return partitionPath.toFile();
+      } catch (Exception e) {
+        throw new IOException("Exception thrown while writing data ", e);
+      }
+    } else {
+      return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
+          commitNumber);
+    }
   }
 
   public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
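A usage sketch of the two modes of the reworked helper (arguments mirror the diff; the schema comes from SchemaTestUtil as in the new test above):

// Old behavior, via the untouched entry point: default key generator, meta fields on,
// base files simulated rather than written with real records.
File dir = InputFormatTestUtil.prepareTable(basePath, HoodieFileFormat.PARQUET, 10, "100");

// New path exercised by the non-partitioned virtual-key test: key generator overridden,
// meta fields off, and real records injected from the given schema.
File partitionDir = InputFormatTestUtil.prepareCustomizedTable(basePath, HoodieFileFormat.PARQUET, 10, "100",
    true,   // useNonPartitionedKeyGen
    false,  // populateMetaFields
    true,   // injectData
    schema);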