Skip to content

Commit

Permalink
[apache#1608][part-2] fix(spark): avoid releasing block in advance wh…
Browse files Browse the repository at this point in the history
…en enable block resend (apache#1610)

### What changes were proposed in this pull request?

1. Avoid releasing blocks prematurely when block resend is enabled.
2. Introduce a maximum retry count for blocks.

### Why are the changes needed?

For: apache#1608

The current partition reassignment code has the following bugs:
1. Block data has already been released by the time it is resent.
2. If blocks fail to resend, the task may fail fast without retrying.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`RssShuffleWriterTest#blockFailureResendTest` tests the block resend mechanism.
  • Loading branch information
zuston authored Apr 8, 2024
1 parent f9d71da commit 80caa0e
Show file tree
Hide file tree
Showing 13 changed files with 520 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ public AddBlockEvent(String taskId, List<ShuffleBlockInfo> shuffleDataInfoList)
this.processedCallbackChain = new ArrayList<>();
}

/**
 * Creates an event for {@code taskId} carrying {@code shuffleBlockInfoList}, starting with a
 * fresh callback chain to which {@code callback} is registered via {@link #addCallback}.
 *
 * @param taskId id stored on the event
 * @param shuffleBlockInfoList blocks stored on the event (kept by reference, not copied)
 * @param callback first callback handed to {@link #addCallback}
 */
public AddBlockEvent(
    String taskId, List<ShuffleBlockInfo> shuffleBlockInfoList, Runnable callback) {
  // The chain must exist before addCallback is called below.
  this.processedCallbackChain = new ArrayList<>();
  this.shuffleDataInfoList = shuffleBlockInfoList;
  this.taskId = taskId;
  addCallback(callback);
}

/** @param callback should not throw any exception and should execute fast. */
public void addCallback(Runnable callback) {
processedCallbackChain.add(callback);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;

import org.apache.uniffle.common.ShuffleBlockInfo;

/**
 * Single-method callback that is handed a {@link ShuffleBlockInfo}.
 *
 * <p>NOTE(review): judging by the name, this is meant to be invoked for a block whose send
 * failed; the call sites are not visible here, so confirm before relying on that.
 */
@FunctionalInterface
public interface BlockFailureCallback {
  /**
   * Handles one block.
   *
   * @param block the block passed to the callback
   */
  void onBlockFailure(ShuffleBlockInfo block);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.writer;

import org.apache.uniffle.common.ShuffleBlockInfo;

/**
 * Single-method callback that is handed a {@link ShuffleBlockInfo}.
 *
 * <p>NOTE(review): judging by the name, this is meant to be invoked for a block that was sent
 * successfully; the call sites are not visible here, so confirm before relying on that.
 */
@FunctionalInterface
public interface BlockSuccessCallback {
  /**
   * Handles one block.
   *
   * @param block the block passed to the callback
   */
  void onBlockSuccess(ShuffleBlockInfo block);
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,23 @@ public CompletableFuture<Long> send(AddBlockEvent event) {
() -> {
String taskId = event.getTaskId();
List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList();
SendShuffleDataResult result = null;
try {
SendShuffleDataResult result =
result =
shuffleWriteClient.sendShuffleData(
rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId));
putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds());
putFailedBlockSendTracker(
taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker());
} finally {
Set<Long> succeedBlockIds =
result.getSuccessBlockIds() == null
? Collections.emptySet()
: result.getSuccessBlockIds();
for (ShuffleBlockInfo block : shuffleBlockInfoList) {
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
}

List<Runnable> callbackChain =
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
for (Runnable runnable : callbackChain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,18 @@ private void requestExecutorMemory(long leastMem) {
}
}

/**
 * Gives back the resources held for a single block: passes the block's {@code getFreeMemory()}
 * amount to {@code freeAllocatedMemory} and then calls {@code release()} on the block's data.
 *
 * @param block the block whose memory accounting and data are released
 */
public void releaseBlockResource(ShuffleBlockInfo block) {
  // Read the amount first, then return it to the accounting before the data is released.
  long accountedMemory = block.getFreeMemory();
  freeAllocatedMemory(accountedMemory);
  block.getData().release();
}

public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockInfoList) {
long totalSize = 0;
long memoryUsed = 0;
List<AddBlockEvent> events = new ArrayList<>();
List<ShuffleBlockInfo> shuffleBlockInfosPerEvent = Lists.newArrayList();
for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
sbi.withCompletionCallback((block, isSuccessful) -> this.releaseBlockResource(block));
totalSize += sbi.getSize();
memoryUsed += sbi.getFreeMemory();
shuffleBlockInfosPerEvent.add(sbi);
// split shuffle data according to the size
if (totalSize > sendSizeLimit) {
Expand All @@ -427,20 +431,9 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ totalSize
+ " bytes");
}
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
events.add(
new AddBlockEvent(
taskId,
shuffleBlockInfosPerEvent,
() -> {
freeAllocatedMemory(memoryUsedTemp);
shuffleBlocksTemp.stream().forEach(x -> x.getData().release());
}));
events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
shuffleBlockInfosPerEvent = Lists.newArrayList();
totalSize = 0;
memoryUsed = 0;
}
}
if (!shuffleBlockInfosPerEvent.isEmpty()) {
Expand All @@ -453,16 +446,7 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ " bytes");
}
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
events.add(
new AddBlockEvent(
taskId,
shuffleBlockInfosPerEvent,
() -> {
freeAllocatedMemory(memoryUsedTemp);
shuffleBlocksTemp.stream().forEach(x -> x.getData().release());
}));
events.add(new AddBlockEvent(taskId, shuffleBlockInfosPerEvent));
}
return events;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ public void spillByOwnTest() {
long sum = 0L;
List<AddBlockEvent> events = wbm.buildBlockEvents(blocks);
for (AddBlockEvent event : events) {
for (ShuffleBlockInfo block : event.getShuffleDataInfoList()) {
block.executeCompletionCallback(true);
}
event.getProcessedCallbackChain().stream().forEach(x -> x.run());
sum += event.getShuffleDataInfoList().stream().mapToLong(x -> x.getFreeMemory()).sum();
}
Expand Down Expand Up @@ -413,6 +416,9 @@ public void spillByOwnTest() {
// ignore.
}
}
for (ShuffleBlockInfo block : event.getShuffleDataInfoList()) {
block.executeCompletionCallback(true);
}
event.getProcessedCallbackChain().stream().forEach(x -> x.run());
sum +=
event.getShuffleDataInfoList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1264,4 +1264,9 @@ private RemoteStorageInfo getRemoteStorageInfo() {
/**
 * Returns the current value of the {@code rssResubmitStage} field.
 *
 * <p>NOTE(review): presumably indicates whether RSS stage resubmission is enabled; where the
 * field is assigned is not visible here, so confirm.
 *
 * @return the {@code rssResubmitStage} flag
 */
public boolean isRssResubmitStage() {
return rssResubmitStage;
}

/**
 * Replaces the {@code dataPusher} field with the given instance.
 *
 * <p>Marked {@code @VisibleForTesting}: intended for tests that need to supply their own
 * {@code DataPusher}.
 *
 * @param dataPusher the instance to store; no validation is performed
 */
@VisibleForTesting
public void setDataPusher(DataPusher dataPusher) {
this.dataPusher = dataPusher;
}
}
Loading

0 comments on commit 80caa0e

Please sign in to comment.