From befa87ab33bc8a415567d9ec941e086cc59d539d Mon Sep 17 00:00:00 2001
From: Junfan Zhang
Date: Tue, 2 Apr 2024 14:37:15 +0800
Subject: [PATCH] [#1608][part-3] feat(spark3): support reading from partition block data reassignment servers

---
 .../spark/shuffle/ShuffleHandleInfo.java      | 26 +++++
 .../spark/shuffle/ShuffleHandleInfoTest.java  | 25 ++++-
 .../spark/shuffle/RssShuffleManager.java      | 31 ++++--
 .../test/PartitionBlockDataReassignTest.java  | 97 +++++++++++++++++++
 .../server/buffer/ShuffleBufferManager.java   |  5 +
 5 files changed, 172 insertions(+), 12 deletions(-)
 create mode 100644 integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignTest.java

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java
index e54145cbf1..165e17b499 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/ShuffleHandleInfo.java
@@ -18,7 +18,9 @@
 package org.apache.spark.shuffle;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -114,4 +116,28 @@ public ShuffleServerInfo createNewReassignmentForMultiPartitions(
     }
     return replacement;
   }
+
+  /** This composes the partition's replica server + replacement servers. */
+  public Map<Integer, List<ShuffleServerInfo>> listAllPartitionAssignmentServers() {
+    Map<Integer, List<ShuffleServerInfo>> partitionServer = new HashMap<>();
+    for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : partitionToServers.entrySet()) {
+      int partitionId = entry.getKey();
+      List<ShuffleServerInfo> replicas = entry.getValue();
+      Map<Integer, ShuffleServerInfo> replacements = failoverPartitionServers.get(partitionId);
+      if (replacements == null) {
+        replacements = Collections.emptyMap();
+      }
+
+      List<ShuffleServerInfo> servers =
+          partitionServer.computeIfAbsent(partitionId, k -> new ArrayList<>());
+      for (int i = 0; i < replicas.size(); i++) {
+        servers.add(replicas.get(i));
+        ShuffleServerInfo replacement = replacements.get(i);
+        if (replacement != null) {
+          servers.add(replacement);
+        }
+      }
+    }
+    return partitionServer;
+  }
 }
diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/ShuffleHandleInfoTest.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/ShuffleHandleInfoTest.java
index bb1b1ca77c..c61d82c76d 100644
--- a/client-spark/common/src/test/java/org/apache/spark/shuffle/ShuffleHandleInfoTest.java
+++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/ShuffleHandleInfoTest.java
@@ -35,8 +35,8 @@
 
 public class ShuffleHandleInfoTest {
 
-  private ShuffleServerInfo createFakeServerInfo(String host) {
-    return new ShuffleServerInfo(host, 1);
+  private ShuffleServerInfo createFakeServerInfo(String id) {
+    return new ShuffleServerInfo(id, "1.1.1.1", 1);
   }
 
   @Test
@@ -58,4 +58,25 @@ public void testReassignment() {
 
     assertEquals(newServer, handleInfo.useExistingReassignmentForMultiPartitions(partitions, "a"));
   }
+
+  @Test
+  public void testListAllPartitionAssignmentServers() {
+    Map<Integer, List<ShuffleServerInfo>> partitionToServers = new HashMap<>();
+    partitionToServers.put(1, Arrays.asList(createFakeServerInfo("a"), createFakeServerInfo("b")));
+    partitionToServers.put(2, Arrays.asList(createFakeServerInfo("c")));
+
+    ShuffleHandleInfo handleInfo =
+        new ShuffleHandleInfo(1, partitionToServers, new RemoteStorageInfo(""));
+
+    Set<Integer> partitions = new HashSet<>();
+    partitions.add(2);
+    handleInfo.createNewReassignmentForMultiPartitions(partitions, "c", createFakeServerInfo("d"));
+
+    Map<Integer, List<ShuffleServerInfo>> partitionAssignment =
+        handleInfo.listAllPartitionAssignmentServers();
+    assertEquals(2, partitionAssignment.size());
+    assertEquals(
+        Arrays.asList(createFakeServerInfo("c"), createFakeServerInfo("d")),
+        partitionAssignment.get(2));
+  }
 }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 1b4df17478..121fcaaf96 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -663,19 +663,13 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
               rssShuffleHandle.getPartitionToServers(),
               rssShuffleHandle.getRemoteStorage());
     }
-    Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
-        shuffleHandleInfo.getPartitionToServers();
-    Map<Integer, List<ShuffleServerInfo>> requirePartitionToServers =
-        allPartitionToServers.entrySet().stream()
-            .filter(x -> x.getKey() >= startPartition && x.getKey() < endPartition)
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
-        RssUtils.generateServerToPartitions(requirePartitionToServers);
+    Map<ShuffleServerInfo, Set<Integer>> blockIdServerToPartitions =
+        getBlockIdServers(shuffleHandleInfo, startPartition, endPartition);
     long start = System.currentTimeMillis();
     Roaring64NavigableMap blockIdBitmap =
         getShuffleResultForMultiPart(
             clientType,
-            serverToPartitions,
+            blockIdServerToPartitions,
             rssShuffleHandle.getAppId(),
             shuffleId,
             context.stageAttemptNumber());
@@ -721,7 +715,24 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
         readMetrics,
         RssSparkConfig.toRssConf(sparkConf),
         dataDistributionType,
-        allPartitionToServers);
+        shuffleHandleInfo.listAllPartitionAssignmentServers());
+  }
+
+  /**
+   * Builds the server -> partitions mapping used to fetch block ids for the partitions in
+   * [startPartition, endPartition), based on {@link ShuffleHandleInfo#getPartitionToServers()}.
+   */
+  private Map<ShuffleServerInfo, Set<Integer>> getBlockIdServers(
+      ShuffleHandleInfo shuffleHandleInfo, int startPartition, int endPartition) {
+    Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
+        shuffleHandleInfo.getPartitionToServers();
+    Map<Integer, List<ShuffleServerInfo>> requirePartitionToServers =
+        allPartitionToServers.entrySet().stream()
+            .filter(x -> x.getKey() >= startPartition && x.getKey() < endPartition)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<ShuffleServerInfo, Set<Integer>> serverToPartitions =
+        RssUtils.generateServerToPartitions(requirePartitionToServers);
+    return serverToPartitions;
   }
 
   @SuppressFBWarnings("REC_CATCH_EXCEPTION")
diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignTest.java
new file mode 100644
index 0000000000..79c91815dd
--- /dev/null
+++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.SparkConf;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** This class is to test the mechanism of partition block data reassignment. */
+public class PartitionBlockDataReassignTest extends SparkSQLTest {
+
+  private static String basePath;
+
+  @BeforeAll
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    // for coordinator
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
+    Map<String, String> dynamicConf = Maps.newHashMap();
+    dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
+    addDynamicConf(coordinatorConf, dynamicConf);
+    createCoordinatorServer(coordinatorConf);
+
+    // for shuffle-server
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+
+    ShuffleServerConf grpcShuffleServerConf1 = buildShuffleServerConf(ServerType.GRPC);
+    createShuffleServer(grpcShuffleServerConf1);
+
+    ShuffleServerConf grpcShuffleServerConf2 = buildShuffleServerConf(ServerType.GRPC);
+    createShuffleServer(grpcShuffleServerConf2);
+
+    startServers();
+
+    // simulate one server without enough buffer
+    ShuffleServer faultyShuffleServer = grpcShuffleServers.get(0);
+    ShuffleBufferManager bufferManager = faultyShuffleServer.getShuffleBufferManager();
+    bufferManager.setUsedMemory(bufferManager.getCapacity() + 100);
+  }
+
+  private static ShuffleServerConf buildShuffleServerConf(ServerType serverType) throws Exception {
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+    shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
+    shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name());
+    return shuffleServerConf;
+  }
+
+  @Override
+  public void updateRssStorage(SparkConf sparkConf) {
+    // NOTE(review): assumes the client key has no leading dot, so the prefix must be "spark."
+    sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
+  }
+
+  @Override
+  public void checkShuffleData() throws Exception {
+    Thread.sleep(12000);
+    String[] paths = basePath.split(",");
+    for (String path : paths) {
+      File f = new File(path);
+      assertEquals(0, f.list().length);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8f41a07956..7c233b2401 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -712,4 +712,9 @@ public boolean limitHugePartition(
     }
     return false;
   }
+
+  /** Overrides the used memory counter, e.g. to simulate a server without enough buffer. */
+  public void setUsedMemory(long usedMemory) {
+    this.usedMemory.set(usedMemory);
+  }
 }