From ee8f9aec4149c153fff86447e63f1632ef241ad8 Mon Sep 17 00:00:00 2001 From: jiangyuanshu <317787106@qq.com> Date: Thu, 14 Jul 2022 11:03:56 +0800 Subject: [PATCH 1/2] optimize blockLock; MessageQueue does not send message if channel is disconnected --- .../common/overlay/server/ChannelManager.java | 16 ++++++++-------- .../tron/common/overlay/server/MessageQueue.java | 10 ++++++++++ .../org/tron/core/net/service/SyncService.java | 8 ++++---- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/framework/src/main/java/org/tron/common/overlay/server/ChannelManager.java b/framework/src/main/java/org/tron/common/overlay/server/ChannelManager.java index dbcb73baa8b..8c452a2f880 100644 --- a/framework/src/main/java/org/tron/common/overlay/server/ChannelManager.java +++ b/framework/src/main/java/org/tron/common/overlay/server/ChannelManager.java @@ -34,7 +34,7 @@ @Component public class ChannelManager { - private final Map activePeers = new ConcurrentHashMap<>(); + private final Map activeChannels = new ConcurrentHashMap<>(); @Autowired private PeerServer peerServer; @Autowired @@ -121,7 +121,7 @@ public void processDisconnect(Channel channel, ReasonCode reason) { public void notifyDisconnect(Channel channel) { syncPool.onDisconnect(channel); - activePeers.values().remove(channel); + activeChannels.values().remove(channel); if (channel != null) { if (channel.getNodeStatistics() != null) { channel.getNodeStatistics().notifyDisconnect(); @@ -146,7 +146,7 @@ public synchronized boolean processPeer(Channel peer) { return false; } - if (!peer.isActive() && activePeers.size() >= maxActivePeers) { + if (!peer.isActive() && activeChannels.size() >= maxActivePeers) { peer.disconnect(TOO_MANY_PEERS); return false; } @@ -157,7 +157,7 @@ public synchronized boolean processPeer(Channel peer) { } } - Channel channel = activePeers.get(peer.getNodeIdWrapper()); + Channel channel = activeChannels.get(peer.getNodeIdWrapper()); if (channel != null) { if (channel.getStartTime() > peer.getStartTime()) { logger.info("Disconnect connection established later, {}", channel.getNode()); @@ -167,14 +167,14 @@ public synchronized boolean processPeer(Channel peer) { return false; } } - activePeers.put(peer.getNodeIdWrapper(), peer); - logger.info("Add active peer {}, total active peers: {}", peer, activePeers.size()); + activeChannels.put(peer.getNodeIdWrapper(), peer); + logger.info("Add active peer {}, total active peers: {}", peer, activeChannels.size()); return true; } public int getConnectionNum(InetAddress inetAddress) { int cnt = 0; - for (Channel channel : activePeers.values()) { + for (Channel channel : activeChannels.values()) { if (channel.getInetAddress().equals(inetAddress)) { cnt++; } @@ -183,7 +183,7 @@ public int getConnectionNum(InetAddress inetAddress) { } public Collection getActivePeers() { - return activePeers.values(); + return activeChannels.values(); } public Cache getRecentlyDisconnected() { diff --git a/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java b/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java index 1919042232f..3e3fc3b2c44 100644 --- a/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java +++ b/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java @@ -69,6 +69,11 @@ public void activate(ChannelHandlerContext ctx) { continue; } Message msg = msgQueue.take(); + if (channel.isDisconnect()) { + logger.warn("Failed to send to {} as channel has closed, {}", + ctx.channel().remoteAddress(), msg); + return; + } ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> { if (!future.isSuccess() && !channel.isDisconnect()) { logger.warn("Failed to send to {}, {}", ctx.channel().remoteAddress(), msg); @@ -92,6 +97,11 @@ public void setChannel(Channel channel) { } public void fastSend(Message msg) { + if (channel.isDisconnect()) { + logger.warn("Fast send to {} failed as channel has closed, {} ", + ctx.channel().remoteAddress(), msg); + return; + } logger.info("Fast send to {}, {} ", ctx.channel().remoteAddress(), msg); ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> { if (!future.isSuccess() && !channel.isDisconnect()) { diff --git a/framework/src/main/java/org/tron/core/net/service/SyncService.java b/framework/src/main/java/org/tron/core/net/service/SyncService.java index 64a721bf014..67c3629d22b 100644 --- a/framework/src/main/java/org/tron/core/net/service/SyncService.java +++ b/framework/src/main/java/org/tron/core/net/service/SyncService.java @@ -234,8 +234,8 @@ private synchronized void handleSyncBlock() { isProcessed[0] = false; - synchronized (tronNetDelegate.getBlockLock()) { - blockWaitToProcess.forEach((msg, peerConnection) -> { + blockWaitToProcess.forEach((msg, peerConnection) -> { + synchronized (tronNetDelegate.getBlockLock()) { if (peerConnection.isDisconnect()) { blockWaitToProcess.remove(msg); invalid(msg.getBlockId()); @@ -254,8 +254,8 @@ private synchronized void handleSyncBlock() { isProcessed[0] = true; processSyncBlock(msg.getBlockCapsule()); } - }); - } + } + }); } } From 6ccb55406513f8182b01a5c53e930b51f3fc0172 Mon Sep 17 00:00:00 2001 From: jiangyuanshu <317787106@qq.com> Date: Mon, 18 Jul 2022 15:21:18 +0800 Subject: [PATCH 2/2] rollback --- .../org/tron/common/overlay/server/MessageQueue.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java b/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java index 3e3fc3b2c44..1919042232f 100644 --- a/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java +++ b/framework/src/main/java/org/tron/common/overlay/server/MessageQueue.java @@ -69,11 +69,6 @@ public void activate(ChannelHandlerContext ctx) { continue; } Message msg = msgQueue.take(); - if (channel.isDisconnect()) { - logger.warn("Failed to send to {} as channel has closed, {}", - ctx.channel().remoteAddress(), msg); - return; - } ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> { if (!future.isSuccess() && !channel.isDisconnect()) { logger.warn("Failed to send to {}, {}", ctx.channel().remoteAddress(), msg); @@ -97,11 +92,6 @@ public void setChannel(Channel channel) { } public void fastSend(Message msg) { - if (channel.isDisconnect()) { - logger.warn("Fast send to {} failed as channel has closed, {} ", - ctx.channel().remoteAddress(), msg); - return; - } logger.info("Fast send to {}, {} ", ctx.channel().remoteAddress(), msg); ctx.writeAndFlush(msg.getSendData()).addListener((ChannelFutureListener) future -> { if (!future.isSuccess() && !channel.isDisconnect()) {