[HUDI-6315] Optimize UPSERT codepath to use meta fields instead of key generation and index lookup (apache#8956)

- Optimize the UPDATE codepath in spark-sql to use the meta fields instead of key generation and an index lookup, and directly use the prepped APIs in the write client (a minimal sketch of this flow follows the changed-files summary below).
- This also adds UPDATE support for primary-key-less tables.
amrishlal committed Jun 21, 2023
1 parent 35897ba commit d560725
Showing 12 changed files with 386 additions and 143 deletions.
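
The core idea: for UPDATE, the rows read back from the table already carry the Hudi meta fields, so each record's key and file location are known up front; the writer can tag records with that location and use the prepped write-client API, skipping key generation and the index lookup. Below is a minimal, illustrative sketch of that flow (not code from this commit; payload construction is simplified, and `commitTime` / `fileId` stand in for values read from the meta fields):

```scala
import org.apache.hudi.common.model.{HoodieAvroRecord, HoodieKey, HoodieRecordLocation, OverwriteWithLatestAvroPayload}

// Build a record that already knows where it lives, so no index lookup is needed.
def tagWithExistingLocation(key: HoodieKey,
                            payload: OverwriteWithLatestAvroPayload,
                            commitTime: String,
                            fileId: String): HoodieAvroRecord[OverwriteWithLatestAvroPayload] = {
  val record = new HoodieAvroRecord(key, payload)
  record.setCurrentLocation(new HoodieRecordLocation(commitTime, fileId))
  record
}

// Records tagged this way can then go through the prepped API, e.g.
//   client.upsertPreppedRecords(taggedRecords, instantTime)
// instead of client.upsert(...), which would re-run key generation and tagging.
```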
@@ -126,6 +126,7 @@ public static boolean isOverwrite(WriteOperationType operationType) {
   public static boolean isDataChange(WriteOperationType operation) {
     return operation == WriteOperationType.INSERT
         || operation == WriteOperationType.UPSERT
+        || operation == WriteOperationType.UPSERT_PREPPED
         || operation == WriteOperationType.DELETE
         || operation == WriteOperationType.BULK_INSERT
         || operation == WriteOperationType.DELETE_PARTITION
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
@@ -203,7 +204,7 @@ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, Stri
   }
 
   public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
-                                                   String instantTime, WriteOperationType operation) throws HoodieException {
+                                                   String instantTime, WriteOperationType operation, Boolean isPrepped) throws HoodieException {
     switch (operation) {
       case BULK_INSERT:
         Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner =
@@ -212,6 +213,10 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav
       case INSERT:
         return new HoodieWriteResult(client.insert(hoodieRecords, instantTime));
       case UPSERT:
+        if (isPrepped) {
+          return new HoodieWriteResult(client.upsertPreppedRecords(hoodieRecords, instantTime));
+        }
+
         return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
       case INSERT_OVERWRITE:
         return client.insertOverwrite(hoodieRecords, instantTime);
@@ -233,15 +238,23 @@ public static HoodieWriteResult doDeletePartitionsOperation(SparkRDDWriteClient
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
-                                                String payloadClass) throws IOException {
+                                                String payloadClass, scala.Option<HoodieRecordLocation> recordLocation) throws IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
-    return new HoodieAvroRecord<>(hKey, payload);
+    HoodieAvroRecord record = new HoodieAvroRecord<>(hKey, payload);
+    if (recordLocation.isDefined()) {
+      record.setCurrentLocation(recordLocation.get());
+    }
+    return record;
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, HoodieKey hKey,
-                                                String payloadClass) throws IOException {
+                                                String payloadClass, scala.Option<HoodieRecordLocation> recordLocation) throws IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr);
-    return new HoodieAvroRecord<>(hKey, payload);
+    HoodieAvroRecord record = new HoodieAvroRecord<>(hKey, payload);
+    if (recordLocation.isDefined()) {
+      record.setCurrentLocation(recordLocation.get());
+    }
+    return record;
   }
 
   /**
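
For illustration only (this helper is not part of the change): the `scala.Option<HoodieRecordLocation>` passed into the new `createHoodieRecord` overloads can be derived from the meta columns of a previously written row, roughly like this:

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordLocation}

// Recover the record's current location from the Hudi meta fields of a row
// that was read back from the table (null-safe in case the fields are absent).
def locationFromMetaFields(gr: GenericRecord): Option[HoodieRecordLocation] = {
  val commitTime = Option(gr.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).map(_.toString)
  val fileName   = Option(gr.get(HoodieRecord.FILENAME_METADATA_FIELD)).map(_.toString)
  for {
    ct <- commitTime
    fn <- fileName
  } yield new HoodieRecordLocation(ct, FSUtils.getFileId(fn))
}
```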
@@ -307,6 +307,11 @@ object DataSourceWriteOptions {
     .withAlternatives("hoodie.datasource.write.storage.type")
     .withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
 
+  /**
+   * Config key with boolean value that indicates whether record being written is already prepped.
+   */
+  val DATASOURCE_WRITE_PREPPED_KEY = "_hoodie.datasource.write.prepped";
+
   /**
    * May be derive partition path from incoming df if not explicitly set.
    *
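
As a hedged sketch of how this internal flag is consumed (the leading underscore marks it as set by the engine's UPDATE path rather than by end users; table name and path below are placeholders), a prepped write through the datasource would look roughly like this:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// `preppedDf` is assumed to still carry the Hudi meta columns
// (_hoodie_record_key, _hoodie_partition_path, _hoodie_commit_time, ...);
// with the flag set, DefaultSource keeps them instead of dropping them.
def writePrepped(preppedDf: DataFrame, tableName: String, basePath: String): Unit = {
  preppedDf.write
    .format("hudi")
    .option("hoodie.table.name", tableName)
    .option("hoodie.datasource.write.operation", "upsert")
    .option("_hoodie.datasource.write.prepped", "true") // DATASOURCE_WRITE_PREPPED_KEY
    .mode(SaveMode.Append)
    .save(basePath)
}
```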
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
+import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, DATASOURCE_WRITE_PREPPED_KEY, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
@@ -137,27 +137,32 @@ class DefaultSource extends RelationProvider
    * @param sqlContext Spark SQL Context
    * @param mode Mode for saving the DataFrame at the destination
    * @param optParams Parameters passed as part of the DataFrame write operation
-   * @param df Spark DataFrame to be written
+   * @param rawDf Spark DataFrame to be written
    * @return Spark Relation
    */
   override def createRelation(sqlContext: SQLContext,
                               mode: SaveMode,
                               optParams: Map[String, String],
-                              df: DataFrame): BaseRelation = {
-    val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala:_*)
+                              rawDf: DataFrame): BaseRelation = {
+    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false")
+      .equalsIgnoreCase("true")) {
+      rawDf // Don't remove meta columns for prepped write.
+    } else {
+      rawDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
+    }
 
     if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
-      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, df)
       HoodieSparkSqlWriter.cleanup()
     } else {
-      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sqlContext, mode, optParams, df)
       HoodieSparkSqlWriter.cleanup()
       if (!success) {
         throw new HoodieException("Write to Hudi failed")
       }
     }
 
-    new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
+    new HoodieEmptyRelation(sqlContext, df.schema)
   }
 
   override def createSink(sqlContext: SQLContext,
(Diffs for the remaining changed files did not load and are not shown here.)
