From c1d92f2b71ce45f4ed3f541ac533bb8b4377118b Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 10 May 2018 16:08:25 +0800 Subject: [PATCH 1/2] mdf msg queue --- .../tron/common/overlay/server/MessageQueue.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/tron/common/overlay/server/MessageQueue.java b/src/main/java/org/tron/common/overlay/server/MessageQueue.java index de25643a107..43b317c7d21 100644 --- a/src/main/java/org/tron/common/overlay/server/MessageQueue.java +++ b/src/main/java/org/tron/common/overlay/server/MessageQueue.java @@ -69,8 +69,11 @@ public void activate(ChannelHandlerContext ctx) { } Message msg = msgQueue.take(); ctx.writeAndFlush(msg.getSendData()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + }catch (InterruptedException e){ + logger.info("Close send thread, peer {}", ctx.channel().remoteAddress()); + break; }catch (Exception e) { - logger.error("send message failed, {}, error info: {}", ctx.channel().remoteAddress(), e.getMessage()); + logger.error("Send message failed, {}, error info: {}", ctx.channel().remoteAddress(), e.getMessage()); } } }); @@ -102,6 +105,16 @@ public void close() { sendMsgFlag = false; if(sendTask != null && !sendTask.isCancelled()){ sendTask.cancel(false); + sendTask = null; + } + if (sendMsgThread != null){ + try{ + sendMsgThread.interrupt(); + sendMsgThread.join(100); + sendMsgThread = null; + }catch (Exception e){ + logger.warn("Join send thread failed, peer {}", ctx.channel().remoteAddress()); + } } } From ceeebfda147a4e95badfba420ce940dcb02a0f86 Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 10 May 2018 16:17:58 +0800 Subject: [PATCH 2/2] mdf msg queue --- .../java/org/tron/common/overlay/server/MessageQueue.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/tron/common/overlay/server/MessageQueue.java b/src/main/java/org/tron/common/overlay/server/MessageQueue.java index 43b317c7d21..bcae990f558 100644 --- a/src/main/java/org/tron/common/overlay/server/MessageQueue.java +++ b/src/main/java/org/tron/common/overlay/server/MessageQueue.java @@ -26,7 +26,7 @@ public class MessageQueue { private static final Logger logger = LoggerFactory.getLogger("MessageQueue"); - private boolean sendMsgFlag = false; + private volatile boolean sendMsgFlag = false; private Thread sendMsgThread; @@ -69,9 +69,6 @@ public void activate(ChannelHandlerContext ctx) { } Message msg = msgQueue.take(); ctx.writeAndFlush(msg.getSendData()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); - }catch (InterruptedException e){ - logger.info("Close send thread, peer {}", ctx.channel().remoteAddress()); - break; }catch (Exception e) { logger.error("Send message failed, {}, error info: {}", ctx.channel().remoteAddress(), e.getMessage()); } @@ -109,8 +106,7 @@ public void close() { } if (sendMsgThread != null){ try{ - sendMsgThread.interrupt(); - sendMsgThread.join(100); + sendMsgThread.join(20); sendMsgThread = null; }catch (Exception e){ logger.warn("Join send thread failed, peer {}", ctx.channel().remoteAddress());