From e2cbcc107eaf50d4e0e145be5927bde7f0dd0af1 Mon Sep 17 00:00:00 2001 From: rickyma Date: Thu, 30 May 2024 16:24:19 +0800 Subject: [PATCH] [#1727] improvement(server): Introduce blocks number threshold when flushing a single buffer to reduce gc --- .../apache/uniffle/server/ShuffleServerConf.java | 12 ++++++++++++ .../server/buffer/ShuffleBufferManager.java | 16 +++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index fe39790127..ea642e1098 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -393,6 +393,18 @@ public class ShuffleServerConf extends RssBaseConf { .defaultValue(128 * 1024 * 1024L) .withDescription("The threshold of single shuffle buffer flush"); + public static final ConfigOption SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD = + ConfigOptions.key("rss.server.single.buffer.flush.blocksNumberThreshold") + .intType() + .defaultValue(4000) + .withDescription( + "The blocks number threshold for triggering a flush for a single shuffle buffer. " + + "This threshold is mainly used to control jobs with an excessive number of small blocks, " + + "allowing these small blocks to be flushed as much as possible, " + + "rather than being maintained in the heap and unable to be garbage collected. " + + "This can cause severe garbage collection issues on the server side, and may even lead to out-of-heap-memory errors. " + + "If the threshold is set too high, it becomes meaningless."); + public static final ConfigOption SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL = ConfigOptions.key("rss.server.leak.shuffledata.check.interval") .longType() diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index 257af93192..9b7498d3d8 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -65,6 +65,7 @@ public class ShuffleBufferManager { private long lowWaterMark; private boolean bufferFlushEnabled; private long bufferFlushThreshold; + private long bufferFlushBlocksNumThreshold; // when shuffle buffer manager flushes data, shuffles with data size < shuffleFlushThreshold is // kept in memory to // reduce small I/Os to persistent storage, especially for local HDDs. @@ -125,6 +126,8 @@ public ShuffleBufferManager( this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED); this.bufferFlushThreshold = conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD); + this.bufferFlushBlocksNumThreshold = + conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD); this.shuffleFlushThreshold = conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD); this.hugePartitionSizeThresholdRef = @@ -266,7 +269,18 @@ void flushSingleBufferIfNecessary( // When we use multi storage and trigger single buffer flush, the buffer size should be bigger // than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless. if ((isHugePartition || this.bufferFlushEnabled) - && buffer.getSize() > this.bufferFlushThreshold) { + && (buffer.getSize() > this.bufferFlushThreshold + || buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Start to flush single buffer. Details - shuffleId:{}, startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{}, blocksNum:{}", + shuffleId, + startPartition, + endPartition, + isHugePartition, + buffer.getSize(), + buffer.getBlocks().size()); + } flushBuffer(buffer, appId, shuffleId, startPartition, endPartition, isHugePartition); return; }