
Commit

[HUDI-4631] Adding retries to spark datasource writes on conflict failures (apache#6854)

Added retry functionality so that Spark datasource writes are automatically retried in case of conflict failures.
These automatic retries improve the user experience with multi-writers.

---------

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
nsivabalan and codope committed Aug 28, 2023
1 parent 6e84cfe commit e76dd10
Showing 4 changed files with 116 additions and 12 deletions.
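For orientation before the diff, a minimal usage sketch (not part of the commit) of how a writer would opt into the new retries. The option keys are taken from this commit's config and tests; the DataFrame `df`, table name, and base path are hypothetical placeholders.

    import org.apache.spark.sql.SaveMode

    // Hypothetical multi-writer job opting into the new automatic retries.
    // Optimistic concurrency control plus a lock provider is required for
    // the retries to kick in; both options are exercised in the tests below.
    df.write.format("hudi")
      .option("hoodie.table.name", "my_table") // hypothetical table name
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
      .option("hoodie.write.num.retries.on.conflict.failures", "2") // new in this commit
      .mode(SaveMode.Append)
      .save("/tmp/hudi/my_table") // hypothetical base path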
HoodieLockConfig.java

@@ -217,16 +217,24 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
-  /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key();
-  /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.defaultValue();
-  /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS_NAME.key();
-  /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_LOCK_PROVIDER_CLASS = LOCK_PROVIDER_CLASS_NAME.defaultValue();
 
HoodieWriteConfig.java

@@ -558,6 +558,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
       .withDocumentation(WriteConcurrencyMode.class);
 
+  public static final ConfigProperty<Integer> NUM_RETRIES_ON_CONFLICT_FAILURES = ConfigProperty
+      .key("hoodie.write.num.retries.on.conflict.failures")
+      .defaultValue(0)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Maximum number of times to retry a batch on conflict failure.");
+
   public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = ConfigProperty
       .key("hoodie.write.schema")
       .noDefaultValue()
HoodieSparkSqlWriter.scala

@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.AutoRecordKeyGenerationUtils.{isAutoGenerateRecordKeys, mayBeValidateParamsForAutoGenerationOfRecordKeys}
+import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
 import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -48,17 +48,15 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
+import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -122,6 +120,38 @@ object HoodieSparkSqlWriter {
             sourceDf: DataFrame,
             streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
+    var succeeded = false
+    var counter = 0
+    val maxRetry: Integer = Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
+    var toReturn: (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = null
+
+    while (counter <= maxRetry && !succeeded) {
+      try {
+        toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+        log.warn(s"Succeeded with attempt no $counter")
+        succeeded = true
+      } catch {
+        case e: HoodieWriteConflictException =>
+          val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+          if (writeConcurrencyMode.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()) && counter < maxRetry) {
+            counter += 1
+            log.warn(s"Conflict found. Retrying again for attempt no $counter")
+          } else {
+            throw e
+          }
+      }
+    }
+    toReturn
+  }
+
+  def writeInternal(sqlContext: SQLContext,
+                    mode: SaveMode,
+                    optParams: Map[String, String],
+                    sourceDf: DataFrame,
+                    streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+                    hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
   assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
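A note on the loop above: `counter` starts at 0 and the loop runs while `counter <= maxRetry`, so hoodie.write.num.retries.on.conflict.failures = N allows up to N + 1 total write attempts. Conflicts are retried only when the writer runs under optimistic concurrency control; any other exception, or an exhausted retry budget, is rethrown. The same bounded-retry pattern in isolation, as a sketch with illustrative names that are not part of the Hudi API:

    // Sketch: retry an action up to maxRetries extra times while `retriable`
    // says the failure is recoverable; rethrow anything else unchanged.
    def withConflictRetries[T](maxRetries: Int)(retriable: Exception => Boolean)(attempt: () => T): T = {
      var counter = 0
      while (true) {
        try {
          return attempt() // success on attempt number counter + 1
        } catch {
          case e: Exception if retriable(e) && counter < maxRetries =>
            counter += 1 // budget left: loop around and try again
        }
      }
      throw new IllegalStateException("unreachable") // satisfies the type checker
    }

In these terms, the new entry point behaves roughly like withConflictRetries(maxRetry)(conflictUnderOcc)(() => writeInternal(...)).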
TestCOWDataSource.scala

@@ -23,11 +23,11 @@ import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -59,6 +59,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -555,11 +556,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(snapshotDF2.count(), 80)
   }
 
+  /**
+   * Test retries on conflict failures.
+   */
+  @ParameterizedTest
+  @ValueSource(ints = Array(0, 2))
+  def testCopyOnWriteConcurrentUpdates(numRetries: Integer): Unit = {
+    initTestDataGenerator()
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 1000)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+      .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(1000, snapshotDF1.count())
+
+    val countDownLatch = new CountDownLatch(2)
+    for (x <- 1 to 2) {
+      val thread = new Thread(new UpdateThread(dataGen, spark, commonOpts, basePath, x + "00", countDownLatch, numRetries))
+      thread.setName((x + "00_THREAD").toString())
+      thread.start()
+    }
+    countDownLatch.await(1, TimeUnit.MINUTES)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    if (numRetries > 0) {
+      assertEquals(snapshotDF2.count(), 3000)
+      assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size(), 3)
+    } else {
+      // only one among two threads will succeed and hence 2000
+      assertEquals(snapshotDF2.count(), 2000)
+      assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size(), 2)
+    }
+  }
+
+  class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession, commonOpts: Map[String, String], basePath: String,
+                     instantTime: String, countDownLatch: CountDownLatch, numRetries: Integer = 0) extends Runnable {
+    override def run() {
+      val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 500)).toList
+      val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).toList
+      val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2))
+      val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2))
+      updateDf.union(insertDf).write.format("org.apache.hudi")
+        .options(commonOpts)
+        .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
+        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+        .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+        .option(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), numRetries.toString)
+        .mode(SaveMode.Append)
+        .save(basePath)
+      countDownLatch.countDown()
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testOverWriteModeUseReplaceAction(recordType: HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
 
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
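To spell out the arithmetic behind the assertions above: the table starts with 1,000 rows, and each writer thread adds 1,000 new inserts (its 500 updates rewrite existing keys, so they do not change the row count). If both threads commit thanks to retries, the table holds 1000 + 2 × 1000 = 3000 rows across 3 commits; with zero retries one thread loses the conflict, leaving 1000 + 1000 = 2000 rows and 2 commits.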
