From 6d65c59c76694f798412968a006d88d7ff529222 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Tue, 14 Mar 2023 11:56:25 -0700
Subject: [PATCH] [HUDI-5926] Improve cleaner parallelism (#8171)

---
 .../org/apache/hudi/config/HoodieCleanConfig.java | 13 ++++++++++++-
 .../table/action/clean/CleanActionExecutor.java   |  4 ++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
index 30289e1acbab4..c1b66a371d19c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -29,6 +29,7 @@
 import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
 
 import javax.annotation.concurrent.Immutable;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -120,7 +121,17 @@ public class HoodieCleanConfig extends HoodieConfig {
   public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.cleaner.parallelism")
       .defaultValue("200")
-      .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
+      .withDocumentation("This config controls the behavior of both the cleaning plan and "
+          + "cleaning execution. Deriving the cleaning plan is parallelized at the table "
+          + "partition level, i.e., each table partition is processed by one Spark task to figure "
+          + "out the files to clean. The cleaner picks the configured parallelism if the number "
+          + "of table partitions is larger than this configured value. The parallelism is "
+          + "assigned to the number of table partitions if it is smaller than the configured value. "
+          + "The clean execution, i.e., the file deletion, is parallelized at file level, which "
+          + "is the unit of Spark task distribution. Similarly, the actual parallelism cannot "
+          + "exceed the configured value if the number of files is larger. If cleaning plan or "
+          + "execution is slow due to limited parallelism, you can increase this to tune the "
+          + "performance.");
 
   public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
       .key("hoodie.clean.allow.multiple")
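The rewritten documentation boils down to one rule applied twice: effective parallelism = min(number of work units, hoodie.cleaner.parallelism), where the work units are table partitions during planning and individual files during deletion. A minimal sketch of that arithmetic (the class name and the partition/file counts are hypothetical, not part of the patch):

public class CleanerParallelismDemo {
  public static void main(String[] args) {
    int configured = 200;           // hoodie.cleaner.parallelism (default "200")
    int numTablePartitions = 50;    // assumed: partitions scanned while planning
    int numFilesToDelete = 10_000;  // assumed: files removed during execution

    // Planning is parallelized per table partition, so its effective
    // parallelism is the smaller of the partition count and the config.
    int planParallelism = Math.min(numTablePartitions, configured);    // -> 50

    // Execution is parallelized per file and is capped the same way
    // by the total file count.
    int executeParallelism = Math.min(numFilesToDelete, configured);   // -> 200

    System.out.println(planParallelism + " / " + executeParallelism);
  }
}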
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 9137eb436bb8a..01b8d19122651 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -126,11 +126,11 @@ private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator
    */
   List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
     int cleanerParallelism = Math.min(
-        (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);
 
-    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());
 
     Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
         cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
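The first hunk above is the substantive fix. On the IntStream produced by mapToInt(List::size), count() returns the number of stream elements, i.e., one per table partition, while sum() adds up the list sizes, i.e., the total number of files to delete. The old code therefore capped the delete parallelism at the partition count instead of the file count, so it could under-parallelize the delete phase. A self-contained sketch of the difference (the map contents are made up for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CountVsSumDemo {
  public static void main(String[] args) {
    Map<String, List<String>> filesPerPartition = Map.of(
        "2023/03/01", Arrays.asList("a.parquet", "b.parquet", "c.parquet"),
        "2023/03/02", Arrays.asList("d.parquet"));

    // count() counts stream elements: 2 partitions. This is the value the
    // old code accidentally compared against the configured parallelism.
    long partitions = filesPerPartition.values().stream()
        .mapToInt(List::size).count();

    // sum() totals the list sizes: 4 files, the real number of deletion
    // work units that should bound the parallelism.
    int files = filesPerPartition.values().stream()
        .mapToInt(List::size).sum();

    System.out.println(partitions + " partitions, " + files + " files");
  }
}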