Skip to content

Commit

Permalink
3.2.6发布
Browse files Browse the repository at this point in the history
  • Loading branch information
tywo45 committed Jan 10, 2019
1 parent 5ba9701 commit 7d60631
Show file tree
Hide file tree
Showing 24 changed files with 184 additions and 122 deletions.
2 changes: 1 addition & 1 deletion src/core/pom.xml
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>org.t-io</groupId>
<artifactId>tio-parent</artifactId>
<version>3.2.5.v20190101-RELEASE</version>
<version>3.2.6.v20190110-RELEASE</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>

Expand Down
24 changes: 24 additions & 0 deletions src/core/src/main/java/org/tio/client/ClientChannelContext.java
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
Expand All @@ -18,6 +19,13 @@ public class ClientChannelContext extends ChannelContext {
private String bindIp;

private Integer bindPort;

private ReconnRunnable reconnRunnable;

/**
* 连续重连次数,连接成功后,此值会被重置0
*/
private AtomicInteger reconnCount = new AtomicInteger();

/**
* @param groupContext
Expand Down Expand Up @@ -92,4 +100,20 @@ public boolean isServer() {
return false;
}

public ReconnRunnable getReconnRunnable() {
return reconnRunnable;
}

public void setReconnRunnable(ReconnRunnable reconnRunnable) {
this.reconnRunnable = reconnRunnable;
}

public AtomicInteger getReconnCount() {
return reconnCount;
}

public void setReconnCount(AtomicInteger reconnCount) {
this.reconnCount = reconnCount;
}

}
Expand Up @@ -10,6 +10,7 @@
import org.tio.core.GroupContext;
import org.tio.core.Node;
import org.tio.core.ReadCompletionHandler;
import org.tio.core.Tio;
import org.tio.core.ssl.SslFacadeContext;
import org.tio.core.ssl.SslUtils;
import org.tio.core.stat.IpStat;
Expand Down Expand Up @@ -83,7 +84,7 @@ private void handler(Void result, ConnectionCompletionVo attachment, Throwable t
channelContext.setBindIp(bindIp);
channelContext.setBindPort(bindPort);

channelContext.setReconnCount(0);
channelContext.getReconnCount().set(0);
channelContext.setClosed(false);
isConnected = true;

Expand Down Expand Up @@ -118,7 +119,10 @@ private void handler(Void result, ConnectionCompletionVo attachment, Throwable t
attachment.setChannelContext(channelContext);
}
}
ReconnConf.put(channelContext);
boolean f = ReconnConf.put(channelContext);
if (!f) {
Tio.close(channelContext, null, "不需要重连,关闭该连接", true, false);
}
}
} catch (Throwable e) {
log.error(e.toString(), e);
Expand Down
27 changes: 20 additions & 7 deletions src/core/src/main/java/org/tio/client/ReconnConf.java
@@ -1,14 +1,13 @@
package org.tio.client;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.utils.SystemTimer;
import org.tio.utils.thread.pool.DefaultThreadFactory;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;

/**
*
Expand Down Expand Up @@ -44,13 +43,14 @@ public static boolean isNeedReconn(ClientChannelContext clientChannelContext, bo
}

if (reconnConf.getInterval() > 0) {
if (reconnConf.getRetryCount() <= 0 || reconnConf.getRetryCount() >= clientChannelContext.getReconnCount()) {
if (reconnConf.getRetryCount() <= 0 || reconnConf.getRetryCount() > clientChannelContext.getReconnCount().get()) {
if (putIfNeedConn) {
ClientGroupContext clientGroupContext = (ClientGroupContext)clientChannelContext.groupContext;
clientGroupContext.closeds.add(clientChannelContext);
clientChannelContext.stat.timeInReconnQueue = SystemTimer.currTime;
reconnConf.getQueue().add(clientChannelContext);
}

return true;
} else {
log.info("不需要重连{}", clientChannelContext);
Expand Down Expand Up @@ -89,7 +89,7 @@ public static boolean put(ClientChannelContext clientChannelContext) {
/**
* 用来重连的线程池
*/
private volatile ThreadPoolExecutor threadPoolExecutor = null;
private volatile SynThreadPoolExecutor threadPoolExecutor = null;

/**
*
Expand All @@ -101,8 +101,21 @@ public ReconnConf() {
if (threadPoolExecutor == null) {
synchronized (ReconnConf.class) {
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), DefaultThreadFactory.getInstance("tio-client-reconn"));
// threadPoolExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS,
// new LinkedBlockingQueue<Runnable>(), DefaultThreadFactory.getInstance("tio-client-reconn"));
//
//


LinkedBlockingQueue<Runnable> tioQueue = new LinkedBlockingQueue<>();
// ArrayBlockingQueue<Runnable> tioQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
String tioThreadName = "tio-client-reconn";
DefaultThreadFactory defaultThreadFactory = DefaultThreadFactory.getInstance(tioThreadName, Thread.MAX_PRIORITY);

threadPoolExecutor = new SynThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 60L, tioQueue, defaultThreadFactory, tioThreadName);
// tioExecutor = new SynThreadPoolExecutor(AVAILABLE_PROCESSORS * 2, Integer.MAX_VALUE, 60, new SynchronousQueue<Runnable>(), defaultThreadFactory, tioThreadName);


}
}

Expand Down Expand Up @@ -159,7 +172,7 @@ public int getRetryCount() {
/**
* @return the threadPoolExecutor
*/
public ThreadPoolExecutor getThreadPoolExecutor() {
public SynThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}

Expand Down
69 changes: 69 additions & 0 deletions src/core/src/main/java/org/tio/client/ReconnRunnable.java
@@ -0,0 +1,69 @@
package org.tio.client;

import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.utils.SystemTimer;
import org.tio.utils.thread.pool.AbstractSynRunnable;

/**
* @author tanyw
*
*/
public class ReconnRunnable extends AbstractSynRunnable {
private static Logger log = LoggerFactory.getLogger(ReconnRunnable.class);

ClientChannelContext channelContext = null;
TioClient tioClient = null;

// private static Map<Node, Long> cacheMap = new HashMap<>();

public ReconnRunnable(ClientChannelContext channelContext, TioClient tioClient, Executor executor) {
super(executor);
this.channelContext = channelContext;
this.tioClient = tioClient;
}

@Override
public boolean isNeededExecute() {
return false;
}

@Override
public void runTask() {
channelContext.getReconnCount().incrementAndGet();
ReentrantReadWriteLock closeLock = channelContext.closeLock;
WriteLock writeLock = closeLock.writeLock();
writeLock.lock();
try {
if (!channelContext.isClosed) //已经连上了,不需要再重连了
{
return;
}
long start = SystemTimer.currTime;
tioClient.reconnect(channelContext, 2);
long end = SystemTimer.currTime;
long iv = end - start;
// if (iv >= 100) {
// log.error("{}, 第{}次重连,重连耗时:{} ms", channelContext, channelContext.getReconnCount(), iv);
// } else {
// log.info("{}, 第{}次重连,重连耗时:{} ms", channelContext, channelContext.getReconnCount(), iv);
// }

log.error("{}, 第{}次重连,重连耗时:{} ms", channelContext, channelContext.getReconnCount(), iv);

// if (channelContext.isClosed) {
// // cacheMap.put(channelContext.getServerNode(), SystemTimer.currTime);
// return;
// }
} catch (java.lang.Throwable e) {
log.error(e.toString(), e);
} finally {
writeLock.unlock();
}

}
}
92 changes: 22 additions & 70 deletions src/core/src/main/java/org/tio/client/TioClient.java
Expand Up @@ -9,9 +9,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,63 +30,6 @@
* 2017年4月1日 上午9:29:58
*/
public class TioClient {
/**
* 自动重连任务
* @author tanyaowu
*
*/
private static class ReconnRunnable implements Runnable {
ClientChannelContext channelContext = null;
TioClient tioClient = null;

// private static Map<Node, Long> cacheMap = new HashMap<>();

public ReconnRunnable(ClientChannelContext channelContext, TioClient tioClient) {
this.channelContext = channelContext;
this.tioClient = tioClient;
}

/**
* @see java.lang.Runnable#run()
*
* @author tanyaowu
* 2017年2月2日 下午8:24:40
*
*/
@Override
public void run() {
ReentrantReadWriteLock closeLock = channelContext.closeLock;
WriteLock writeLock = closeLock.writeLock();
writeLock.lock();
try {
if (!channelContext.isClosed) //已经连上了,不需要再重连了
{
return;
}
long start = SystemTimer.currTime;
tioClient.reconnect(channelContext, 2);
long end = SystemTimer.currTime;
long iv = end - start;
if (iv >= 100) {
log.error("{},重连耗时:{} ms", channelContext, iv);
} else {
log.info("{},重连耗时:{} ms", channelContext, iv);
}

if (channelContext.isClosed) {
channelContext.setReconnCount(channelContext.getReconnCount() + 1);
// cacheMap.put(channelContext.getServerNode(), SystemTimer.currTime);
return;
}
} catch (java.lang.Throwable e) {
log.error(e.toString(), e);
} finally {
writeLock.unlock();
}

}
}

private static Logger log = LoggerFactory.getLogger(TioClient.class);

private AsynchronousChannelGroup channelGroup;
Expand Down Expand Up @@ -208,7 +149,7 @@ public ClientChannelContext connect(Node serverNode, String bindIp, Integer bind
* @author tanyaowu
*/
private ClientChannelContext connect(Node serverNode, String bindIp, Integer bindPort, ClientChannelContext initClientChannelContext, Integer timeout, boolean isSyn)
throws Exception {
throws Exception {

AsynchronousSocketChannel asynchronousSocketChannel = null;
ClientChannelContext channelContext = null;
Expand Down Expand Up @@ -303,7 +244,7 @@ public ClientGroupContext getClientGroupContext() {
/**
*
* @param channelContext
* @param timeout
* @param timeout 单位秒
* @return
* @throws Exception
*
Expand All @@ -327,15 +268,15 @@ public void setClientGroupContext(ClientGroupContext clientGroupContext) {
*
*/
private void startHeartbeatTask() {
final ClientGroupStat clientGroupStat = (ClientGroupStat)clientGroupContext.groupStat;
final ClientGroupStat clientGroupStat = (ClientGroupStat) clientGroupContext.groupStat;
final ClientAioHandler aioHandler = clientGroupContext.getClientAioHandler();

final String id = clientGroupContext.getId();
new Thread(new Runnable() {
@Override
public void run() {
while (!clientGroupContext.isStopped()) {
// final long heartbeatTimeout = clientGroupContext.heartbeatTimeout;
// final long heartbeatTimeout = clientGroupContext.heartbeatTimeout;
if (clientGroupContext.heartbeatTimeout <= 0) {
log.warn("用户取消了框架层面的心跳定时发送功能,请用户自己去完成心跳机制");
break;
Expand Down Expand Up @@ -367,8 +308,8 @@ public void run() {
}
if (log.isInfoEnabled()) {
log.info("[{}]: curr:{}, closed:{}, received:({}p)({}b), handled:{}, sent:({}p)({}b)", id, set.size(), clientGroupStat.closed.get(),
clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(),
clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get());
clientGroupStat.receivedPackets.get(), clientGroupStat.receivedBytes.get(), clientGroupStat.handledPackets.get(),
clientGroupStat.sentPackets.get(), clientGroupStat.sentBytes.get());
}

} catch (Throwable e) {
Expand Down Expand Up @@ -405,6 +346,7 @@ private void startReconnTask() {
@Override
public void run() {
while (!clientGroupContext.isStopped()) {
log.error("closeds:{}, connections:{}", clientGroupContext.closeds.size(), clientGroupContext.connections.size());
//log.info("准备重连");
LinkedBlockingQueue<ChannelContext> queue = reconnConf.getQueue();
ClientChannelContext channelContext = null;
Expand All @@ -422,7 +364,7 @@ public void run() {
{
continue;
}

SslFacadeContext sslFacadeContext = channelContext.sslFacadeContext;
if (sslFacadeContext != null) {
sslFacadeContext.setHandshakeCompleted(false);
Expand All @@ -441,9 +383,20 @@ public void run() {
if (channelContext.isRemoved || !channelContext.isClosed) //已经删除的和已经连上的,不需要重新再连
{
continue;
} else {
ReconnRunnable runnable = channelContext.getReconnRunnable();
if (runnable == null) {
synchronized (channelContext) {
runnable = channelContext.getReconnRunnable();
if (runnable == null) {
runnable = new ReconnRunnable(channelContext, TioClient.this, reconnConf.getThreadPoolExecutor());
channelContext.setReconnRunnable(runnable);
}
}
}
runnable.execute();
// reconnConf.getThreadPoolExecutor().execute(runnable);
}
ReconnRunnable runnable = new ReconnRunnable(channelContext, TioClient.this);
reconnConf.getThreadPoolExecutor().execute(runnable);
}
}
});
Expand All @@ -470,8 +423,7 @@ public boolean stop() {
} catch (Exception e1) {
log.error(e1.toString(), e1);
}



clientGroupContext.setStopped(true);
try {
ret = ret && clientGroupContext.groupExecutor.awaitTermination(6000, TimeUnit.SECONDS);
Expand Down

0 comments on commit 7d60631

Please sign in to comment.