diff --git a/conf/reference.conf b/conf/reference.conf index 3b819311..3dad203f 100644 --- a/conf/reference.conf +++ b/conf/reference.conf @@ -141,53 +141,23 @@ mp { #线程池配置 thread { pool { - boss { //netty server boss - min:1 //boss 只需要一个线程即可 - max:1 - queue-size:0 - } - - work { //netty server boss - min:0 //0表示线程数根据cpu核数动态调整(2*cpu) - max:0 - queue-size:0 - } - - event-bus { - min:4 - max:4 - queue-size:10000 //大量的online,offline, - } - - http-proxy { - min:0 //0表示线程数根据cpu核数动态调整(2*cpu) - max:0 - queue-size:0 - } + conn-work:0 //0表示线程数根据cpu核数动态调整(2*cpu) + http-work:0 //http proxy netty client work pool + push-task:2 //消息推送任务处理 + push-client:4 //消息推送回调处理,该线程池在客户端运行 + ack-timer:1 //处理ACK消息超时 - biz { //其他业务 + event-bus { //用户处理内部事件分发 min:1 - max:4 - queue-size:16 + max:16 + queue-size:10000 //大量的online,offline, } mq { //用户上下线消息, 踢人等 - min:2 + min:1 max:4 queue-size:10000 } - - push-callback { //消息推送 - min:2 - max:2 - queue-size:0 - } - - push-center { //消息推送 - min:4 - max:4 - queue-size:0 - } } } diff --git a/mpush-api/src/main/java/com/mpush/api/spi/common/ExecutorFactory.java b/mpush-api/src/main/java/com/mpush/api/spi/common/ExecutorFactory.java index 742c8b55..d8a8f8f7 100644 --- a/mpush-api/src/main/java/com/mpush/api/spi/common/ExecutorFactory.java +++ b/mpush-api/src/main/java/com/mpush/api/spi/common/ExecutorFactory.java @@ -32,8 +32,9 @@ public interface ExecutorFactory { String SERVER_BOSS = "sb"; String SERVER_WORK = "sw"; String HTTP_CLIENT_WORK = "hcw"; - String PUSH_CALLBACK = "pc"; - String PUSH_TIMER = "pt"; + String PUSH_CLIENT = "pc"; + String PUSH_TASK = "pt"; + String ACK_TIMER = "at"; String EVENT_BUS = "eb"; String MQ = "r"; String BIZ = "b"; diff --git a/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java b/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java index 9437ec3f..d82c9a08 100644 --- a/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java +++ b/mpush-client/src/main/java/com/mpush/client/push/PushRequestBus.java @@ -19,18 +19,17 @@ package com.mpush.client.push; -import com.mpush.api.push.PushException; import com.mpush.api.service.BaseService; import com.mpush.api.service.Listener; -import com.mpush.tools.thread.NamedPoolThreadFactory; import com.mpush.tools.thread.pool.ThreadPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; -import java.util.concurrent.*; - -import static com.mpush.tools.thread.ThreadNames.T_PUSH_REQ_TIMER; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Created by ohun on 2015/12/30. @@ -41,7 +40,6 @@ public class PushRequestBus extends BaseService { public static final PushRequestBus I = new PushRequestBus(); private final Logger logger = LoggerFactory.getLogger(PushRequestBus.class); private final Map reqQueue = new ConcurrentHashMap<>(1024); - private Executor executor; private ScheduledExecutorService scheduledExecutor; private PushRequestBus() { @@ -57,23 +55,20 @@ public PushRequest getAndRemove(int sessionId) { } public void asyncCall(Runnable runnable) { - executor.execute(runnable); + scheduledExecutor.execute(runnable); } @Override protected void doStart(Listener listener) throws Throwable { - executor = ThreadPoolManager.I.getPushCallbackExecutor(); - scheduledExecutor = new ScheduledThreadPoolExecutor(1, new NamedPoolThreadFactory(T_PUSH_REQ_TIMER), (r, e) -> { - logger.error("one push request was rejected, request=" + r); - throw new PushException("one push request was rejected. request=" + r); - }); + scheduledExecutor = ThreadPoolManager.I.getPushClientTimer(); listener.onSuccess(); } @Override protected void doStop(Listener listener) throws Throwable { - scheduledExecutor.shutdown(); - ((ExecutorService) executor).shutdown(); + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } listener.onSuccess(); } } diff --git a/mpush-core/src/main/java/com/mpush/core/ack/AckMessageQueue.java b/mpush-core/src/main/java/com/mpush/core/ack/AckMessageQueue.java index ccbc5f64..d7fb6232 100644 --- a/mpush-core/src/main/java/com/mpush/core/ack/AckMessageQueue.java +++ b/mpush-core/src/main/java/com/mpush/core/ack/AckMessageQueue.java @@ -19,33 +19,32 @@ package com.mpush.core.ack; -import com.mpush.tools.thread.NamedPoolThreadFactory; +import com.mpush.api.service.BaseService; +import com.mpush.api.service.Listener; +import com.mpush.tools.thread.pool.ThreadPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.*; - -import static com.mpush.tools.thread.ThreadNames.T_ARK_REQ_TIMER; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Created by ohun on 16/9/5. * * @author ohun@live.cn (夜色) */ -public final class AckMessageQueue { +public final class AckMessageQueue extends BaseService { private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class); private static final int DEFAULT_TIMEOUT = 3000; public static final AckMessageQueue I = new AckMessageQueue(); private final ConcurrentMap queue = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduledExecutor; + private ScheduledExecutorService scheduledExecutor; private AckMessageQueue() { - scheduledExecutor = new ScheduledThreadPoolExecutor(1, - new NamedPoolThreadFactory(T_ARK_REQ_TIMER), - (r, e) -> logger.error("one ack context was rejected, context=" + r) - ); } public void add(int sessionId, AckContext context, int timeout) { @@ -61,4 +60,17 @@ public AckContext getAndRemove(int sessionId) { return queue.remove(sessionId); } + @Override + protected void doStart(Listener listener) throws Throwable { + scheduledExecutor = ThreadPoolManager.I.getAckTimer(); + super.doStart(listener); + } + + @Override + protected void doStop(Listener listener) throws Throwable { + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } + super.doStop(listener); + } } diff --git a/mpush-core/src/main/java/com/mpush/core/push/PushCenter.java b/mpush-core/src/main/java/com/mpush/core/push/PushCenter.java index 46fe25ab..fee8db7c 100644 --- a/mpush-core/src/main/java/com/mpush/core/push/PushCenter.java +++ b/mpush-core/src/main/java/com/mpush/core/push/PushCenter.java @@ -21,6 +21,7 @@ import com.mpush.api.service.BaseService; import com.mpush.api.service.Listener; +import com.mpush.core.ack.AckMessageQueue; import com.mpush.tools.thread.pool.ThreadPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,8 @@ public void delayTask(int delay, PushTask task) { @Override protected void doStart(Listener listener) throws Throwable { - executor = ThreadPoolManager.I.getPushCenterTimer(); + executor = ThreadPoolManager.I.getPushTaskTimer(); + AckMessageQueue.I.start(); logger.info("push center start success"); listener.onSuccess(); } @@ -61,6 +63,7 @@ protected void doStart(Listener listener) throws Throwable { @Override protected void doStop(Listener listener) throws Throwable { executor.shutdown(); + AckMessageQueue.I.stop(); logger.info("push center stop success"); listener.onSuccess(); } diff --git a/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java b/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java index 6442d506..4260cf30 100644 --- a/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java +++ b/mpush-core/src/main/java/com/mpush/core/server/ConnectionServer.java @@ -30,6 +30,7 @@ import com.mpush.tools.config.CC; import com.mpush.tools.thread.NamedPoolThreadFactory; import com.mpush.tools.thread.ThreadNames; +import com.mpush.tools.thread.pool.ThreadPoolManager; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; @@ -100,6 +101,14 @@ public void init() { } } + @Override + public void start(Listener listener) { + super.start(listener); + if (this.workerGroup != null) {// 增加线程池监控 + ThreadPoolManager.I.register("conn-worker", this.workerGroup); + } + } + @Override public void stop(Listener listener) { super.stop(listener); @@ -112,7 +121,7 @@ public void stop(Listener listener) { @Override protected int getWorkThreadNum() { - return CC.mp.thread.pool.work.max; + return CC.mp.thread.pool.conn_work; } @Override diff --git a/mpush-monitor/src/main/java/com/mpush/monitor/quota/impl/JVMThreadPool.java b/mpush-monitor/src/main/java/com/mpush/monitor/quota/impl/JVMThreadPool.java index e6af9142..d88a522f 100644 --- a/mpush-monitor/src/main/java/com/mpush/monitor/quota/impl/JVMThreadPool.java +++ b/mpush-monitor/src/main/java/com/mpush/monitor/quota/impl/JVMThreadPool.java @@ -21,30 +21,34 @@ import com.mpush.monitor.quota.ThreadPoolQuota; import com.mpush.tools.thread.pool.ThreadPoolManager; +import io.netty.channel.EventLoopGroup; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; +import static com.mpush.tools.thread.pool.ThreadPoolManager.getPoolInfo; + public class JVMThreadPool implements ThreadPoolQuota { public static final JVMThreadPool I = new JVMThreadPool(); private JVMThreadPool() { } - @Override public Object monitor(Object... args) { - Map map = new HashMap<>(); - Map pool = ThreadPoolManager.I.getActivePools(); - for (Map.Entry entry : pool.entrySet()) { + Map result = new HashMap<>(); + Map pools = ThreadPoolManager.I.getActivePools(); + for (Map.Entry entry : pools.entrySet()) { String serviceName = entry.getKey(); Executor executor = entry.getValue(); if (executor instanceof ThreadPoolExecutor) { - map.put(serviceName, ThreadPoolManager.getPoolInfo((ThreadPoolExecutor) executor)); + result.put(serviceName, getPoolInfo((ThreadPoolExecutor) executor)); + } else if (executor instanceof EventLoopGroup) { + result.put(serviceName, getPoolInfo((EventLoopGroup) executor)); } } - return map; + return result; } } diff --git a/mpush-netty/src/main/java/com/mpush/netty/http/NettyHttpClient.java b/mpush-netty/src/main/java/com/mpush/netty/http/NettyHttpClient.java index 1c54ee80..804b1067 100644 --- a/mpush-netty/src/main/java/com/mpush/netty/http/NettyHttpClient.java +++ b/mpush-netty/src/main/java/com/mpush/netty/http/NettyHttpClient.java @@ -44,6 +44,7 @@ import java.net.URI; import java.util.concurrent.TimeUnit; +import static com.mpush.tools.config.CC.mp.thread.pool.http_work; import static com.mpush.tools.thread.ThreadNames.T_HTTP_TIMER; import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; import static io.netty.handler.codec.http.HttpHeaderNames.HOST; @@ -129,10 +130,7 @@ private void writeRequest(Channel channel, RequestContext context) { @Override protected void doStart(Listener listener) throws Throwable { - workerGroup = new NioEventLoopGroup( - CC.mp.thread.pool.http_proxy.max, - new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT) - ); + workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT)); b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); diff --git a/mpush-netty/src/main/java/com/mpush/netty/server/NettyTCPServer.java b/mpush-netty/src/main/java/com/mpush/netty/server/NettyTCPServer.java index 539d6054..dd4fdb6e 100644 --- a/mpush-netty/src/main/java/com/mpush/netty/server/NettyTCPServer.java +++ b/mpush-netty/src/main/java/com/mpush/netty/server/NettyTCPServer.java @@ -280,4 +280,8 @@ protected boolean useNettyEpoll() { } return false; } + + public EventLoopGroup getWorkerGroup() { + return workerGroup; + } } diff --git a/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java b/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java index 4164cb71..bee7de11 100644 --- a/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java +++ b/mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain.java @@ -46,30 +46,33 @@ public void testPush() throws Exception { PushSender sender = PushSender.create(); sender.start().join(); - PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push."); - msg.setMsgId("msgId_0"); + for (int i = 0; i < 10; i++) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push."); + msg.setMsgId("msgId_" + i); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } - PushContext context = PushContext.build(msg) - .setAckModel(AckModel.AUTO_ACK) - //.setUserId("user-0") - .setBroadcast(true) - //.setTags(Sets.newHashSet("test")) - //.setCondition("tags&&tags.indexOf('test')!=-1") - //.setUserIds(Arrays.asList("user-0", "user-1")) - .setTimeout(20000) - .setCallback(new PushCallback() { - @Override - public void onResult(PushResult result) { - System.err.println("\n\n" + result); - } - }); - FutureTask future = sender.send(context); + PushContext context = PushContext.build(msg) + .setAckModel(AckModel.AUTO_ACK) + .setUserId("user-" + i) + .setBroadcast(false) + //.setTags(Sets.newHashSet("test")) + //.setCondition("tags&&tags.indexOf('test')!=-1") + //.setUserIds(Arrays.asList("user-0", "user-1")) + .setTimeout(2000) + .setCallback(new PushCallback() { + @Override + public void onResult(PushResult result) { + System.err.println("\n\n" + result); + } + }); + FutureTask future = sender.send(context); + } LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30)); } diff --git a/mpush-tools/src/main/java/com/mpush/tools/config/CC.java b/mpush-tools/src/main/java/com/mpush/tools/config/CC.java index 5a3e23ee..c9c0c14c 100644 --- a/mpush-tools/src/main/java/com/mpush/tools/config/CC.java +++ b/mpush-tools/src/main/java/com/mpush/tools/config/CC.java @@ -164,21 +164,11 @@ interface pool { Config cfg = thread.cfg.getObject("pool").toConfig(); - interface boss { - Config cfg = pool.cfg.getObject("boss").toConfig(); - int min = cfg.getInt("min"); - int max = cfg.getInt("max"); - int queue_size = cfg.getInt("queue-size"); - - } - - interface work { - Config cfg = pool.cfg.getObject("work").toConfig(); - int min = cfg.getInt("min"); - int max = cfg.getInt("max"); - int queue_size = cfg.getInt("queue-size"); - - } + int conn_work = cfg.getInt("conn-work"); + int http_work = cfg.getInt("http-work"); + int push_task = cfg.getInt("push-task"); + int push_client = cfg.getInt("push-client"); + int ack_timer = cfg.getInt("ack-timer"); interface event_bus { Config cfg = pool.cfg.getObject("event-bus").toConfig(); @@ -188,42 +178,11 @@ interface event_bus { } - interface http_proxy { - Config cfg = pool.cfg.getObject("http-proxy").toConfig(); - int min = cfg.getInt("min"); - int max = cfg.getInt("max"); - int queue_size = cfg.getInt("queue-size"); - - } - - interface biz { - Config cfg = pool.cfg.getObject("biz").toConfig(); - int min = cfg.getInt("min"); - int max = cfg.getInt("max"); - int queue_size = cfg.getInt("queue-size"); - - } - interface mq { Config cfg = pool.cfg.getObject("mq").toConfig(); int min = cfg.getInt("min"); int max = cfg.getInt("max"); int queue_size = cfg.getInt("queue-size"); - - } - - interface push_callback { - Config cfg = pool.cfg.getObject("push-callback").toConfig(); - int min = cfg.getInt("min"); - int max = cfg.getInt("max"); - int queue_size = cfg.getInt("queue-size"); - } - - interface push_center { - Config cfg = pool.cfg.getObject("push-center").toConfig(); - int min = cfg.getInt("min"); - int max = cfg.getInt("max"); - int queue_size = cfg.getInt("queue-size"); } } } diff --git a/mpush-tools/src/main/java/com/mpush/tools/thread/ThreadNames.java b/mpush-tools/src/main/java/com/mpush/tools/thread/ThreadNames.java index 03166688..36ab997e 100644 --- a/mpush-tools/src/main/java/com/mpush/tools/thread/ThreadNames.java +++ b/mpush-tools/src/main/java/com/mpush/tools/thread/ThreadNames.java @@ -34,11 +34,8 @@ public final class ThreadNames { public static final String T_HTTP_CLIENT = NS + "-http-client-work"; public static final String T_EVENT_BUS = NS + "-event"; public static final String T_MQ = NS + "-mq"; - public static final String T_ZK = NS + "-zk"; - public static final String T_BIZ = NS + "-biz"; - public static final String T_PUSH_CALLBACK = NS + "-push-callback"; - public static final String T_PUSH_REQ_TIMER = NS + "-push-req-timer"; public static final String T_ARK_REQ_TIMER = NS + "-ack-timer"; + public static final String T_PUSH_CLIENT_TIMER = NS + "-push-client-timer"; public static final String T_PUSH_CENTER_TIMER = NS + "-push-center-timer"; public static final String T_CONN_TIMER = NS + "-conn-check-timer"; public static final String T_HTTP_TIMER = NS + "-http-client-timer"; diff --git a/mpush-tools/src/main/java/com/mpush/tools/thread/pool/DefaultExecutorFactory.java b/mpush-tools/src/main/java/com/mpush/tools/thread/pool/DefaultExecutorFactory.java index 5edd663a..68919566 100644 --- a/mpush-tools/src/main/java/com/mpush/tools/thread/pool/DefaultExecutorFactory.java +++ b/mpush-tools/src/main/java/com/mpush/tools/thread/pool/DefaultExecutorFactory.java @@ -23,11 +23,14 @@ import com.mpush.api.spi.Spi; import com.mpush.api.spi.common.ExecutorFactory; import com.mpush.tools.config.CC; +import com.mpush.tools.log.Logs; import com.mpush.tools.thread.NamedPoolThreadFactory; import java.util.concurrent.*; -import static com.mpush.tools.config.CC.mp.thread.pool.push_center.min; +import static com.mpush.tools.config.CC.mp.thread.pool.ack_timer; +import static com.mpush.tools.config.CC.mp.thread.pool.push_client; +import static com.mpush.tools.config.CC.mp.thread.pool.push_task; import static com.mpush.tools.thread.ThreadNames.*; /** @@ -66,26 +69,26 @@ public Executor get(String name) { break; case MQ: config = ThreadPoolConfig - .buildFixed(T_MQ, - CC.mp.thread.pool.mq.min, - CC.mp.thread.pool.mq.queue_size - ); - break; - case PUSH_CALLBACK: - config = ThreadPoolConfig - .build(T_PUSH_CALLBACK) - .setCorePoolSize(CC.mp.thread.pool.push_callback.min) - .setMaxPoolSize(CC.mp.thread.pool.push_callback.max) + .build(T_MQ) + .setCorePoolSize(CC.mp.thread.pool.mq.min) + .setMaxPoolSize(CC.mp.thread.pool.mq.max) .setKeepAliveSeconds(TimeUnit.SECONDS.toSeconds(10)) - .setQueueCapacity(CC.mp.thread.pool.push_callback.queue_size) - .setRejectedPolicy(ThreadPoolConfig.REJECTED_POLICY_CALLER_RUNS); + .setQueueCapacity(CC.mp.thread.pool.mq.queue_size); break; - case PUSH_TIMER: - return new ScheduledThreadPoolExecutor(min, new NamedPoolThreadFactory(T_PUSH_CENTER_TIMER), + case PUSH_CLIENT: + return new ScheduledThreadPoolExecutor(push_client, new NamedPoolThreadFactory(T_PUSH_CLIENT_TIMER), + (r, e) -> r.run() // run caller thread + ); + case PUSH_TASK: + return new ScheduledThreadPoolExecutor(push_task, new NamedPoolThreadFactory(T_PUSH_CENTER_TIMER), (r, e) -> { - throw new PushException("one push request was rejected. task=" + r); + throw new PushException("one push task was rejected. task=" + r); } ); + case ACK_TIMER: + return new ScheduledThreadPoolExecutor(ack_timer, new NamedPoolThreadFactory(T_ARK_REQ_TIMER), + (r, e) -> Logs.PUSH.error("one ack context was rejected, context=" + r) + ); default: throw new IllegalArgumentException("no executor for " + name); } diff --git a/mpush-tools/src/main/java/com/mpush/tools/thread/pool/ThreadPoolManager.java b/mpush-tools/src/main/java/com/mpush/tools/thread/pool/ThreadPoolManager.java index 4e7b2466..40d4c3ca 100644 --- a/mpush-tools/src/main/java/com/mpush/tools/thread/pool/ThreadPoolManager.java +++ b/mpush-tools/src/main/java/com/mpush/tools/thread/pool/ThreadPoolManager.java @@ -21,11 +21,16 @@ import com.mpush.api.spi.common.ExecutorFactory; import com.mpush.tools.thread.NamedThreadFactory; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SingleThreadEventLoop; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.ThreadProperties; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -35,62 +40,47 @@ public final class ThreadPoolManager { private final ExecutorFactory executorFactory = ExecutorFactory.create(); private final NamedThreadFactory threadFactory = new NamedThreadFactory(); - private Executor eventBusExecutor; - private Executor redisExecutor; - private Executor pushCallbackExecutor; - private ScheduledExecutorService pushCenterTimer; + private final Map pools = new ConcurrentHashMap<>(); public final Thread newThread(String name, Runnable target) { return threadFactory.newThread(name, target); } public Executor getRedisExecutor() { - if (redisExecutor == null) { - synchronized (this) { - redisExecutor = executorFactory.get(ExecutorFactory.MQ); - } - } - return redisExecutor; + return pools.computeIfAbsent("mq", s -> executorFactory.get(ExecutorFactory.MQ)); } public Executor getEventBusExecutor() { - if (eventBusExecutor == null) { - synchronized (this) { - eventBusExecutor = executorFactory.get(ExecutorFactory.EVENT_BUS); - } - } - return eventBusExecutor; + return pools.computeIfAbsent("event-bus", s -> executorFactory.get(ExecutorFactory.EVENT_BUS)); } - public Executor getPushCallbackExecutor() { - if (pushCallbackExecutor == null) { - synchronized (this) { - pushCallbackExecutor = executorFactory.get(ExecutorFactory.PUSH_CALLBACK); - } - } - return pushCallbackExecutor; + public ScheduledExecutorService getPushClientTimer() { + return (ScheduledExecutorService) pools.computeIfAbsent("push-client-timer" + , s -> executorFactory.get(ExecutorFactory.PUSH_CLIENT)); } - public ScheduledExecutorService getPushCenterTimer() { - if (pushCenterTimer == null) { - synchronized (this) { - pushCenterTimer = (ScheduledExecutorService) executorFactory.get(ExecutorFactory.PUSH_TIMER); - } - } - return pushCenterTimer; + public ScheduledExecutorService getPushTaskTimer() { + return (ScheduledExecutorService) pools.computeIfAbsent("push-task-timer" + , s -> executorFactory.get(ExecutorFactory.PUSH_TASK)); + } + + public ScheduledExecutorService getAckTimer() { + return (ScheduledExecutorService) pools.computeIfAbsent("ack-timer" + , s -> executorFactory.get(ExecutorFactory.ACK_TIMER)); + } + + public void register(String name, Executor executor) { + Objects.requireNonNull(name); + Objects.requireNonNull(executor); + pools.put(name, executor); } public Map getActivePools() { - Map map = new HashMap<>(); - if (eventBusExecutor != null) map.put("eventBusExecutor", eventBusExecutor); - if (redisExecutor != null) map.put("redisExecutor", redisExecutor); - if (pushCallbackExecutor != null) map.put("pushCallbackExecutor", pushCallbackExecutor); - if (pushCenterTimer != null) map.put("pushCenterTimer", pushCenterTimer); - return map; + return pools; } public static Map getPoolInfo(ThreadPoolExecutor executor) { - Map info = new HashMap<>(); + Map info = new HashMap<>(5); info.put("corePoolSize", executor.getCorePoolSize()); info.put("maxPoolSize", executor.getMaximumPoolSize()); info.put("activeCount(workingThread)", executor.getActiveCount()); @@ -98,4 +88,24 @@ public static Map getPoolInfo(ThreadPoolExecutor executor) { info.put("queueSize(blockedTask)", executor.getQueue().size()); return info; } + + public static Map getPoolInfo(EventLoopGroup executors) { + Map info = new HashMap<>(3); + int poolSize = 0, queueSize = 0, activeCount = 0; + for (EventExecutor e : executors) { + poolSize++; + if (e instanceof SingleThreadEventLoop) { + SingleThreadEventLoop executor = (SingleThreadEventLoop) e; + queueSize += executor.pendingTasks(); + ThreadProperties tp = executor.threadProperties(); + if (tp.state() == Thread.State.RUNNABLE) { + activeCount++; + } + } + } + info.put("poolSize(workThread)", poolSize); + info.put("activeCount(workingThread)", activeCount); + info.put("queueSize(blockedTask)", queueSize); + return info; + } }