From cbf9b83ca6d3dada14eea551a5bae25144ca0459 Mon Sep 17 00:00:00 2001
From: Nicolas Paris
Date: Wed, 21 Sep 2022 23:41:03 +0200
Subject: [PATCH] [HUDI-4792] Batch clean files to delete (#6580)

This patch makes use of a batch call to get the file groups to delete
during cleaning, instead of one call per partition. This limits the
number of calls to the file system view and should fix the trouble with
the metadata table when a table has a lot of partitions.

Fixes issue #6373

Co-authored-by: sivabalan
---
 .../action/clean/CleanPlanActionExecutor.java |  11 +-
 .../hudi/table/action/clean/CleanPlanner.java | 237 +++++++++---------
 ...arkCopyOnWriteTableArchiveWithReplace.java |   4 +-
 .../view/AbstractTableFileSystemView.java     |  16 +-
 .../view/PriorityBasedFileSystemView.java     |   5 +
 .../view/RemoteHoodieTableFileSystemView.java |  12 +
 .../table/view/TableFileSystemView.java       |  14 +-
 7 files changed, 176 insertions(+), 123 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index 7f3b437178fd..bd7ec798ed1a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -42,6 +42,7 @@
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -116,9 +117,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions(partitionIterator -> {
+            List<String> partitionList = new ArrayList<>();
+            partitionIterator.forEachRemaining(partitionList::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
+            return cleanResult.entrySet().iterator();
+          }, false).collectAsList()
           .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
       Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 671a522cab42..5ed53f7ae047 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -60,6 +60,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -225,10 +226,10 @@ private List<String> getPartitionPathsForFullCleaning() {
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
-    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) {
+    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    List<CleanFileInfo> deletePaths = new ArrayList<>();
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
@@ -236,43 +237,48 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
     // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
-    boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    for (HoodieFileGroup fileGroup : fileGroups) {
-      int keepVersions = config.getCleanerFileVersionsRetained();
-      // do not cleanup slice required for pending compaction
-      Iterator<FileSlice> fileSliceIterator =
-          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
-      if (isFileGroupInPendingCompaction(fileGroup)) {
-        // We have already saved the last version of file-groups for pending compaction Id
-        keepVersions--;
-      }
+    List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+    for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+      List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty()));
+      boolean toDeletePartition = false;
+      for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+        int keepVersions = config.getCleanerFileVersionsRetained();
+        // do not cleanup slice required for pending compaction
+        Iterator<FileSlice> fileSliceIterator =
+            fileGroup.getAllFileSlices()
+                .filter(fs -> !isFileSliceNeededForPendingCompaction(fs))
+                .iterator();
+        if (isFileGroupInPendingCompaction(fileGroup)) {
+          // We have already saved the last version of file-groups for pending compaction Id
+          keepVersions--;
+        }
 
-      while (fileSliceIterator.hasNext() && keepVersions > 0) {
-        // Skip this most recent version
-        fileSliceIterator.next();
-        keepVersions--;
-      }
-      // Delete the remaining files
-      while (fileSliceIterator.hasNext()) {
-        FileSlice nextSlice = fileSliceIterator.next();
-        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
-        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
-          // do not clean up a savepoint data file
-          continue;
+        while (fileSliceIterator.hasNext() && keepVersions > 0) {
+          // Skip this most recent version
+          fileSliceIterator.next();
+          keepVersions--;
+        }
+        // Delete the remaining files
+        while (fileSliceIterator.hasNext()) {
+          FileSlice nextSlice = fileSliceIterator.next();
+          Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
+          if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+            // do not clean up a savepoint data file
+            continue;
+          }
+          deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
         }
-        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
+      // if there are no valid file groups for the partition, mark it to be deleted
+      if (partitionFileGroupList.getValue().isEmpty()) {
+        toDeletePartition = true;
+      }
+      map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
     }
-    // if there are no valid file groups for the partition, mark it to be deleted
-    if (fileGroups.isEmpty()) {
-      toDeletePartition = true;
-    }
-    return Pair.of(toDeletePartition, deletePaths);
+    return map;
   }
 
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
   }
 
@@ -293,9 +299,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
    * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
    * and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted.
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
-    LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
-    List<CleanFileInfo> deletePaths = new ArrayList<>();
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) {
+    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. ");
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanFileInfoPerPartitionMap = new HashMap<>();
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
@@ -307,86 +313,90 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
     if (commitTimeline.countInstants() > commitsRetained) {
       Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
       HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
-      // all replaced file groups before earliestCommitToRetain are eligible to clean
-      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
       // add active files
-      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-      for (HoodieFileGroup fileGroup : fileGroups) {
-        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
-
-        if (fileSliceList.isEmpty()) {
-          continue;
-        }
-
-        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
-        String lastVersionBeforeEarliestCommitToRetain =
-            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
-
-        // Ensure there are more than 1 version of the file (we only clean old files from updates)
-        // i.e always spare the last commit.
-      for (FileSlice aSlice : fileSliceList) {
-        Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
-        String fileCommitTime = aSlice.getBaseInstantTime();
-        if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
-          // do not clean up a savepoint data file
+      List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+      for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
+        List<CleanFileInfo> deletePaths = new ArrayList<>();
+        // all replaced file groups before earliestCommitToRetain are eligible to clean
+        deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
+        for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
+          List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
+
+          if (fileSliceList.isEmpty()) {
             continue;
           }
-        if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-          // Dont delete the latest commit and also the last commit before the earliest commit we
-          // are retaining
-          // The window of commit retain == max query run time. So a query could be running which
-          // still
-          // uses this file.
-          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-            // move on to the next file
-            continue;
-          }
-        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-          // This block corresponds to KEEP_LATEST_BY_HOURS policy
-          // Do not delete the latest commit.
-          if (fileCommitTime.equals(lastVersion)) {
-            // move on to the next file
+          String lastVersion = fileSliceList.get(0).getBaseInstantTime();
+          String lastVersionBeforeEarliestCommitToRetain =
+              getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
+
+          // Ensure there are more than 1 version of the file (we only clean old files from updates)
+          // i.e always spare the last commit.
+          for (FileSlice aSlice : fileSliceList) {
+            Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
+            String fileCommitTime = aSlice.getBaseInstantTime();
+            if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
+              // do not clean up a savepoint data file
               continue;
             }
+            if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+              // Dont delete the latest commit and also the last commit before the earliest commit we
+              // are retaining
+              // The window of commit retain == max query run time. So a query could be running which
+              // still
+              // uses this file.
+              if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+                // move on to the next file
+                continue;
+              }
+            } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+              // This block corresponds to KEEP_LATEST_BY_HOURS policy
+              // Do not delete the latest commit.
+              if (fileCommitTime.equals(lastVersion)) {
+                // move on to the next file
+                continue;
               }
-        }
-        // Always keep the last commit
-        if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
-            .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
-          // this is a commit, that should be cleaned.
-          aFile.ifPresent(hoodieDataFile -> {
-            deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
-            if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-              deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+            }
+
+            // Always keep the last commit
+            if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
+                .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
+              // this is a commit, that should be cleaned.
+              aFile.ifPresent(hoodieDataFile -> {
+                deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+                if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+                  deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
+                }
+              });
+              if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+                // If merge on read, then clean the log files for the commits as well
+                Predicate<HoodieLogFile> notCDCLogFile =
+                    hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+                deletePaths.addAll(
+                    aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                        .collect(Collectors.toList()));
               }
-          });
-          if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-            // If merge on read, then clean the log files for the commits as well
-            Predicate<HoodieLogFile> notCDCLogFile =
-                hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-            deletePaths.addAll(
-                aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                    .collect(Collectors.toList()));
             }
-          if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-            // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
-            // Here we need to clean uo these cdc log files.
-            Predicate<HoodieLogFile> isCDCLogFile =
-                hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
-            deletePaths.addAll(
-                aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                    .collect(Collectors.toList()));
+
+              if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+                // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow.
+                // Here we need to clean up these cdc log files.
+                Predicate<HoodieLogFile> isCDCLogFile =
+                    hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX);
+                deletePaths.addAll(
+                    aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                        .collect(Collectors.toList()));
+              }
             }
           }
         }
-      }
-      // if there are no valid file groups for the partition, mark it to be deleted
-      if (fileGroups.isEmpty()) {
-        toDeletePartition = true;
+        // if there are no valid file groups for the partition, mark it to be deleted
+        if (partitionFileGroupList.getValue().isEmpty()) {
+          toDeletePartition = true;
+        }
+        cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
       }
     }
-    return Pair.of(toDeletePartition, deletePaths);
+    return cleanFileInfoPerPartitionMap;
   }
 
   /**
@@ -394,10 +404,11 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
    * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
    * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
    * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
+   *
    * @param partitionPath partition path to check
    * @return list of files to clean
    */
-  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
+  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestHours(List<String> partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
   }
 
@@ -463,21 +474,23 @@ private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
-  public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
+  public Map<String, Pair<Boolean, List<CleanFileInfo>>> getDeletePaths(List<String> partitionPaths) {
     HoodieCleaningPolicy policy = config.getCleanerPolicy();
-    Pair<Boolean, List<CleanFileInfo>> deletePaths;
+    Map<String, Pair<Boolean, List<CleanFileInfo>>> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
+      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
-      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
+      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths);
    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-      deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
+      deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths);
     } else {
       throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
     }
-    LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
-    if (deletePaths.getKey()) {
-      LOG.info("Partition " + partitionPath + " to be deleted");
+    for (String partitionPath : deletePaths.keySet()) {
+      LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath);
+      if (deletePaths.get(partitionPath).getLeft()) {
+        LOG.info("Partition " + partitionPath + " to be deleted");
+      }
     }
     return deletePaths;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index baff4ebac875..967e313f4ee9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
     HoodieWriteConfig writeConfig = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
          HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
       client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
       client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
 
-      // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
+      // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
       for (int i = 5; i < 9; i++) {
         String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
         client.startCommitWithTime(instantTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 8cfd92d01fec..89a184bf497f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -116,7 +116,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
 
   /**
    * Refresh commits timeline.
-   * 
+   *
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -750,6 +750,20 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    return getAllFileGroupsIncludingReplaced(partitionPaths)
+        .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
+  }
+
+  private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
+    List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+    for (String partitionStr : partitionStrList) {
+      fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
+    }
+    return fileGroupPerPartitionList.stream();
+  }
+
   private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
     try {
       readLock.lock();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index ff44c7cef017..9006bd45cba9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -204,6 +204,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
     return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
   }
 
+  @Override
+  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index bd18ba22a25d..5e52767fe2ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -51,9 +51,11 @@
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -377,6 +379,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
     }
   }
 
+  @Override
+  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
+    ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
+      fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
+    }
+    return fileGroupPerPartitionList.stream();
+  }
+
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index c32e2cabb101..9c83c8f19cd9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice {
   /**
    * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
    *
-   * @param partitionPath Partition path
-   * @param maxCommitTime Max Instant Time
+   * @param partitionPath                        Partition path
+   * @param maxCommitTime                        Max Instant Time
    * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
    */
   Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
-      boolean includeFileSlicesInPendingCompaction);
+                                                  boolean includeFileSlicesInPendingCompaction);
 
   /**
    * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
    * file-slice before and after compaction request instant is merged and returned.
-   * 
-   * @param partitionPath Partition Path
+   *
+   * @param partitionPath  Partition Path
    * @param maxInstantTime Max Instant Time
    * @return
   */
@@ -149,10 +149,12 @@ interface SliceView extends SliceViewWithLatestSlice {
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
+  Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
+
   /**
    * Return Pending Compaction Operations.
    *
-   * @return Pair<Pair<InstantTime,CompactionOperation>> 
+   * @return Pair<Pair<InstantTime,CompactionOperation>>
   */
   Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
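
For reference, a minimal usage sketch of the batched view call introduced by this patch. The class name, the `view` parameter and the partition values are illustrative only (not part of the patch); the method signature and the `Pair` type are the ones added to `TableFileSystemView.SliceView` above.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.collection.Pair;

public class BatchedFileGroupLookupSketch {

  // One view call for all partitions, instead of one getAllFileGroups(String) call per partition.
  // `view` stands for any SliceView implementation (remote, priority-based, or in-memory).
  static Map<String, List<HoodieFileGroup>> fileGroupsByPartition(SliceView view) {
    List<String> partitionPaths = Arrays.asList("2022/09/20", "2022/09/21"); // illustrative partitions
    return view.getAllFileGroups(partitionPaths)
        .collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
  }
}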