Skip to content

Commit

Permalink
[HUDI-5926] Improve cleaner parallelism (apache#8171)
Browse files Browse the repository at this point in the history
  • Loading branch information
yihua authored and nsivabalan committed Mar 18, 2023
1 parent f61e41c commit 70bef3f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
Expand Down Expand Up @@ -120,7 +121,17 @@ public class HoodieCleanConfig extends HoodieConfig {
public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.cleaner.parallelism")
.defaultValue("200")
.withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
.withDocumentation("This config controls the behavior of both the cleaning plan and "
+ "cleaning execution. Deriving the cleaning plan is parallelized at the table "
+ "partition level, i.e., each table partition is processed by one Spark task to figure "
+ "out the files to clean. The cleaner picks the configured parallelism if the number "
+ "of table partitions is larger than this configured value. The parallelism is "
+ "set to the number of table partitions if that number is smaller than the configured value. "
+ "The clean execution, i.e., the file deletion, is parallelized at file level, which "
+ "is the unit of Spark task distribution. Similarly, the actual parallelism cannot "
+ "exceed the configured value if the number of files is larger. If cleaning plan or "
+ "execution is slow due to limited parallelism, you can increase this to tune the "
+ "performance.");

public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
.key("hoodie.clean.allow.multiple")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator
*/
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
(int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);

context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());
context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());

Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
Expand Down

0 comments on commit 70bef3f

Please sign in to comment.