[HUDI-3995] Making perf optimizations for bulk insert row writer path (apache#5462)

- Avoid using a UDF for key generation in SimpleKeyGenerator and NonPartitionedKeyGenerator.
- Fixed NonPartitionedKeyGenerator to fetch the record key directly from the Row rather than going through GenericRecord.
- Other minor fixes, such as using static values instead of looking them up in a HashMap.
nsivabalan authored and yihua committed Jun 3, 2022
1 parent 38c1390 commit 78ec3a8
Showing 20 changed files with 219 additions and 189 deletions.
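Note on the HashMap-related change: HoodieRowCreateHandle.write (first diff below) previously resolved each meta column's position per record via the HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS map, and now reads static constants such as RECORD_KEY_META_FIELD_POS. A minimal, self-contained sketch of that pattern follows; the class and field names are illustrative, not Hudi's actual code, though the meta column name and position match Hudi's meta column layout.

import java.util.HashMap;
import java.util.Map;

// Sketch of the optimization: resolve a column's ordinal once, as a static
// constant, instead of paying a per-record HashMap lookup (hash + boxing).
public class StaticFieldPosSketch {
  static final Map<String, Integer> NAME_TO_POS = new HashMap<>();
  static {
    NAME_TO_POS.put("_hoodie_record_key", 2); // record key is the third meta column
  }

  // Before: every record pays for a map lookup.
  static int lookedUpPos() {
    return NAME_TO_POS.get("_hoodie_record_key");
  }

  // After: the position is a plain constant on the hot path.
  static final int RECORD_KEY_META_FIELD_POS = 2;

  public static void main(String[] args) {
    System.out.println(lookedUpPos() == RECORD_KEY_META_FIELD_POS); // true
  }
}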
@@ -68,8 +68,8 @@ public class HoodieRowCreateHandle implements Serializable {
private final HoodieTimer currTimer;

public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
-String instantTime, int taskPartitionId, long taskId, long taskEpochId,
-StructType structType) {
+String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+StructType structType) {
this.partitionPath = partitionPath;
this.table = table;
this.writeConfig = writeConfig;
@@ -107,16 +107,15 @@ public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, S
/**
* Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
* and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+ *
* @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
* @throws IOException
*/
public void write(InternalRow record) throws IOException {
try {
-String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
-HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
-String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
-String recordKey = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
-HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString();
+final String partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS));
+final String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+final String recordKey = String.valueOf(record.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_POS));
HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
record);
try {
@@ -141,6 +140,7 @@ public boolean canWrite() {
/**
* Closes the {@link HoodieRowCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and
* status of the writes to this handle.
+ *
* @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
* @throws IOException
*/
@@ -18,25 +18,25 @@

package org.apache.hudi.keygen;

-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.PublicAPIMethod;
-import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyException;

+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
-import scala.Function1;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;

+import scala.Function1;

/**
* Base class for the built-in key generators. Contains methods structured for
@@ -46,20 +46,20 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp

private static final String STRUCT_NAME = "hoodieRowTopLevelField";
private static final String NAMESPACE = "hoodieRow";
-private transient Function1<Row, GenericRecord> converterFn = null;
-private SparkRowSerDe sparkRowSerDe;
+private Function1<Row, GenericRecord> converterFn = null;
+private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
protected StructType structType;

-protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
-protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
-protected Map<String, List<DataType>> partitionPathDataTypes = null;
+protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
+protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();

protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}

/**
* Fetch record key from {@link Row}.
+ *
* @param row instance of {@link Row} from which record key is requested.
* @return the record key of interest from {@link Row}.
*/
@@ -74,6 +74,7 @@ public String getRecordKey(Row row) {

/**
* Fetch partition path from {@link Row}.
+ *
* @param row instance of {@link Row} from which partition path is requested
* @return the partition path of interest from {@link Row}.
*/
@@ -97,87 +98,41 @@ public String getPartitionPath(Row row) {
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getPartitionPath(InternalRow internalRow, StructType structType) {
try {
-initDeserializer(structType);
-Row row = sparkRowSerDe.deserializeRow(internalRow);
-return getPartitionPath(row);
+buildFieldSchemaInfoIfNeeded(structType);
+return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
+hiveStylePartitioning, partitionPathSchemaInfo);
} catch (Exception e) {
throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
}
}

-private void initDeserializer(StructType structType) {
-if (sparkRowSerDe == null) {
-sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
-}
-}
-
-void buildFieldPositionMapIfNeeded(StructType structType) {
+void buildFieldSchemaInfoIfNeeded(StructType structType) {
if (this.structType == null) {
-// parse simple fields
-getRecordKeyFields().stream()
-.filter(f -> !(f.contains(".")))
-.forEach(f -> {
-if (structType.getFieldIndex(f).isDefined()) {
-recordKeyPositions.put(f, Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
-} else {
-throw new HoodieKeyException("recordKey value not found for field: \"" + f + "\"");
-}
-});
-// parse nested fields
-getRecordKeyFields().stream()
-.filter(f -> f.contains("."))
-.forEach(f -> recordKeyPositions.put(f, RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, true)));
+// parse simple fields
+getRecordKeyFields()
+.stream().filter(f -> !f.isEmpty())
+.forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true)));
if (getPartitionPathFields() != null) {
-getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
-.forEach(f -> {
-if (structType.getFieldIndex(f).isDefined()) {
-partitionPathPositions.put(f,
-Collections.singletonList((Integer) (structType.getFieldIndex(f).get())));
-} else {
-partitionPathPositions.put(f, Collections.singletonList(-1));
-}
-});
-// parse nested fields
-getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> f.contains("."))
-.forEach(f -> partitionPathPositions.put(f,
-RowKeyGeneratorHelper.getNestedFieldIndices(structType, f, false)));
+getPartitionPathFields().stream().filter(f -> !f.isEmpty())
+.forEach(f -> partitionPathSchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false)));
}
this.structType = structType;
}
}

protected String getPartitionPathInternal(InternalRow row, StructType structType) {
-buildFieldDataTypesMapIfNeeded(structType);
+buildFieldSchemaInfoIfNeeded(structType);
validatePartitionFieldsForInternalRow();
return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
-hiveStylePartitioning, partitionPathPositions, partitionPathDataTypes);
+hiveStylePartitioning, partitionPathSchemaInfo);
}

protected void validatePartitionFieldsForInternalRow() {
-partitionPathPositions.entrySet().forEach(entry -> {
-if (entry.getValue().size() > 1) {
-throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
-}
-});
-}
-
-void buildFieldDataTypesMapIfNeeded(StructType structType) {
-buildFieldPositionMapIfNeeded(structType);
-if (this.partitionPathDataTypes == null) {
-this.partitionPathDataTypes = new HashMap<>();
-if (getPartitionPathFields() != null) {
-// populating simple fields are good enough
-getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
-.forEach(f -> {
-if (structType.getFieldIndex(f).isDefined()) {
-partitionPathDataTypes.put(f,
-Collections.singletonList((structType.fields()[structType.fieldIndex(f)].dataType())));
-} else {
-partitionPathDataTypes.put(f, Collections.singletonList(null));
-}
-});
-}
-}
-}
+if (!validatePartitionFields.getAndSet(true)) {
+partitionPathSchemaInfo.values().forEach(entry -> {
+if (entry.getKey().size() > 1) {
+throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
+}
+});
+}
}
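The validatePartitionFields flag introduced above turns validatePartitionFieldsForInternalRow into a run-once check: AtomicBoolean.getAndSet(true) returns the previous value, so only the first caller performs the validation. A minimal sketch of that guard, with illustrative names:

import java.util.concurrent.atomic.AtomicBoolean;

// Run-once guard: getAndSet(true) returns false exactly once, so the body
// executes on the first call only, and the check is thread-safe.
public class RunOnceGuardSketch {
  private final AtomicBoolean done = new AtomicBoolean(false);

  void validateOnce() {
    if (!done.getAndSet(true)) {
      System.out.println("validating partition fields (first call only)");
    }
  }

  public static void main(String[] args) {
    RunOnceGuardSketch g = new RunOnceGuardSketch();
    g.validateOnce(); // prints
    g.validateOnce(); // no-op
  }
}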
@@ -60,15 +60,15 @@ public String getPartitionPath(GenericRecord record) {

@Override
public String getRecordKey(Row row) {
-buildFieldPositionMapIfNeeded(row.schema());
-return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, true);
+buildFieldSchemaInfoIfNeeded(row.schema());
+return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
}

@Override
public String getPartitionPath(Row row) {
-buildFieldPositionMapIfNeeded(row.schema());
+buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
-hiveStylePartitioning, partitionPathPositions);
+hiveStylePartitioning, partitionPathSchemaInfo);
}

@Override
@@ -60,8 +60,8 @@ public List<String> getPartitionPathFields() {

@Override
public String getRecordKey(Row row) {
-buildFieldPositionMapIfNeeded(row.schema());
-return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, true);
+buildFieldSchemaInfoIfNeeded(row.schema());
+return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
}

@Override
@@ -61,6 +61,12 @@ public List<String> getPartitionPathFields() {
return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
}

+@Override
+public String getRecordKey(Row row) {
+buildFieldSchemaInfoIfNeeded(row.schema());
+return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
+}
+
@Override
public String getPartitionPath(Row row) {
return nonpartitionedAvroKeyGenerator.getEmptyPartition();
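The new getRecordKey(Row) override above is what the commit message means by avoiding GenericRecord: the key is read straight off the Spark Row instead of first deserializing the row into an Avro record. A hedged sketch of the idea, with illustrative names (assumes Spark SQL on the classpath):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Sketch: positional access on a Spark Row avoids building an intermediate
// GenericRecord just to read a single key field.
public class DirectRowKeySketch {
  static String recordKey(Row row, int keyFieldPos) {
    return String.valueOf(row.get(keyFieldPos));
  }

  public static void main(String[] args) {
    Row row = RowFactory.create("key-001", "2022-06-03");
    System.out.println(recordKey(row, 0)); // key-001
  }
}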
