Skip to content

Commit

Permalink
[HUDI-4412] Fix multi writer INSERT_OVERWRITE NPE bug (apache#6130)
Browse files Browse the repository at this point in the history
There are two minor issues fixed here:

1. When the insert_overwrite operation is performed, the 
    clusteringPlan in the requestedReplaceMetadata will be 
    null. Calling getFileIdsFromRequestedReplaceMetadata will cause NPE.

2. When insert_overwrite operation, inflightCommitMetadata!=null, 
    getOperationType should be obtained from getHoodieInflightReplaceMetadata,
    the original code will have a null pointer.
  • Loading branch information
liujinhui1994 authored and prasannarajaperumal committed Sep 29, 2022
1 parent 9e82940 commit 75eb803
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
Expand Up @@ -124,14 +124,15 @@ private void init(HoodieInstant instant) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
if (instant.isRequested()) {
if (requestedReplaceMetadata != null) {
// for insert_overwrite/insert_overwrite_table clusteringPlan will be empty
if (requestedReplaceMetadata != null && requestedReplaceMetadata.getClusteringPlan() != null) {
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
this.operationType = WriteOperationType.CLUSTER;
}
} else {
if (inflightCommitMetadata != null) {
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata().getOperationType());
} else if (requestedReplaceMetadata != null) {
// inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
Expand Down
Expand Up @@ -123,6 +123,41 @@ public void testConcurrentWritesWithInterleavingSuccesssfulCommit() throws Excep
}
}

@Test
public void testConcurrentWritesWithReplaceInflightCommit() throws Exception {
createReplaceInflight(HoodieActiveTimeline.createNewInstantTime());
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
Option<HoodieInstant> lastSuccessfulInstant = Option.empty();

// writer 1 starts
String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
createInflightCommit(currentWriterInstant);
Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));

// writer 2 starts and finishes
String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
createReplaceInflight(newInstantTime);

SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new SimpleConcurrentFileWritesConflictResolutionStrategy();
HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
timeline = timeline.reload();

List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
Collectors.toList());

// writer 1 conflicts with writer 2
Assertions.assertTrue(candidateInstants.size() == 1);
ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
try {
strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
} catch (HoodieWriteConflictException e) {
// expected
}
}

@Test
public void testConcurrentWritesWithInterleavingScheduledCompaction() throws Exception {
createCommit(HoodieActiveTimeline.createNewInstantTime());
Expand Down Expand Up @@ -394,6 +429,20 @@ private void createReplaceRequested(String instantTime) throws Exception {
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}

private void createReplaceInflight(String instantTime) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";

HoodieCommitMetadata inflightReplaceMetadata = new HoodieCommitMetadata();
inflightReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId("file-1");
inflightReplaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
HoodieTestTable.of(metaClient)
.addInflightReplace(instantTime, Option.of(inflightReplaceMetadata))
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
}

private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";
Expand Down
Expand Up @@ -284,6 +284,12 @@ public HoodieTestTable addRequestedReplace(String instantTime, Option<HoodieRequ
return this;
}

public HoodieTestTable addInflightReplace(String instantTime, Option<HoodieCommitMetadata> inflightReplaceMetadata) throws Exception {
createInflightReplaceCommit(basePath, instantTime, inflightReplaceMetadata);
currentInstantTime = instantTime;
return this;
}

public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
createRequestedCleanFile(basePath, instantTime, cleanerPlan);
createInflightCleanFile(basePath, instantTime, cleanerPlan);
Expand Down

0 comments on commit 75eb803

Please sign in to comment.