Skip to content

Commit

Permalink
[apache#1608][part-2] fix(spark): avoid releasing block previously wh…
Browse files Browse the repository at this point in the history
…en enable block resend
  • Loading branch information
zuston committed Mar 29, 2024
1 parent cbf4f6f commit 990a0e2
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@
import java.util.List;

import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.function.TupleConsumer;

public class AddBlockEvent {

private String taskId;
private List<ShuffleBlockInfo> shuffleDataInfoList;
private List<Runnable> processedCallbackChain;

// The var is to indicate if the blocks fail to send, whether the writer will resend to
// re-assignment servers.
// if so, the failure blocks will not be released.
private boolean isResendEnabled = false;

private TupleConsumer<ShuffleBlockInfo, Boolean> blockProcessedCallback;

public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleDataInfoList) {
this.taskId = taskId;
this.shuffleDataInfoList = shuffleDataInfoList;
this.processedCallbackChain = new ArrayList<>();
}

public AddBlockEvent(
String taskId, List<ShuffleBlockInfo> shuffleBlockInfoList, Runnable callback) {
this.taskId = taskId;
this.shuffleDataInfoList = shuffleBlockInfoList;
this.processedCallbackChain = new ArrayList<>();
addCallback(callback);
}

/** @param callback, should not throw any exception and execute fast. */
public void addCallback(Runnable callback) {
processedCallbackChain.add(callback);
Expand All @@ -59,6 +59,23 @@ public List<Runnable> getProcessedCallbackChain() {
return processedCallbackChain;
}

public void withBlockProcessedCallback(
TupleConsumer<ShuffleBlockInfo, Boolean> blockProcessedCallback) {
this.blockProcessedCallback = blockProcessedCallback;
}

public TupleConsumer<ShuffleBlockInfo, Boolean> getBlockProcessedCallback() {
return blockProcessedCallback;
}

public void enableBlockResend() {
this.isResendEnabled = true;
}

public boolean isBlockResendEnabled() {
return isResendEnabled;
}

@Override
public String toString() {
return "AddBlockEvent: TaskId[" + taskId + "], " + shuffleDataInfoList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,22 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
() -> {
String taskId = event.getTaskId();
List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList();
SendShuffleDataResult result = null;
try {
SendShuffleDataResult result =
result =
shuffleWriteClient.sendShuffleData(
rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId));
putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds());
putFailedBlockSendTracker(
taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker());
} finally {
Set<Long> succeedBlockIds = result.getSuccessBlockIds();
for (ShuffleBlockInfo block : shuffleBlockInfoList) {
event
.getBlockProcessedCallback()
.accept(block, succeedBlockIds.contains(block.getBlockId()));
}

List<Runnable> callbackChain =
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
for (Runnable runnable : callbackChain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,10 @@ private void requestExecutorMemory(long leastMem) {

public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockInfoList) {
long totalSize = 0;
long memoryUsed = 0;
List<AddBlockEvent> events = new ArrayList<>();
List<ShuffleBlockInfo> shuffleBlockInfosPerEvent = Lists.newArrayList();
for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
totalSize += sbi.getSize();
memoryUsed += sbi.getFreeMemory();
shuffleBlockInfosPerEvent.add(sbi);
// split shuffle data according to the size
if (totalSize > sendSizeLimit) {
Expand All @@ -427,20 +425,9 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ totalSize
+ " bytes");
}
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
events.add(
new AddBlockEvent(
taskId,
shuffleBlockInfosPerEvent,
() -> {
freeAllocatedMemory(memoryUsedTemp);
shuffleBlocksTemp.stream().forEach(x -> x.getData().release());
}));
events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
shuffleBlockInfosPerEvent = Lists.newArrayList();
totalSize = 0;
memoryUsed = 0;
}
}
if (!shuffleBlockInfosPerEvent.isEmpty()) {
Expand All @@ -453,16 +440,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ " bytes");
}
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
events.add(
new AddBlockEvent(
taskId,
shuffleBlockInfosPerEvent,
() -> {
freeAllocatedMemory(memoryUsedTemp);
shuffleBlocksTemp.stream().forEach(x -> x.getData().release());
}));
events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
}
return events;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private TaskContext taskContext;
private SparkConf sparkConf;
private boolean taskFailRetry;
private boolean isBlockFailSentRetryEnabled;
private int blockFailSentMaxTimes = 2;

/** used by columnar rss shuffle writer implementation */
protected final long taskAttemptId;
Expand Down Expand Up @@ -189,7 +190,7 @@ private RssShuffleWriter(
this.taskFailureCallback = taskFailureCallback;
this.taskContext = context;
this.sparkConf = sparkConf;
this.taskFailRetry =
this.isBlockFailSentRetryEnabled =
sparkConf.getBoolean(
RssClientConf.RSS_TASK_FAILED_RETRY_ENABLED.key(),
RssClientConf.RSS_TASK_FAILED_RETRY_ENABLED.defaultValue());
Expand Down Expand Up @@ -265,8 +266,8 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
long recordCount = 0;
while (records.hasNext()) {
recordCount++;
// Task should fast fail when sending data failed
checkIfBlocksFailed();

dataCheckOrRetry();

Product2<K, V> record = records.next();
K key = record._1();
Expand Down Expand Up @@ -359,6 +360,26 @@ protected List<CompletableFuture<Long>> postBlockEvent(
List<ShuffleBlockInfo> shuffleBlockInfoList) {
List<CompletableFuture<Long>> futures = new ArrayList<>();
for (AddBlockEvent event : bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
event.withBlockProcessedCallback(
(block, isSuccessful) -> {
boolean isRelease = false;
if (!isBlockFailSentRetryEnabled) {
isRelease = true;
} else {
if (isSuccessful) {
isRelease = true;
} else {
if (block.getRetryCounter() >= blockFailSentMaxTimes - 1) {
isRelease = true;
}
}
}

if (isRelease) {
bufferManager.freeAllocatedMemory(block.getFreeMemory());
block.getData().release();
}
});
event.addCallback(
() -> {
boolean ret = finishEventQueue.add(new Object());
Expand All @@ -382,7 +403,7 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
while (true) {
try {
finishEventQueue.clear();
checkIfBlocksFailed();
dataCheckOrRetry();
Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
blockIds.removeAll(successBlockIds);
if (blockIds.isEmpty()) {
Expand Down Expand Up @@ -418,12 +439,67 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
}
}

private void dataCheckOrRetry() {
if (isBlockFailSentRetryEnabled) {
collectBlocksToResendOrFastFail();
} else {
if (hasAnyBlockFailure()) {
throw new RssSendFailedException();
}
}
}

private boolean hasAnyBlockFailure() {
Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (!failedBlockIds.isEmpty()) {
LOG.error(
"Errors on sending blocks for task[{}]. {} blocks can't be sent to remote servers: {}",
taskId,
failedBlockIds.size(),
shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers());
return true;
}
return false;
}

private void collectBlocksToResendOrFastFail() {
if (!isBlockFailSentRetryEnabled) {
return;
}

FailedBlockSendTracker failedTracker = shuffleManager.getBlockIdsFailedSendTracker(taskId);
Set<Long> failedBlockIds = failedTracker.getFailedBlockIds();
if (failedBlockIds == null || failedBlockIds.isEmpty()) {
return;
}

Set<TrackingBlockStatus> resendCandidates = new HashSet<>();
// to check whether the blocks resent exceed the max resend count.
for (Long blockId : failedBlockIds) {
List<TrackingBlockStatus> retryRecords = failedTracker.getFailedBlockStatus(blockId);
// todo: support retry times by config
if (retryRecords.size() >= blockFailSentMaxTimes) {
LOG.error(
"Partial blocks for taskId: [{}] retry exceeding the max retry times. Fast fail! faulty server list: {}",
taskId,
retryRecords.stream().map(x -> x.getShuffleServerInfo()).collect(Collectors.toSet()));
// fast fail if any blocks failure with multiple retry times
throw new RssSendFailedException();
}

// todo: if setting multi replica and another replica is succeed to send, no need to resend
resendCandidates.add(retryRecords.get(retryRecords.size() - 1));
}

resendFailedBlocks(resendCandidates);
}

private void checkIfBlocksFailed() {
Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (taskFailRetry && !failedBlockIds.isEmpty()) {
if (isBlockFailSentRetryEnabled && !failedBlockIds.isEmpty()) {
Set<TrackingBlockStatus> shouldResendBlockSet = shouldResendBlockStatusSet(failedBlockIds);
try {
reSendFailedBlockIds(shouldResendBlockSet);
resendFailedBlocks(shouldResendBlockSet);
} catch (Exception e) {
LOG.error("resend failed blocks failed.", e);
}
Expand Down Expand Up @@ -456,7 +532,7 @@ private Set<TrackingBlockStatus> shouldResendBlockStatusSet(Set<Long> failedBloc
return resendBlockStatusSet;
}

private void reSendFailedBlockIds(Set<TrackingBlockStatus> failedBlockStatusSet) {
private void resendFailedBlocks(Set<TrackingBlockStatus> failedBlockStatusSet) {
List<ShuffleBlockInfo> reAssignSeverBlockInfoList = Lists.newArrayList();
List<ShuffleBlockInfo> failedBlockInfoList = Lists.newArrayList();
Map<ShuffleServerInfo, List<TrackingBlockStatus>> faultyServerToPartitions =
Expand All @@ -471,36 +547,37 @@ private void reSendFailedBlockIds(Set<TrackingBlockStatus> failedBlockStatusSet)
.collect(Collectors.toSet());
ShuffleServerInfo dynamicShuffleServer = faultyServers.get(t.getKey().getId());
if (dynamicShuffleServer == null) {
// todo: merge multiple requests into one.
dynamicShuffleServer =
reAssignFaultyShuffleServer(partitionIds, t.getKey().getId());
faultyServers.put(t.getKey().getId(), dynamicShuffleServer);
}

ShuffleServerInfo finalDynamicShuffleServer = dynamicShuffleServer;
failedBlockStatusSet.forEach(
trackingBlockStatus -> {
ShuffleBlockInfo failedBlockInfo = trackingBlockStatus.getShuffleBlockInfo();
failedBlockInfoList.add(failedBlockInfo);
reAssignSeverBlockInfoList.add(
new ShuffleBlockInfo(
failedBlockInfo.getShuffleId(),
failedBlockInfo.getPartitionId(),
failedBlockInfo.getBlockId(),
failedBlockInfo.getLength(),
failedBlockInfo.getCrc(),
failedBlockInfo.getData(),
Lists.newArrayList(finalDynamicShuffleServer),
failedBlockInfo.getUncompressLength(),
failedBlockInfo.getFreeMemory(),
taskAttemptId));
});
for (TrackingBlockStatus blockStatus : failedBlockStatusSet) {
ShuffleBlockInfo failedBlockInfo = blockStatus.getShuffleBlockInfo();
failedBlockInfoList.add(failedBlockInfo);
ShuffleBlockInfo newBlock =
new ShuffleBlockInfo(
failedBlockInfo.getShuffleId(),
failedBlockInfo.getPartitionId(),
failedBlockInfo.getBlockId(),
failedBlockInfo.getLength(),
failedBlockInfo.getCrc(),
failedBlockInfo.getData(),
Lists.newArrayList(finalDynamicShuffleServer),
failedBlockInfo.getUncompressLength(),
failedBlockInfo.getFreeMemory(),
taskAttemptId);
newBlock.setRetryCounter(failedBlockInfo.getRetryCounter() + 1);
reAssignSeverBlockInfoList.add(newBlock);
}
});
clearFailedBlockIdsStates(failedBlockInfoList, faultyServers);
clearFailedBlockStates(failedBlockInfoList, faultyServers);
processShuffleBlockInfos(reAssignSeverBlockInfoList);
checkIfBlocksFailed();
}

private void clearFailedBlockIdsStates(
private void clearFailedBlockStates(
List<ShuffleBlockInfo> failedBlockInfoList, Map<String, ShuffleServerInfo> faultyServers) {
failedBlockInfoList.forEach(
shuffleBlockInfo -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ShuffleBlockInfo {
private int uncompressLength;
private long freeMemory;

private int retryCounter = 0;

public ShuffleBlockInfo(
int shuffleId,
int partitionId,
Expand Down Expand Up @@ -84,6 +86,14 @@ public ShuffleBlockInfo(
this.taskAttemptId = taskAttemptId;
}

public int getRetryCounter() {
return retryCounter;
}

public void setRetryCounter(int retryCounter) {
this.retryCounter = retryCounter;
}

public long getBlockId() {
return blockId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package org.apache.uniffle.common.exception;

public class RssSendFailedException extends RssException {

public RssSendFailedException() {
super("");
}

public RssSendFailedException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.function;

@FunctionalInterface
public interface TupleConsumer<T, F> {
void accept(T t, F f);
}

0 comments on commit 990a0e2

Please sign in to comment.