[HUDI-3043] De-coupling multi writer tests (apache#4362)
nsivabalan committed Dec 29, 2021
1 parent 1c09112 commit 73d69ef
Showing 1 changed file with 32 additions and 18 deletions.
@@ -81,16 +81,16 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {

   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
-  void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
+  void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
     // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
     setUpTestTable(tableType);
     prepareInitialConfigs(fs(), basePath, "foo");
     // enable carrying forward latest checkpoint
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
     props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
-    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
-    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
+    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
+    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);

     HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
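Note on the retry tuning in this hunk: the per-attempt wait drops from 5000 ms to 200 ms while the retry count rises from 3 to 10, cutting the worst-case lock-acquisition wait from roughly 15 s to roughly 2 s per cycle. A minimal sketch of that shared lock setup, assuming Hudi's common-config import paths; the class and helper names here are hypothetical, while the property keys and values are exactly the ones in the diff:

import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;

final class MultiWriterLockProps {
  // Hypothetical helper collecting the lock settings both tests set above.
  static TypedProperties withTestLockProps(TypedProperties props, String tableBasePath) {
    // File-system based test lock provider (Zookeeper locks time out on Jenkins).
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
    props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
    // 10 retries x 200 ms ~= 2 s worst case, versus 3 x 5000 ms = 15 s before this change.
    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
    return props;
  }
}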
@@ -118,37 +118,51 @@ void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
     // run ingestion & backfill in parallel, create conflict and fail one
     runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
         cfgIngestionJob, backfillJob, cfgBackfillJob, true);
+  }
+
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception {
+    // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
+    setUpTestTable(tableType);
+    prepareInitialConfigs(fs(), basePath, "foo");
+    TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
+    props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
+    props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
+    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
+    props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);

     // create new ingestion & backfill job config to generate only INSERTS to avoid conflict
     props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
     props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
     props.setProperty("hoodie.test.source.generate.inserts", "true");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
-    cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
+    HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
-    cfgBackfillJob.continuousMode = false;
-    meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
-    timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    commitMetadata = HoodieCommitMetadata
+    cfgBackfillJob2.continuousMode = false;
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
-    cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
-    cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
+    cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));

-    cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
+    HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
-    cfgIngestionJob.continuousMode = true;
-    cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgIngestionJob2.continuousMode = true;
+    cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     // re-init ingestion job
-    HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
+    HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
     // re-init backfill job
-    HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
+    HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc());

     // run ingestion & backfill in parallel, avoid conflict and succeed both
     runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
-        cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
+        cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false);
   }

   @ParameterizedTest
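The checkpoint hand-off in this hunk is what "carrying forward the latest checkpoint" means in practice: the backfill job seeds its checkpoint from the extra metadata of the table's first completed commit, so both writers agree on how much of the shared test source has already been consumed. A condensed sketch using only the calls shown in the diff; the helper name is hypothetical, and CHECKPOINT_KEY is the checkpoint metadata key statically imported by the test class:

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

// Hypothetical helper: pull the deltastreamer checkpoint out of the first
// completed commit on the timeline, to seed a second writer's config with it.
static String readFirstCommitCheckpoint(HoodieTableMetaClient meta) throws java.io.IOException {
  HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
      timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
  return commitMetadata.getMetadata(CHECKPOINT_KEY); // CHECKPOINT_KEY: statically imported in the test
}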
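runJobsInParallel itself is defined elsewhere in this test class; the diff only changes its arguments. As a rough sketch of its shape under stated assumptions (CompletableFuture for the concurrency, HoodieDeltaStreamer#sync() to run a job, and an assumed graceful-shutdown hook for the continuous writer; the real helper also validates record counts and accepts either writer losing the race):

import java.util.concurrent.CompletableFuture;

// Rough sketch only: run the continuous ingestion job and the one-shot backfill
// job concurrently, then assert on whether a write conflict was expected.
static void runBothWriters(HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer backfillJob,
    boolean expectConflict) throws Exception {
  CompletableFuture<Void> ingestion = CompletableFuture.runAsync(() -> {
    try {
      ingestionJob.sync(); // continuous mode: keeps committing until shut down
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
  try {
    backfillJob.sync();    // one-shot backfill racing the continuous writer
    if (expectConflict) {
      throw new AssertionError("expected one of the writers to abort on a write conflict");
    }
  } catch (Exception e) {
    if (!expectConflict) {
      throw e;             // no conflict expected: both writers must succeed
    }
    // expected: optimistic concurrency control aborted the conflicting commit
  } finally {
    ingestionJob.shutdownGracefully(); // assumed shutdown hook for continuous mode
    try {
      ingestion.join();
    } catch (Exception ignored) {
      // the continuous writer may itself be the one that hit the conflict
    }
  }
}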
