Skip to content

Commit

Permalink
代码整理
Browse files Browse the repository at this point in the history
  • Loading branch information
闫逍旭 committed Dec 28, 2015
1 parent 595d53e commit cfb0247
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 30 deletions.
Expand Up @@ -2,6 +2,7 @@


import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import io.netty.buffer.PooledByteBufAllocator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -130,16 +131,30 @@ public void initChannel(SocketChannel ch) throws Exception {
*/ */
b.option(ChannelOption.SO_BACKLOG, 1024); b.option(ChannelOption.SO_BACKLOG, 1024);


/**
* TCP层面的接收和发送缓冲区大小设置,
* 在Netty中分别对应ChannelOption的SO_SNDBUF和SO_RCVBUF,
* 需要根据推送消息的大小,合理设置,对于海量长连接,通常32K是个不错的选择。
*/
b.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
b.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);

/*** /***
* option()是提供给NioServerSocketChannel用来接收进来的连接。 * option()是提供给NioServerSocketChannel用来接收进来的连接。
* childOption()是提供给由父管道ServerChannel接收到的连接, * childOption()是提供给由父管道ServerChannel接收到的连接,
* 在这个例子中也是NioServerSocketChannel。 * 在这个例子中也是NioServerSocketChannel。
*/ */
b.childOption(ChannelOption.SO_KEEPALIVE, true); b.childOption(ChannelOption.SO_KEEPALIVE, true);


b.option(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);
b.childOption(ChannelOption.ALLOCATOR, NettySharedHolder.byteBufAllocator);


/**
* 在Netty 4中实现了一个新的ByteBuf内存池,它是一个纯Java版本的 jemalloc (Facebook也在用)。
* 现在,Netty不会再因为用零填充缓冲区而浪费内存带宽了。不过,由于它不依赖于GC,开发人员需要小心内存泄漏。
* 如果忘记在处理程序中释放缓冲区,那么内存使用率会无限地增长。
* Netty默认不使用内存池,需要在创建客户端或者服务端的时候进行指定
*/
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);


/*** /***
* 绑定端口并启动去接收进来的连接 * 绑定端口并启动去接收进来的连接
Expand Down
Expand Up @@ -11,12 +11,7 @@


public class NettySharedHolder { public class NettySharedHolder {


public static final Timer timer = new HashedWheelTimer(new NamedThreadFactory(ThreadNameSpace.NETTY_TIMER)); public static final Timer timer = new HashedWheelTimer(new NamedThreadFactory(ThreadNameSpace.NETTY_TIMER));


public static final ByteBufAllocator byteBufAllocator;

static {
byteBufAllocator = UnpooledByteBufAllocator.DEFAULT;
}


} }
Expand Up @@ -89,7 +89,7 @@ public void run() {
} }




public void allocThreadpool(final String serviceUniqueName, int corePoolSize, int maximumPoolSize) public void allocThreadPool(final String serviceUniqueName, int corePoolSize, int maximumPoolSize)
throws Exception { throws Exception {
if (poolCache.containsKey(serviceUniqueName)) { // 对同一个服务重复分配线程池时,抛出异常 if (poolCache.containsKey(serviceUniqueName)) { // 对同一个服务重复分配线程池时,抛出异常
throw new Exception(MessageFormat.format( throw new Exception(MessageFormat.format(
Expand Down Expand Up @@ -149,7 +149,7 @@ public Executor getThreadExecutor(String serviceUniqueName,int corePoolSize, int
return executor; return executor;
}else{ }else{
try{ try{
allocThreadpool(serviceUniqueName, corePoolSize, maximumPoolSize); allocThreadPool(serviceUniqueName, corePoolSize, maximumPoolSize);
}catch(Exception e){ }catch(Exception e){
log.error("allocThreadPool exception",e); log.error("allocThreadPool exception",e);
} }
Expand All @@ -160,7 +160,7 @@ public Executor getThreadExecutor(String serviceUniqueName,int corePoolSize, int
} }
}else{ }else{
try{ try{
allocThreadpool(serviceUniqueName, corePoolSize, maximumPoolSize); allocThreadPool(serviceUniqueName, corePoolSize, maximumPoolSize);
}catch(Exception e){ }catch(Exception e){
log.error("allocThreadPool exception",e); log.error("allocThreadPool exception",e);
} }
Expand Down
Expand Up @@ -8,23 +8,25 @@


public class ThreadPoolUtil { public class ThreadPoolUtil {


private static final ThreadPoolManager threadPoolManager = new ThreadPoolManager(Constants.MIN_POOL_SIZE, Constants.MAX_POOL_SIZE, private static final ThreadPoolManager threadPoolManager =
Constants.THREAD_QUEUE_SIZE); new ThreadPoolManager(Constants.MIN_POOL_SIZE, Constants.MAX_POOL_SIZE, Constants.THREAD_QUEUE_SIZE);


private static Executor bossExecutor = ThreadPoolUtil.getThreadPoolManager().getThreadExecutor(ThreadNameSpace.NETTY_BOSS,Constants.MIN_BOSS_POOL_SIZE,Constants.MAX_BOSS_POLL_SIZE); private static Executor bossExecutor = ThreadPoolUtil.getThreadPoolManager()
private static Executor workExecutor = ThreadPoolUtil.getThreadPoolManager().getThreadExecutor(ThreadNameSpace.NETTY_WORKER,Constants.MIN_WORK_POOL_SIZE,Constants.MAX_WORK_POOL_SIZE); .getThreadExecutor(ThreadNameSpace.NETTY_BOSS, Constants.MIN_BOSS_POOL_SIZE, Constants.MAX_BOSS_POLL_SIZE);

private static Executor workExecutor = ThreadPoolUtil.getThreadPoolManager()
public static ThreadPoolManager getThreadPoolManager() { .getThreadExecutor(ThreadNameSpace.NETTY_WORKER, Constants.MIN_WORK_POOL_SIZE, Constants.MAX_WORK_POOL_SIZE);
return threadPoolManager;
} public static ThreadPoolManager getThreadPoolManager() {

return threadPoolManager;
public static Executor getBossExecutor(){ }
return bossExecutor;
} public static Executor getBossExecutor() {

return bossExecutor;
public static Executor getWorkExecutor(){ }
return workExecutor;
} public static Executor getWorkExecutor() {

return workExecutor;

}


} }

0 comments on commit cfb0247

Please sign in to comment.