Skip to content

Commit

Permalink
优化线程池配置及线程池监控
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 15, 2016
1 parent 331f65a commit 3e016b9
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 203 deletions.
48 changes: 9 additions & 39 deletions conf/reference.conf
Expand Up @@ -141,53 +141,23 @@ mp {
#线程池配置
thread {
pool {
boss { //netty server boss
min:1 //boss 只需要一个线程即可
max:1
queue-size:0
}

work { //netty server boss
min:0 //0表示线程数根据cpu核数动态调整(2*cpu)
max:0
queue-size:0
}

event-bus {
min:4
max:4
queue-size:10000 //大量的online,offline,
}

http-proxy {
min:0 //0表示线程数根据cpu核数动态调整(2*cpu)
max:0
queue-size:0
}
conn-work:0 //0表示线程数根据cpu核数动态调整(2*cpu)
http-work:0 //http proxy netty client work pool
push-task:2 //消息推送任务处理
push-client:4 //消息推送回调处理,该线程池在客户端运行
ack-timer:1 //处理ACK消息超时

biz { //其他业务
event-bus { //用户处理内部事件分发
min:1
max:4
queue-size:16
max:16
queue-size:10000 //大量的online,offline,
}

mq { //用户上下线消息, 踢人等
min:2
min:1
max:4
queue-size:10000
}

push-callback { //消息推送
min:2
max:2
queue-size:0
}

push-center { //消息推送
min:4
max:4
queue-size:0
}
}
}

Expand Down
Expand Up @@ -32,8 +32,9 @@ public interface ExecutorFactory {
String SERVER_BOSS = "sb";
String SERVER_WORK = "sw";
String HTTP_CLIENT_WORK = "hcw";
String PUSH_CALLBACK = "pc";
String PUSH_TIMER = "pt";
String PUSH_CLIENT = "pc";
String PUSH_TASK = "pt";
String ACK_TIMER = "at";
String EVENT_BUS = "eb";
String MQ = "r";
String BIZ = "b";
Expand Down
Expand Up @@ -19,18 +19,17 @@

package com.mpush.client.push;

import com.mpush.api.push.PushException;
import com.mpush.api.service.BaseService;
import com.mpush.api.service.Listener;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.*;

import static com.mpush.tools.thread.ThreadNames.T_PUSH_REQ_TIMER;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 2015/12/30.
Expand All @@ -41,7 +40,6 @@ public class PushRequestBus extends BaseService {
public static final PushRequestBus I = new PushRequestBus();
private final Logger logger = LoggerFactory.getLogger(PushRequestBus.class);
private final Map<Integer, PushRequest> reqQueue = new ConcurrentHashMap<>(1024);
private Executor executor;
private ScheduledExecutorService scheduledExecutor;

private PushRequestBus() {
Expand All @@ -57,23 +55,20 @@ public PushRequest getAndRemove(int sessionId) {
}

public void asyncCall(Runnable runnable) {
executor.execute(runnable);
scheduledExecutor.execute(runnable);
}

@Override
protected void doStart(Listener listener) throws Throwable {
executor = ThreadPoolManager.I.getPushCallbackExecutor();
scheduledExecutor = new ScheduledThreadPoolExecutor(1, new NamedPoolThreadFactory(T_PUSH_REQ_TIMER), (r, e) -> {
logger.error("one push request was rejected, request=" + r);
throw new PushException("one push request was rejected. request=" + r);
});
scheduledExecutor = ThreadPoolManager.I.getPushClientTimer();
listener.onSuccess();
}

@Override
protected void doStop(Listener listener) throws Throwable {
scheduledExecutor.shutdown();
((ExecutorService) executor).shutdown();
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
listener.onSuccess();
}
}
32 changes: 22 additions & 10 deletions mpush-core/src/main/java/com/mpush/core/ack/AckMessageQueue.java
Expand Up @@ -19,33 +19,32 @@

package com.mpush.core.ack;

import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.api.service.BaseService;
import com.mpush.api.service.Listener;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

import static com.mpush.tools.thread.ThreadNames.T_ARK_REQ_TIMER;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 16/9/5.
*
* @author ohun@live.cn (夜色)
*/
public final class AckMessageQueue {
public final class AckMessageQueue extends BaseService {
private final Logger logger = LoggerFactory.getLogger(AckMessageQueue.class);

private static final int DEFAULT_TIMEOUT = 3000;
public static final AckMessageQueue I = new AckMessageQueue();

private final ConcurrentMap<Integer, AckContext> queue = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduledExecutor;
private ScheduledExecutorService scheduledExecutor;

private AckMessageQueue() {
scheduledExecutor = new ScheduledThreadPoolExecutor(1,
new NamedPoolThreadFactory(T_ARK_REQ_TIMER),
(r, e) -> logger.error("one ack context was rejected, context=" + r)
);
}

public void add(int sessionId, AckContext context, int timeout) {
Expand All @@ -61,4 +60,17 @@ public AckContext getAndRemove(int sessionId) {
return queue.remove(sessionId);
}

@Override
protected void doStart(Listener listener) throws Throwable {
scheduledExecutor = ThreadPoolManager.I.getAckTimer();
super.doStart(listener);
}

@Override
protected void doStop(Listener listener) throws Throwable {
if (scheduledExecutor != null) {
scheduledExecutor.shutdown();
}
super.doStop(listener);
}
}
5 changes: 4 additions & 1 deletion mpush-core/src/main/java/com/mpush/core/push/PushCenter.java
Expand Up @@ -21,6 +21,7 @@

import com.mpush.api.service.BaseService;
import com.mpush.api.service.Listener;
import com.mpush.core.ack.AckMessageQueue;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,14 +54,16 @@ public void delayTask(int delay, PushTask task) {

@Override
protected void doStart(Listener listener) throws Throwable {
executor = ThreadPoolManager.I.getPushCenterTimer();
executor = ThreadPoolManager.I.getPushTaskTimer();
AckMessageQueue.I.start();
logger.info("push center start success");
listener.onSuccess();
}

@Override
protected void doStop(Listener listener) throws Throwable {
executor.shutdown();
AckMessageQueue.I.stop();
logger.info("push center stop success");
listener.onSuccess();
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.mpush.tools.config.CC;
import com.mpush.tools.thread.NamedPoolThreadFactory;
import com.mpush.tools.thread.ThreadNames;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -100,6 +101,14 @@ public void init() {
}
}

@Override
public void start(Listener listener) {
super.start(listener);
if (this.workerGroup != null) {// 增加线程池监控
ThreadPoolManager.I.register("conn-worker", this.workerGroup);
}
}

@Override
public void stop(Listener listener) {
super.stop(listener);
Expand All @@ -112,7 +121,7 @@ public void stop(Listener listener) {

@Override
protected int getWorkThreadNum() {
return CC.mp.thread.pool.work.max;
return CC.mp.thread.pool.conn_work;
}

@Override
Expand Down
Expand Up @@ -21,30 +21,34 @@

import com.mpush.monitor.quota.ThreadPoolQuota;
import com.mpush.tools.thread.pool.ThreadPoolManager;
import io.netty.channel.EventLoopGroup;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import static com.mpush.tools.thread.pool.ThreadPoolManager.getPoolInfo;

public class JVMThreadPool implements ThreadPoolQuota {
public static final JVMThreadPool I = new JVMThreadPool();

private JVMThreadPool() {
}


@Override
public Object monitor(Object... args) {
Map<String, Object> map = new HashMap<>();
Map<String, Executor> pool = ThreadPoolManager.I.getActivePools();
for (Map.Entry<String, Executor> entry : pool.entrySet()) {
Map<String, Object> result = new HashMap<>();
Map<String, Executor> pools = ThreadPoolManager.I.getActivePools();
for (Map.Entry<String, Executor> entry : pools.entrySet()) {
String serviceName = entry.getKey();
Executor executor = entry.getValue();
if (executor instanceof ThreadPoolExecutor) {
map.put(serviceName, ThreadPoolManager.getPoolInfo((ThreadPoolExecutor) executor));
result.put(serviceName, getPoolInfo((ThreadPoolExecutor) executor));
} else if (executor instanceof EventLoopGroup) {
result.put(serviceName, getPoolInfo((EventLoopGroup) executor));
}
}
return map;
return result;
}
}
Expand Up @@ -44,6 +44,7 @@
import java.net.URI;
import java.util.concurrent.TimeUnit;

import static com.mpush.tools.config.CC.mp.thread.pool.http_work;
import static com.mpush.tools.thread.ThreadNames.T_HTTP_TIMER;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
Expand Down Expand Up @@ -129,10 +130,7 @@ private void writeRequest(Channel channel, RequestContext context) {

@Override
protected void doStart(Listener listener) throws Throwable {
workerGroup = new NioEventLoopGroup(
CC.mp.thread.pool.http_proxy.max,
new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT)
);
workerGroup = new NioEventLoopGroup(http_work, new DefaultThreadFactory(ThreadNames.T_HTTP_CLIENT));
b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
Expand Down
Expand Up @@ -280,4 +280,8 @@ protected boolean useNettyEpoll() {
}
return false;
}

public EventLoopGroup getWorkerGroup() {
return workerGroup;
}
}
Expand Up @@ -46,30 +46,33 @@ public void testPush() throws Exception {
PushSender sender = PushSender.create();
sender.start().join();

PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push.");
msg.setMsgId("msgId_0");
for (int i = 0; i < 10; i++) {

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push.");
msg.setMsgId("msgId_" + i);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

PushContext context = PushContext.build(msg)
.setAckModel(AckModel.AUTO_ACK)
//.setUserId("user-0")
.setBroadcast(true)
//.setTags(Sets.newHashSet("test"))
//.setCondition("tags&&tags.indexOf('test')!=-1")
//.setUserIds(Arrays.asList("user-0", "user-1"))
.setTimeout(20000)
.setCallback(new PushCallback() {
@Override
public void onResult(PushResult result) {
System.err.println("\n\n" + result);
}
});
FutureTask<Boolean> future = sender.send(context);
PushContext context = PushContext.build(msg)
.setAckModel(AckModel.AUTO_ACK)
.setUserId("user-" + i)
.setBroadcast(false)
//.setTags(Sets.newHashSet("test"))
//.setCondition("tags&&tags.indexOf('test')!=-1")
//.setUserIds(Arrays.asList("user-0", "user-1"))
.setTimeout(2000)
.setCallback(new PushCallback() {
@Override
public void onResult(PushResult result) {
System.err.println("\n\n" + result);
}
});
FutureTask<Boolean> future = sender.send(context);
}

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30));
}
Expand Down

0 comments on commit 3e016b9

Please sign in to comment.