[HUDI-6093] Use the correct partitionToReplacedFileIds during commit. (apache#8487)

Specify the correct partitionToReplacedFileIds 
while creating a commit in DeltaStreamer.

Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
prashantwason and codope committed Jun 20, 2023
1 parent 822b5a1 commit 6ee6a52
Showing 2 changed files with 53 additions and 12 deletions.
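
Below is a minimal sketch of the pattern this change applies in DeltaSync#writeToSink, included only to summarize the diffs that follow. The class ReplaceCommitSketch, the method name, the raw-typed parameters, and the five-argument commit overload are illustrative assumptions; the Hudi calls themselves (insertOverwrite, getPartitionToReplaceFileIds, getWriteStatuses, commit) are the ones used in the actual change.

import java.util.List;
import java.util.Map;

import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.spark.api.java.JavaRDD;

public class ReplaceCommitSketch {

  // Before this fix, DeltaSync always passed Collections.emptyMap() as the
  // partitionToReplacedFileIds argument of commit(), so replace commits
  // (INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION) never recorded
  // which file groups they replaced. The fix keeps the HoodieWriteResult and
  // forwards its partition-to-replaced-fileIds map instead.
  @SuppressWarnings({"unchecked", "rawtypes"}) // raw types kept for brevity in this sketch
  static boolean insertOverwriteAndCommit(SparkRDDWriteClient writeClient,
                                          JavaRDD<HoodieRecord> records,
                                          String instantTime,
                                          String commitActionType) {
    HoodieWriteResult writeResult = writeClient.insertOverwrite(records, instantTime);
    Map<String, List<String>> partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
    JavaRDD<WriteStatus> writeStatusRDD = writeResult.getWriteStatuses();

    // Forward the real replaced-fileId map (not an empty one) so the resulting
    // replacecommit metadata marks the overwritten file groups as replaced.
    // Checkpoint metadata and the pre-commit hook are omitted here; this assumes
    // the shorter commit overload is available.
    return writeClient.commit(instantTime, writeStatusRDD, Option.empty(),
        commitActionType, partitionToReplacedFileIds);
  }
}

The test changes below assert the observable effect: after a DELETE_PARTITION or INSERT_OVERWRITE_TABLE sync, the new getAllFileIDsInTable helper finds no remaining base-file IDs for the replaced partition (or the whole table).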
@@ -26,6 +26,7 @@
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -792,6 +793,8 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String instantTime
instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
LOG.info("Starting commit : " + instantTime);

+HoodieWriteResult writeResult;
+Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
JavaRDD<WriteStatus> writeStatusRDD;
switch (cfg.operation) {
case INSERT:
@@ -804,14 +807,20 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String instantTim
writeStatusRDD = writeClient.bulkInsert(records, instantTime);
break;
case INSERT_OVERWRITE:
-writeStatusRDD = writeClient.insertOverwrite(records, instantTime).getWriteStatuses();
+writeResult = writeClient.insertOverwrite(records, instantTime);
+partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+writeStatusRDD = writeResult.getWriteStatuses();
break;
case INSERT_OVERWRITE_TABLE:
-writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
+writeResult = writeClient.insertOverwriteTable(records, instantTime);
+partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+writeStatusRDD = writeResult.getWriteStatuses();
break;
case DELETE_PARTITION:
List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
-writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
+writeResult = writeClient.deletePartitions(partitions, instantTime);
+partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+writeStatusRDD = writeResult.getWriteStatuses();
break;
default:
throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
@@ -859,7 +868,8 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(String instantTim
}
}
}
-boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap(), extraPreCommitFunc);
+boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds,
+    extraPreCommitFunc);
if (success) {
LOG.info("Commit " + instantTime + " successful!");
latestCheckpointWritten = checkpointStr;
@@ -32,6 +32,7 @@
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,6 +48,7 @@
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
@@ -136,6 +138,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -2473,6 +2476,10 @@ public void testDeletePartitions() throws Exception {
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
String tableBasePath = basePath + "/test_parquet_table" + testNum;

+// There should be fileIDs in the partition being deleted
+assertFalse(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());

HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -2491,6 +2498,9 @@ public void testDeletePartitions() throws Exception {
deltaStreamer.sync();
// No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);

+// There should not be any fileIDs in the deleted partition
+assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
}

@Test
@@ -2519,19 +2529,34 @@ void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOp
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

+// Collect the fileIds before running HoodieDeltaStreamer
+Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());

// setting the operationType
cfg.operation = operationType;
// No new data => no commits.
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc).sync();
-TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

+if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+} else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+assertEquals(0, fsView.getLatestFileSlices("").count());
+TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);

+// Since the table has been overwritten all fileIDs before should have been replaced
+Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
+assertTrue(afterFileIDs.isEmpty());
+}

cfg.sourceLimit = 1000;
new HoodieDeltaStreamer(cfg, jsc).sync();
-TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
+TestHelpers.assertRecordCount(950, tableBasePath, sqlContext);
+TestHelpers.assertDistanceCount(950, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@@ -2716,11 +2741,18 @@ public void testConfigurationHotUpdate(HoodieTableType tableType, HoodieRecordTy
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}

+private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
+HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+Stream<HoodieBaseFile> baseFileStream = partition.isPresent() ? fsView.getLatestBaseFiles(partition.get()) : fsView.getLatestBaseFiles();
+return baseFileStream.map(HoodieBaseFile::getFileId).collect(Collectors.toSet());
+}

class TestDeltaSync extends DeltaSync {

public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
-JavaSparkContext jssc, FileSystem fs, Configuration conf,
-Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+JavaSparkContext jssc, FileSystem fs, Configuration conf,
+Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient);
}

@@ -2849,5 +2881,4 @@ private static Stream<Arguments> testORCDFSSource() {
arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
);
}

}
