Skip to content

Commit

Permalink
[apache#1727] improvement(server): Introduce blocks number threshold …
Browse files Browse the repository at this point in the history
…when flushing a single buffer to reduce gc
  • Loading branch information
rickyma committed May 30, 2024
1 parent d182a03 commit e2cbcc1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,18 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(128 * 1024 * 1024L)
.withDescription("The threshold of single shuffle buffer flush");

/**
 * Block-count threshold for flushing a single shuffle buffer (integer option, default 4000).
 *
 * <p>ShuffleBufferManager reads this value at construction time. In its single-buffer flush
 * check, a buffer becomes eligible for flushing when its number of blocks is strictly greater
 * than this value, in addition to the existing byte-size threshold; either condition alone is
 * sufficient. The check only applies when single-buffer flush is enabled or the partition is
 * treated as a huge partition.
 */
public static final ConfigOption<Integer> SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD =
ConfigOptions.key("rss.server.single.buffer.flush.blocksNumberThreshold")
.intType()
.defaultValue(4000)
.withDescription(
"The blocks number threshold for triggering a flush for a single shuffle buffer. "
+ "This threshold is mainly used to control jobs with an excessive number of small blocks, "
+ "allowing these small blocks to be flushed as much as possible, "
+ "rather than being maintained in the heap and unable to be garbage collected. "
+ "This can cause severe garbage collection issues on the server side, and may even lead to out-of-heap-memory errors. "
+ "If the threshold is set too high, it becomes meaningless.");

public static final ConfigOption<Long> SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL =
ConfigOptions.key("rss.server.leak.shuffledata.check.interval")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class ShuffleBufferManager {
private long lowWaterMark;
private boolean bufferFlushEnabled;
private long bufferFlushThreshold;
private long bufferFlushBlocksNumThreshold;
// When the shuffle buffer manager flushes data, shuffles with data size < shuffleFlushThreshold
// are kept in memory to reduce small I/Os to persistent storage, especially for local HDDs.
Expand Down Expand Up @@ -125,6 +126,8 @@ public ShuffleBufferManager(
this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
this.bufferFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
this.bufferFlushBlocksNumThreshold =
conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD);
this.shuffleFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
this.hugePartitionSizeThresholdRef =
Expand Down Expand Up @@ -266,7 +269,18 @@ void flushSingleBufferIfNecessary(
// When we use multi storage and trigger single buffer flush, the buffer size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
if ((isHugePartition || this.bufferFlushEnabled)
&& buffer.getSize() > this.bufferFlushThreshold) {
&& (buffer.getSize() > this.bufferFlushThreshold
|| buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Start to flush single buffer. Details - shuffleId:{}, startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, blocksNum:{}",
shuffleId,
startPartition,
endPartition,
isHugePartition,
buffer.getSize(),
buffer.getBlocks().size());
}
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition);
return;
}
Expand Down

0 comments on commit e2cbcc1

Please sign in to comment.