From bbc15b1d0155ada2c7fdd1d6748ef13b5f01adec Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Wed, 28 Sep 2022 08:03:41 -0700 Subject: [PATCH] [HUDI-4924] Auto-tune dedup parallelism (#6802) --- .../action/commit/HoodieWriteHelper.java | 5 ++- .../org/apache/hudi/data/HoodieJavaRDD.java | 5 +++ .../TestHoodieClientOnCopyOnWriteStorage.java | 14 +++++-- .../apache/hudi/data/TestHoodieJavaRDD.java | 40 +++++++++++++++++++ .../apache/hudi/common/data/HoodieData.java | 9 ++++- .../hudi/common/data/HoodieListData.java | 5 +++ .../hudi/common/data/TestHoodieListData.java | 8 ++++ 7 files changed, 80 insertions(+), 6 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index 80762b1de85a..b359550e8a7b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -54,6 +54,9 @@ protected HoodieData> tag(HoodieData> dedupedRec public HoodieData> deduplicateRecords( HoodieData> records, HoodieIndex index, int parallelism) { boolean isIndexingGlobal = index.isGlobal(); + // Auto-tunes the parallelism for reduce transformation based on the number of data partitions + // in engine-specific representation + int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism)); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath @@ -65,7 +68,7 @@ public HoodieData> deduplicateRecords( HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey(); return new HoodieAvroRecord<>(reducedKey, reducedData); - }, parallelism).map(Pair::getRight); + }, reduceParallelism).map(Pair::getRight); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java index 3964fa2d6bfb..ed9613bc15fe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -102,6 +102,11 @@ public long count() { return rddData.count(); } + @Override + public int getNumPartitions() { + return rddData.getNumPartitions(); + } + @Override public HoodieData map(SerializableFunction func) { return HoodieJavaRDD.of(rddData.map(func::apply)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 4b7a0139ddda..8aafbcd9f60e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -460,11 +460,17 @@ private void testDeduplication( HoodieData> records = HoodieJavaRDD.of( jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1)); + HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) + .combineInput(true, true); + addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + int dedupParallelism = records.getNumPartitions() + 100; + HoodieData> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism); + List> dedupedRecs = dedupedRecsRdd.collectAsList(); + assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions()); assertEquals(1, dedupedRecs.size()); assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -472,13 +478,15 @@ private void testDeduplication( // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism); + dedupedRecs = dedupedRecsRdd.collectAsList(); + assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions()); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); // Perform write-action and check JavaRDD recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); - HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) + configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY) .combineInput(true, true); addConfigsForPopulateMetaFields(configBuilder, populateMetaFields); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java new file mode 100644 index 000000000000..75958883048e --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.data; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.junit.jupiter.api.Test; + +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieJavaRDD extends HoodieClientTestBase { + @Test + public void testGetNumPartitions() { + int numPartitions = 6; + HoodieData rddData = HoodieJavaRDD.of(jsc.parallelize( + IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions)); + assertEquals(numPartitions, rddData.getNumPartitions()); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java index 2d24e7dd1296..1d56e63fad92 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java @@ -67,14 +67,19 @@ public interface HoodieData extends Serializable { /** * Returns number of objects held in the collection - * + *

* NOTE: This is a terminal operation */ long count(); + /** + * @return the number of data partitions in the engine-specific representation. + */ + int getNumPartitions(); + /** * Maps every element in the collection using provided mapping {@code func}. - * + *

* This is an intermediate operation * * @param func serializable map function diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java index 0be9ec9fa73b..b2a503a85b32 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java @@ -175,6 +175,11 @@ public long count() { return super.count(); } + @Override + public int getNumPartitions() { + return 1; + } + @Override public List collectAsList() { return super.collectAsList(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java index 8da8be1338a3..ea19f128d1a9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -64,4 +65,11 @@ void testEagerSemantic() { assertEquals(3, originalListData.count()); assertEquals(sourceList, originalListData.collectAsList()); } + + @Test + public void testGetNumPartitions() { + HoodieData listData = HoodieListData.eager( + IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList())); + assertEquals(1, listData.getNumPartitions()); + } }