Skip to content

Commit

Permalink
[apache#1686] feat(netty): Support pending tasks number metrics for N…
Browse files Browse the repository at this point in the history
…etty EventLoopGroup
  • Loading branch information
rickyma committed May 9, 2024
1 parent 30bf8dc commit a057f2e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@ public abstract class NettyMetrics extends RPCMetrics {

// Metric names handed to metricsManager in registerGeneralMetrics().

// Name of the labeled gauge stored in gaugeNettyActiveConn.
private static final String NETTY_ACTIVE_CONNECTION = "netty_active_connection";
// Name of the labeled counter stored in counterNettyException.
private static final String NETTY_HANDLE_EXCEPTION = "netty_handle_exception";
// Name of the labeled gauge stored in gaugeNettyPendingTasksNumForBossGroup.
private static final String NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP =
"netty_pending_tasks_num_for_boss_group";
// Name of the labeled gauge stored in gaugeNettyPendingTasksNumForWorkerGroup.
private static final String NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP =
"netty_pending_tasks_num_for_worker_group";

// Metric handles; all four are assigned in registerGeneralMetrics().

// Gauge registered under NETTY_ACTIVE_CONNECTION.
protected Gauge.Child gaugeNettyActiveConn;
// Counter registered under NETTY_HANDLE_EXCEPTION.
protected Counter.Child counterNettyException;
// Gauge registered under NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP.
protected Gauge.Child gaugeNettyPendingTasksNumForBossGroup;
// Gauge registered under NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP.
protected Gauge.Child gaugeNettyPendingTasksNumForWorkerGroup;

public NettyMetrics(RssConf rssConf, String tags) {
super(rssConf, tags);
Expand All @@ -38,6 +44,10 @@ public NettyMetrics(RssConf rssConf, String tags) {
public void registerGeneralMetrics() {
  // Connection gauge and exception counter.
  this.gaugeNettyActiveConn = metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
  this.counterNettyException = metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);

  // Pending task gauges, one per Netty event loop group (boss, then worker).
  this.gaugeNettyPendingTasksNumForBossGroup =
      metricsManager.addLabeledGauge(NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP);
  this.gaugeNettyPendingTasksNumForWorkerGroup =
      metricsManager.addLabeledGauge(NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP);
}

public Counter.Child getCounterNettyException() {
Expand All @@ -47,4 +57,12 @@ public Counter.Child getCounterNettyException() {
/**
 * Returns the gauge registered under the active connection metric name.
 *
 * @return the gauge assigned by {@code registerGeneralMetrics()}
 */
public Gauge.Child getGaugeNettyActiveConn() {
  return this.gaugeNettyActiveConn;
}

/**
 * Returns the gauge registered under the boss group pending tasks metric name.
 *
 * @return the gauge assigned by {@code registerGeneralMetrics()}
 */
public Gauge.Child getGaugeNettyPendingTasksNumForBossGroup() {
  return this.gaugeNettyPendingTasksNumForBossGroup;
}

/**
 * Returns the gauge registered under the worker group pending tasks metric name.
 *
 * @return the gauge assigned by {@code registerGeneralMetrics()}
 */
public Gauge.Child getGaugeNettyPendingTasksNumForWorkerGroup() {
  return this.gaugeNettyPendingTasksNumForWorkerGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10 * 1000L)
.withDescription("Direct memory usage tracker interval to MetricSystem (ms)");

/**
 * Period, in milliseconds, between two samples of the Netty pending tasks number metrics.
 * Defaults to 10 seconds. StreamServer reads this value in its constructor and uses it as the
 * period of its pending tasks tracker.
 */
public static final ConfigOption<Long> SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL =
ConfigOptions.key("rss.server.netty.pending.tasks.num.updateMetricsIntervalMs")
.longType()
.defaultValue(10 * 1000L)
.withDescription(
"How often to collect Netty pending tasks number metrics (in milliseconds)");

public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE =
ConfigOptions.key("rss.server.flush.localfile.threadPool.size")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.uniffle.server.netty;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
Expand All @@ -31,6 +34,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.SystemPropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +47,7 @@
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.NettyUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;

Expand All @@ -59,6 +64,12 @@ public class StreamServer implements ServerInterface {
// Server configuration; taken from shuffleServer.getShuffleServerConf() in the constructor.
private ShuffleServerConf shuffleServerConf;
private ChannelFuture channelFuture;

// Single-threaded scheduler that runs the periodic pending tasks sampling job. The job is
// scheduled from the constructor (startMonitoringPendingTasks) and the scheduler is shut down
// in stop().
private final ScheduledExecutorService nettyPendingTasksTracker =
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("NettyPendingTasksTracker"));
/**
 * Interval, in milliseconds, between two polls of the Netty pending tasks number used for the
 * Netty metrics. Read once in the constructor from
 * ShuffleServerConf.SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL.
 */
private final long pendingTasksNumMetricsPollingInterval;

public StreamServer(ShuffleServer shuffleServer) {
this.shuffleServer = shuffleServer;
this.shuffleServerConf = shuffleServer.getShuffleServerConf();
Expand All @@ -80,6 +91,37 @@ public StreamServer(ShuffleServer shuffleServer) {
shuffleBossGroup = new NioEventLoopGroup(acceptThreads);
shuffleWorkerGroup = new NioEventLoopGroup(workerThreads);
}
this.pendingTasksNumMetricsPollingInterval =
shuffleServerConf.getLong(
ShuffleServerConf.SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL);
startMonitoringPendingTasks();
}

/**
 * Schedules the periodic job that publishes the pending task counts of the boss and worker
 * event loop groups to the Netty metrics gauges.
 *
 * <p>The first sample is taken immediately; later samples follow every
 * {@code pendingTasksNumMetricsPollingInterval} milliseconds on the
 * {@code nettyPendingTasksTracker} executor.
 */
private void startMonitoringPendingTasks() {
  nettyPendingTasksTracker.scheduleAtFixedRate(
      () -> {
        // scheduleAtFixedRate suppresses every subsequent execution once one execution
        // throws, so a failure must not escape the task or the gauges would silently
        // freeze at their last value.
        try {
          updatePendingTasksGauges();
        } catch (Exception e) {
          LOG.warn("Failed to update Netty pending tasks number metrics", e);
        }
      },
      0L,
      pendingTasksNumMetricsPollingInterval,
      TimeUnit.MILLISECONDS);
}

/** Samples both event loop groups once and writes each total to its gauge. */
private void updatePendingTasksGauges() {
  int pendingTasksNumForBossGroup = getPendingTasksForEventLoopGroup(shuffleBossGroup);
  shuffleServer
      .getNettyMetrics()
      .getGaugeNettyPendingTasksNumForBossGroup()
      .set(pendingTasksNumForBossGroup);

  int pendingTasksNumForWorkerGroup = getPendingTasksForEventLoopGroup(shuffleWorkerGroup);
  shuffleServer
      .getNettyMetrics()
      .getGaugeNettyPendingTasksNumForWorkerGroup()
      .set(pendingTasksNumForWorkerGroup);
}

/**
 * Sums the pending task counts of all executors in the given event loop group.
 *
 * <p>Only executors that are {@link SingleThreadEventExecutor} instances contribute; any other
 * executor type is skipped.
 *
 * @param eventLoopGroup the group to inspect; may be {@code null}, because {@code stop()}
 *     clears the group fields before the tracker is shut down
 * @return the total number of pending tasks, or {@code 0} when the group is {@code null}
 */
private int getPendingTasksForEventLoopGroup(EventLoopGroup eventLoopGroup) {
  if (eventLoopGroup == null) {
    // A tracker run racing with stop() can observe an already cleared group.
    return 0;
  }
  return StreamSupport.stream(eventLoopGroup.spliterator(), false)
      .filter(SingleThreadEventExecutor.class::isInstance)
      .map(SingleThreadEventExecutor.class::cast)
      .mapToInt(SingleThreadEventExecutor::pendingTasks)
      .sum();
}

private ServerBootstrap bootstrapChannel(
Expand Down Expand Up @@ -177,6 +219,9 @@ public void stop() {
shuffleBossGroup = null;
shuffleWorkerGroup = null;
}
if (!nettyPendingTasksTracker.isShutdown()) {
nettyPendingTasksTracker.shutdown();
}
}

@Override
Expand Down

0 comments on commit a057f2e

Please sign in to comment.