From f5382767266e255654b37bbbe50e4c8144208bd1 Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Tue, 29 May 2018 11:17:26 +0800 Subject: [PATCH 1/7] add test case --- .../org/tron/core/net/node/BaseNetTest.java | 114 ++++++++++++++++++ .../org/tron/core/net/node/BroadTest.java | 6 +- .../org/tron/core/net/node/TcpNetTest.java | 22 ++++ 3 files changed, 137 insertions(+), 5 deletions(-) create mode 100644 src/test/java/org/tron/core/net/node/BaseNetTest.java create mode 100644 src/test/java/org/tron/core/net/node/TcpNetTest.java diff --git a/src/test/java/org/tron/core/net/node/BaseNetTest.java b/src/test/java/org/tron/core/net/node/BaseNetTest.java new file mode 100644 index 00000000000..ed22c271a14 --- /dev/null +++ b/src/test/java/org/tron/core/net/node/BaseNetTest.java @@ -0,0 +1,114 @@ +package org.tron.core.net.node; + +import java.io.File; +import lombok.extern.slf4j.Slf4j; +import org.junit.After; +import org.junit.Before; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.tron.common.application.Application; +import org.tron.common.application.ApplicationFactory; +import org.tron.common.overlay.client.PeerClient; +import org.tron.common.overlay.server.ChannelManager; +import org.tron.common.overlay.server.SyncPool; +import org.tron.common.utils.FileUtil; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.core.db.Manager; +import org.tron.core.services.RpcApiService; +import org.tron.core.services.WitnessService; + +@Slf4j +public abstract class BaseNetTest { + + protected static AnnotationConfigApplicationContext context; + protected NodeImpl node; + protected RpcApiService rpcApiService; + protected PeerClient peerClient; + protected ChannelManager channelManager; + protected SyncPool pool; + private String dbPath; + private String dbDirectory; + private String indexDirectory; + + private static boolean go = false; + + public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory) { + this.dbPath = dbPath; + this.dbDirectory = dbDirectory; + this.indexDirectory = indexDirectory; + } + + @Before + public void init() { + new Thread(new Runnable() { + @Override + public void run() { + logger.info("Full node running."); + Args.setParam( + new String[]{ + "--output-directory", dbPath, + "--storage-db-directory", dbDirectory, + "--storage-index-directory", indexDirectory + }, + "config.conf" + ); + Args cfgArgs = Args.getInstance(); + cfgArgs.setNodeListenPort(17889); + cfgArgs.setNodeDiscoveryEnable(false); + cfgArgs.getSeedNode().getIpList().clear(); + cfgArgs.setNeedSyncCheck(false); + cfgArgs.setNodeExternalIp("127.0.0.1"); + + context = new AnnotationConfigApplicationContext(DefaultConfig.class); + + if (cfgArgs.isHelp()) { + logger.info("Here is the help message."); + return; + } + Application appT = ApplicationFactory.create(context); + rpcApiService = context.getBean(RpcApiService.class); + appT.addService(rpcApiService); + if (cfgArgs.isWitness()) { + appT.addService(new WitnessService(appT)); + } + appT.initServices(cfgArgs); + appT.startServices(); + + node = context.getBean(NodeImpl.class); + peerClient = context.getBean(PeerClient.class); + channelManager = context.getBean(ChannelManager.class); + pool = context.getBean(SyncPool.class); + Manager dbManager = context.getBean(Manager.class); + NodeDelegate nodeDelegate = new NodeDelegateImpl(dbManager); + node.setNodeDelegate(nodeDelegate); + pool.init(node); + + appT.startup(); + rpcApiService.blockUntilShutdown(); + } + }).start(); + int tryTimes = 1; + while (tryTimes <= 30 && (node == null || peerClient == null + || channelManager == null || pool == null)) { + try { + logger.info("node:{},peerClient:{},channelManager:{},pool:{}", node, peerClient, + channelManager, pool); + Thread.sleep(1000 * tryTimes); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + ++tryTimes; + } + } + } + + protected void buildClient() { + + } + + @After + public void destroy() { + Args.clearParam(); + FileUtil.deleteDir(new File("output-nodeImplTest")); + } +} diff --git a/src/test/java/org/tron/core/net/node/BroadTest.java b/src/test/java/org/tron/core/net/node/BroadTest.java index c7ba5da9ef5..84258b898c3 100644 --- a/src/test/java/org/tron/core/net/node/BroadTest.java +++ b/src/test/java/org/tron/core/net/node/BroadTest.java @@ -24,7 +24,6 @@ import org.tron.common.utils.FileUtil; import org.tron.common.utils.ReflectUtils; import org.tron.common.utils.Sha256Hash; -import org.tron.core.Constant; import org.tron.core.config.DefaultConfig; import org.tron.core.config.args.Args; import org.tron.core.db.Manager; @@ -234,6 +233,7 @@ public void run() { } } + private static int trys = 0; private void prepare() { try { ExecutorService advertiseLoopThread = ReflectUtils.getFieldValue(node, "broadPool"); @@ -242,10 +242,6 @@ private void prepare() { ReflectUtils.setFieldValue(node, "isAdvertiseActive", false); ReflectUtils.setFieldValue(node, "isFetchActive", false); -// ScheduledExecutorService mainWorker = ReflectUtils -// .getFieldValue(channelManager, "mainWorker"); -// mainWorker.shutdownNow(); - Node node = new Node( "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); new Thread(new Runnable() { diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java new file mode 100644 index 00000000000..f2b62a23e33 --- /dev/null +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -0,0 +1,22 @@ +package org.tron.core.net.node; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; + +@Slf4j +public class TcpNetTest extends BaseNetTest { + + private static final String dbPath = "output-nodeImplTest/tcpNet"; + private static final String dbDirectory = "db_tcp_test"; + private static final String indexDirectory = "index_tcp_test"; + + public TcpNetTest() { + super(dbPath, dbDirectory, indexDirectory); + } + + @Test + public void test() throws InterruptedException { + Thread.sleep(2000); + + } +} From 78dd23deb0d05a8323150c3fffe71c6e5c066d61 Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Tue, 29 May 2018 11:43:45 +0800 Subject: [PATCH 2/7] add test case --- .../org/tron/core/net/node/BaseNetTest.java | 47 ++++++++++++++++-- .../org/tron/core/net/node/TcpNetTest.java | 49 ++++++++++++++++++- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/src/test/java/org/tron/core/net/node/BaseNetTest.java b/src/test/java/org/tron/core/net/node/BaseNetTest.java index ed22c271a14..a2b9ccbb669 100644 --- a/src/test/java/org/tron/core/net/node/BaseNetTest.java +++ b/src/test/java/org/tron/core/net/node/BaseNetTest.java @@ -1,6 +1,19 @@ package org.tron.core.net.node; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultMessageSizeEstimator; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import java.io.File; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Before; @@ -26,11 +39,13 @@ public abstract class BaseNetTest { protected PeerClient peerClient; protected ChannelManager channelManager; protected SyncPool pool; + protected Manager manager; + private String dbPath; private String dbDirectory; private String indexDirectory; - private static boolean go = false; + private static final int port = 17889; public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory) { this.dbPath = dbPath; @@ -53,7 +68,7 @@ public void run() { "config.conf" ); Args cfgArgs = Args.getInstance(); - cfgArgs.setNodeListenPort(17889); + cfgArgs.setNodeListenPort(port); cfgArgs.setNodeDiscoveryEnable(false); cfgArgs.getSeedNode().getIpList().clear(); cfgArgs.setNeedSyncCheck(false); @@ -78,8 +93,8 @@ public void run() { peerClient = context.getBean(PeerClient.class); channelManager = context.getBean(ChannelManager.class); pool = context.getBean(SyncPool.class); - Manager dbManager = context.getBean(Manager.class); - NodeDelegate nodeDelegate = new NodeDelegateImpl(dbManager); + manager = context.getBean(Manager.class); + NodeDelegate nodeDelegate = new NodeDelegateImpl(manager); node.setNodeDelegate(nodeDelegate); pool.init(node); @@ -102,8 +117,30 @@ public void run() { } } - protected void buildClient() { + protected Channel createClient() throws InterruptedException { + NioEventLoopGroup group = new NioEventLoopGroup(1); + Bootstrap b = new Bootstrap(); + b.group(group).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + // limit the size of receiving buffer to 1024 + ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(256 * 1024)); + ch.config().setOption(ChannelOption.SO_RCVBUF, 256 * 1024); + ch.config().setOption(ChannelOption.SO_BACKLOG, 1024); + ch.pipeline() + .addLast("readTimeoutHandler", new ReadTimeoutHandler(600, TimeUnit.SECONDS)) + .addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS)); + ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender()); + ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder()); + // be aware of channel closing + ch.closeFuture(); + } + }).option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000) + .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT); + return b.connect("127.0.0.1", port).sync().channel(); } @After diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java index f2b62a23e33..e28bcf5111d 100644 --- a/src/test/java/org/tron/core/net/node/TcpNetTest.java +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -1,7 +1,12 @@ package org.tron.core.net.node; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; import lombok.extern.slf4j.Slf4j; import org.junit.Test; +import org.tron.common.overlay.discover.Node; +import org.tron.common.overlay.message.HelloMessage; +import org.tron.core.capsule.BlockCapsule.BlockId; @Slf4j public class TcpNetTest extends BaseNetTest { @@ -15,8 +20,48 @@ public TcpNetTest() { } @Test - public void test() throws InterruptedException { + public void normalTest() throws InterruptedException { +// Thread.sleep(2000); + Channel channel = createClient(); + org.tron.common.overlay.discover.Node node = new Node( + "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); + BlockId genesisBlockId = new BlockId(); + HelloMessage message = new HelloMessage(node, + System.currentTimeMillis(), manager.getGenesisBlockId(), manager.getSolidBlockId(), + manager.getHeadBlockId()); + //Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1)) + channel.writeAndFlush(message.getSendData()) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.info("send msg success"); + } else { + logger.error("send msg fail", future.cause()); + } + }); + + Thread.sleep(2000); + } + + @Test + public void errorGenesisBlockIdTest() throws InterruptedException { +// Thread.sleep(2000); + Channel channel = createClient(); + org.tron.common.overlay.discover.Node node = new Node( + "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); + BlockId genesisBlockId = new BlockId(); + HelloMessage message = new HelloMessage(node, + System.currentTimeMillis(), genesisBlockId, manager.getSolidBlockId(), + manager.getHeadBlockId()); + //Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1)) + channel.writeAndFlush(message.getSendData()) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.info("send msg success"); + } else { + logger.error("send msg fail", future.cause()); + } + }); + Thread.sleep(2000); - } } From 73486af1d6af992f197d7e7f7900af643a0651fb Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Tue, 29 May 2018 16:50:13 +0800 Subject: [PATCH 3/7] add test case --- .../org/tron/core/net/node/BaseNetTest.java | 25 ++- .../org/tron/core/net/node/TcpNetTest.java | 180 +++++++++++++++--- 2 files changed, 175 insertions(+), 30 deletions(-) diff --git a/src/test/java/org/tron/core/net/node/BaseNetTest.java b/src/test/java/org/tron/core/net/node/BaseNetTest.java index a2b9ccbb669..6a3bc371f59 100644 --- a/src/test/java/org/tron/core/net/node/BaseNetTest.java +++ b/src/test/java/org/tron/core/net/node/BaseNetTest.java @@ -8,11 +8,14 @@ import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.junit.After; @@ -40,22 +43,26 @@ public abstract class BaseNetTest { protected ChannelManager channelManager; protected SyncPool pool; protected Manager manager; + private Application appT; private String dbPath; private String dbDirectory; private String indexDirectory; - private static final int port = 17889; + private int port; - public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory) { + private ExecutorService executorService = Executors.newFixedThreadPool(1); + + public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory, int port) { this.dbPath = dbPath; this.dbDirectory = dbDirectory; this.indexDirectory = indexDirectory; + this.port = port; } @Before public void init() { - new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { logger.info("Full node running."); @@ -80,7 +87,7 @@ public void run() { logger.info("Here is the help message."); return; } - Application appT = ApplicationFactory.create(context); + appT = ApplicationFactory.create(context); rpcApiService = context.getBean(RpcApiService.class); appT.addService(rpcApiService); if (cfgArgs.isWitness()) { @@ -101,7 +108,7 @@ public void run() { appT.startup(); rpcApiService.blockUntilShutdown(); } - }).start(); + }); int tryTimes = 1; while (tryTimes <= 30 && (node == null || peerClient == null || channelManager == null || pool == null)) { @@ -117,7 +124,8 @@ public void run() { } } - protected Channel createClient() throws InterruptedException { + protected Channel createClient(ByteToMessageDecoder decoder) + throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(1); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) @@ -133,6 +141,7 @@ protected void initChannel(Channel ch) throws Exception { .addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS)); ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender()); ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder()); + ch.pipeline().addLast("handshakeHandler", decoder); // be aware of channel closing ch.closeFuture(); @@ -145,7 +154,9 @@ protected void initChannel(Channel ch) throws Exception { @After public void destroy() { + appT.shutdown(); + executorService.shutdownNow(); Args.clearParam(); - FileUtil.deleteDir(new File("output-nodeImplTest")); + FileUtil.deleteDir(new File(dbPath)); } } diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java index e28bcf5111d..c0fd770a614 100644 --- a/src/test/java/org/tron/core/net/node/TcpNetTest.java +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -1,12 +1,32 @@ package org.tron.core.net.node; +import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT; +import static org.tron.core.net.message.MessageTypes.P2P_HELLO; +import static org.tron.protos.Protocol.ReasonCode.FORKED; +import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_CHAIN; +import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_PROTOCOL; + +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.net.InetAddress; +import java.util.Collections; +import java.util.Date; +import java.util.List; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.map.LRUMap; +import org.junit.Assert; import org.junit.Test; import org.tron.common.overlay.discover.Node; +import org.tron.common.overlay.message.DisconnectMessage; import org.tron.common.overlay.message.HelloMessage; +import org.tron.common.overlay.message.P2pMessage; +import org.tron.common.overlay.message.P2pMessageFactory; +import org.tron.common.utils.ReflectUtils; import org.tron.core.capsule.BlockCapsule.BlockId; +import org.tron.core.config.args.Args; @Slf4j public class TcpNetTest extends BaseNetTest { @@ -14,22 +34,83 @@ public class TcpNetTest extends BaseNetTest { private static final String dbPath = "output-nodeImplTest/tcpNet"; private static final String dbDirectory = "db_tcp_test"; private static final String indexDirectory = "index_tcp_test"; + public static final int sleepTime = 1000; + private boolean finish = false; + private final static int tryTimes = 10; + private final static int port = 17899; + + Node node = new Node( + "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); + public TcpNetTest() { - super(dbPath, dbDirectory, indexDirectory); + super(dbPath, dbDirectory, indexDirectory, port); } - @Test + private enum TestType { + normal, errorGenesisBlock, errorVersion, errorSolid + } + + private class HandshakeHandler extends ByteToMessageDecoder { + + private P2pMessageFactory messageFactory = new P2pMessageFactory(); + + private TestType testType; + + public HandshakeHandler(TestType testType) { + this.testType = testType; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) + throws Exception { + byte[] encoded = new byte[buffer.readableBytes()]; + buffer.readBytes(encoded); + P2pMessage msg = messageFactory.create(encoded); + + logger.info("Handshake Receive from {}, {}", ctx.channel().remoteAddress(), msg); + switch (msg.getType()) { + case P2P_HELLO: + logger.info("HandshakeHandler success"); + break; + case P2P_DISCONNECT: + logger.info("getReasonCode : {}", ((DisconnectMessage) msg).getReasonCode()); + break; + default: + return; + } + + switch (testType) { + case normal: + Assert.assertEquals(msg.getType(), P2P_HELLO); + break; + case errorGenesisBlock: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), INCOMPATIBLE_CHAIN); + break; + case errorVersion: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), INCOMPATIBLE_PROTOCOL); + break; + case errorSolid: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), FORKED); + break; + default: + break; + } + + finish = true; + } + } + + //Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1)) + + // @Test public void normalTest() throws InterruptedException { -// Thread.sleep(2000); - Channel channel = createClient(); - org.tron.common.overlay.discover.Node node = new Node( - "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); - BlockId genesisBlockId = new BlockId(); - HelloMessage message = new HelloMessage(node, - System.currentTimeMillis(), manager.getGenesisBlockId(), manager.getSolidBlockId(), - manager.getHeadBlockId()); - //Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1)) + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); channel.writeAndFlush(message.getSendData()) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { @@ -38,21 +119,15 @@ public void normalTest() throws InterruptedException { logger.error("send msg fail", future.cause()); } }); - - Thread.sleep(2000); + validResult(channel); } - @Test + // @Test public void errorGenesisBlockIdTest() throws InterruptedException { -// Thread.sleep(2000); - Channel channel = createClient(); - org.tron.common.overlay.discover.Node node = new Node( - "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); + Channel channel = createClient(new HandshakeHandler(TestType.errorGenesisBlock)); BlockId genesisBlockId = new BlockId(); - HelloMessage message = new HelloMessage(node, - System.currentTimeMillis(), genesisBlockId, manager.getSolidBlockId(), - manager.getHeadBlockId()); - //Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1)) + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), genesisBlockId, + manager.getSolidBlockId(), manager.getHeadBlockId()); channel.writeAndFlush(message.getSendData()) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { @@ -62,6 +137,65 @@ public void errorGenesisBlockIdTest() throws InterruptedException { } }); - Thread.sleep(2000); + validResult(channel); + } + + // @Test + public void errorVersionTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.errorVersion)); + Args.getInstance().setNodeP2pVersion(1); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + Args.getInstance().setNodeP2pVersion(2); + channel.writeAndFlush(message.getSendData()) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.info("send msg success"); + } else { + logger.error("send msg fail", future.cause()); + } + }); + + validResult(channel); + } + + // @Test + public void errorSolidBlockIdTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.errorSolid)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), new BlockId(), manager.getHeadBlockId()); + channel.writeAndFlush(message.getSendData()) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.info("send msg success"); + } else { + logger.error("send msg fail", future.cause()); + } + }); + validResult(channel); + } + + private void validResult(Channel channel) throws InterruptedException { + int trys = 0; + while (!finish && ++trys < tryTimes) { + Thread.sleep(sleepTime); + } + Assert.assertEquals(finish, true); + finish = false; + ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections + .synchronizedMap(new LRUMap(500))); + channel.close(); + } + + @Test + public void testAll() throws InterruptedException { + logger.info("begin normal test "); + normalTest(); + logger.info("begin errorGenesisBlockId test "); + errorGenesisBlockIdTest(); + logger.info("begin errorVersion test "); + errorVersionTest(); + logger.info("begin errorSolidBlockId test "); + errorSolidBlockIdTest(); } } From c989a0005663e86e421c8ae73f12ea995cef3c0d Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Tue, 29 May 2018 17:04:11 +0800 Subject: [PATCH 4/7] remove bug code --- src/test/java/org/tron/core/net/node/BaseNetTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/org/tron/core/net/node/BaseNetTest.java b/src/test/java/org/tron/core/net/node/BaseNetTest.java index 6a3bc371f59..787d9466108 100644 --- a/src/test/java/org/tron/core/net/node/BaseNetTest.java +++ b/src/test/java/org/tron/core/net/node/BaseNetTest.java @@ -154,7 +154,6 @@ protected void initChannel(Channel ch) throws Exception { @After public void destroy() { - appT.shutdown(); executorService.shutdownNow(); Args.clearParam(); FileUtil.deleteDir(new File(dbPath)); From af378222fa8b97adb9e336298fce13b99df2f8cb Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Wed, 30 May 2018 11:29:18 +0800 Subject: [PATCH 5/7] add test case --- .../org/tron/core/net/node/BroadTest.java | 32 ++--- .../org/tron/core/net/node/TcpNetTest.java | 123 +++++++++++++----- 2 files changed, 103 insertions(+), 52 deletions(-) diff --git a/src/test/java/org/tron/core/net/node/BroadTest.java b/src/test/java/org/tron/core/net/node/BroadTest.java index 84258b898c3..227ca05b566 100644 --- a/src/test/java/org/tron/core/net/node/BroadTest.java +++ b/src/test/java/org/tron/core/net/node/BroadTest.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.MapUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -18,6 +19,7 @@ import org.tron.common.overlay.client.PeerClient; import org.tron.common.overlay.discover.Node; import org.tron.common.overlay.message.Message; +import org.tron.common.overlay.server.Channel; import org.tron.common.overlay.server.ChannelManager; import org.tron.common.overlay.server.MessageQueue; import org.tron.common.overlay.server.SyncPool; @@ -26,6 +28,7 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.config.DefaultConfig; import org.tron.core.config.args.Args; +import org.tron.core.db.ByteArrayWrapper; import org.tron.core.db.Manager; import org.tron.core.net.message.BlockMessage; import org.tron.core.net.message.MessageTypes; @@ -251,29 +254,12 @@ public void run() { } }).start(); Thread.sleep(1000); -// List newChanelList = ReflectUtils.getFieldValue(channelManager, "newPeers"); -// int tryTimes = 0; -// while (CollectionUtils.isEmpty(newChanelList) && ++tryTimes < 10) { -// Thread.sleep(1000); -// } -// logger.info("newChanelList size : {}", newChanelList.size()); - -// Field activePeersField = channelManager.getClass().getDeclaredField("activePeers"); -// activePeersField.setAccessible(true); -// Map activePeersMap = (Map) activePeersField -// .get(channelManager); -// -// Field apField = pool.getClass().getDeclaredField("activePeers"); -// apField.setAccessible(true); -// List activePeers = (List) apField.get(pool); - -// for (Channel channel : newChanelList) { -// activePeersMap.put(channel.getNodeIdWrapper(), channel); -// activePeers.add((PeerConnection) channel); -// } -// apField.set(pool, activePeers); -// activePeersField.set(channelManager, activePeersMap); - // + Map activePeers = ReflectUtils + .getFieldValue(channelManager, "activePeers"); + int tryTimes = 0; + while (MapUtils.isEmpty(activePeers) && ++tryTimes < 10) { + Thread.sleep(1000); + } go = true; } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java index c0fd770a614..16dda4b86d9 100644 --- a/src/test/java/org/tron/core/net/node/TcpNetTest.java +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -2,11 +2,13 @@ import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT; import static org.tron.core.net.message.MessageTypes.P2P_HELLO; +import static org.tron.protos.Protocol.ReasonCode.DUPLICATE_PEER; import static org.tron.protos.Protocol.ReasonCode.FORKED; import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_CHAIN; import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_PROTOCOL; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -17,16 +19,21 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.map.LRUMap; +import org.apache.commons.lang3.ArrayUtils; import org.junit.Assert; import org.junit.Test; import org.tron.common.overlay.discover.Node; import org.tron.common.overlay.message.DisconnectMessage; import org.tron.common.overlay.message.HelloMessage; +import org.tron.common.overlay.message.Message; import org.tron.common.overlay.message.P2pMessage; import org.tron.common.overlay.message.P2pMessageFactory; import org.tron.common.utils.ReflectUtils; import org.tron.core.capsule.BlockCapsule.BlockId; import org.tron.core.config.args.Args; +import org.tron.core.net.message.BlockMessage; +import org.tron.core.net.peer.PeerConnection; +import org.tron.protos.Protocol.Block; @Slf4j public class TcpNetTest extends BaseNetTest { @@ -48,7 +55,7 @@ public TcpNetTest() { } private enum TestType { - normal, errorGenesisBlock, errorVersion, errorSolid + normal, errorGenesisBlock, errorVersion, errorSolid, repeatConnect } private class HandshakeHandler extends ByteToMessageDecoder { @@ -96,6 +103,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List ou Assert.assertEquals(msg.getType(), P2P_DISCONNECT); Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), FORKED); break; + case repeatConnect: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), DUPLICATE_PEER); + break; default: break; } @@ -111,15 +122,8 @@ public void normalTest() throws InterruptedException { Channel channel = createClient(new HandshakeHandler(TestType.normal)); HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); - channel.writeAndFlush(message.getSendData()) - .addListener((ChannelFutureListener) future -> { - if (future.isSuccess()) { - logger.info("send msg success"); - } else { - logger.error("send msg fail", future.cause()); - } - }); - validResult(channel); + sendMessage(channel, message); + validResultCloseConnect(channel); } // @Test @@ -128,16 +132,9 @@ public void errorGenesisBlockIdTest() throws InterruptedException { BlockId genesisBlockId = new BlockId(); HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), genesisBlockId, manager.getSolidBlockId(), manager.getHeadBlockId()); - channel.writeAndFlush(message.getSendData()) - .addListener((ChannelFutureListener) future -> { - if (future.isSuccess()) { - logger.info("send msg success"); - } else { - logger.error("send msg fail", future.cause()); - } - }); + sendMessage(channel, message); - validResult(channel); + validResultCloseConnect(channel); } // @Test @@ -147,7 +144,55 @@ public void errorVersionTest() throws InterruptedException { HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); Args.getInstance().setNodeP2pVersion(2); - channel.writeAndFlush(message.getSendData()) + sendMessage(channel, message); + + validResultCloseConnect(channel); + } + + // @Test + public void errorSolidBlockIdTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.errorSolid)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), new BlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultCloseConnect(channel); + } + + // @Test + public void repeatConnectTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultUnCloseConnect(); + Channel repeatChannel = createClient(new HandshakeHandler(TestType.repeatConnect)); + sendMessage(repeatChannel, message); + validResultCloseConnect(repeatChannel); + clearConnect(channel); + } + + // @Test + public void unHandshakeTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + BlockMessage message = new BlockMessage(Block.getDefaultInstance()); + List beforeActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + sendMessage(channel, message); + List afterActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + Assert.assertEquals(beforeActivePeers.size(), afterActivePeers.size()); + clearConnect(channel); + } + + // @Test + public void errorMsgTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultUnCloseConnect(); + List beforeActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int beforeSize = beforeActivePeers.size(); + logger.info("beforeSize : {}", beforeSize); + channel.writeAndFlush(Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1))) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { logger.info("send msg success"); @@ -155,15 +200,15 @@ public void errorVersionTest() throws InterruptedException { logger.error("send msg fail", future.cause()); } }); - - validResult(channel); + Thread.sleep(2000); + List afterActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int afterSize = afterActivePeers.size(); + logger.info("afterSize : {}", afterSize); + Assert.assertEquals(beforeSize, afterSize + 1); + clearConnect(channel); } - // @Test - public void errorSolidBlockIdTest() throws InterruptedException { - Channel channel = createClient(new HandshakeHandler(TestType.errorSolid)); - HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), - manager.getGenesisBlockId(), new BlockId(), manager.getHeadBlockId()); + private void sendMessage(Channel channel, Message message) { channel.writeAndFlush(message.getSendData()) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { @@ -172,10 +217,9 @@ public void errorSolidBlockIdTest() throws InterruptedException { logger.error("send msg fail", future.cause()); } }); - validResult(channel); } - private void validResult(Channel channel) throws InterruptedException { + private void validResultCloseConnect(Channel channel) throws InterruptedException { int trys = 0; while (!finish && ++trys < tryTimes) { Thread.sleep(sleepTime); @@ -187,6 +231,21 @@ private void validResult(Channel channel) throws InterruptedException { channel.close(); } + private void validResultUnCloseConnect() throws InterruptedException { + int trys = 0; + while (!finish && ++trys < tryTimes) { + Thread.sleep(sleepTime); + } + Assert.assertEquals(finish, true); + finish = false; + } + + private void clearConnect(Channel channel) { + ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections + .synchronizedMap(new LRUMap(500))); + channel.close(); + } + @Test public void testAll() throws InterruptedException { logger.info("begin normal test "); @@ -197,5 +256,11 @@ public void testAll() throws InterruptedException { errorVersionTest(); logger.info("begin errorSolidBlockId test "); errorSolidBlockIdTest(); + logger.info("begin repeatConnect test"); + repeatConnectTest(); + logger.info("begin unHandshake test"); + unHandshakeTest(); + logger.info("begin errorMsg test"); + errorMsgTest(); } } From 7bbe647f158c4c7910bd624c15663cecf2cae3fa Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Wed, 30 May 2018 16:57:48 +0800 Subject: [PATCH 6/7] add test case --- .../org/tron/core/net/node/TcpNetTest.java | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java index 16dda4b86d9..baf97968f36 100644 --- a/src/test/java/org/tron/core/net/node/TcpNetTest.java +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -14,9 +14,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.net.InetAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.map.LRUMap; import org.apache.commons.lang3.ArrayUtils; @@ -173,12 +175,14 @@ public void repeatConnectTest() throws InterruptedException { // @Test public void unHandshakeTest() throws InterruptedException { + List beforeActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int beforeSize = beforeActivePeers.size(); Channel channel = createClient(new HandshakeHandler(TestType.normal)); BlockMessage message = new BlockMessage(Block.getDefaultInstance()); - List beforeActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); sendMessage(channel, message); List afterActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); - Assert.assertEquals(beforeActivePeers.size(), afterActivePeers.size()); + int afterSize = afterActivePeers.size(); + Assert.assertEquals(beforeSize, afterSize); clearConnect(channel); } @@ -226,9 +230,13 @@ private void validResultCloseConnect(Channel channel) throws InterruptedExceptio } Assert.assertEquals(finish, true); finish = false; + channel.close(); + Thread.sleep(sleepTime); ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections .synchronizedMap(new LRUMap(500))); - channel.close(); + ReflectUtils.setFieldValue(pool, "activePeers", + Collections.synchronizedList(new ArrayList())); + ReflectUtils.setFieldValue(channelManager, "activePeers", new ConcurrentHashMap<>()); } private void validResultUnCloseConnect() throws InterruptedException { @@ -240,27 +248,33 @@ private void validResultUnCloseConnect() throws InterruptedException { finish = false; } - private void clearConnect(Channel channel) { + private void clearConnect(Channel channel) throws InterruptedException { + channel.close(); + Thread.sleep(org.tron.common.overlay.server.SyncPool.getActivePeers); ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections .synchronizedMap(new LRUMap(500))); - channel.close(); + ReflectUtils.setFieldValue(pool, "activePeers", + Collections.synchronizedList(new ArrayList())); + ReflectUtils.setFieldValue(channelManager, "activePeers", new ConcurrentHashMap<>()); } @Test public void testAll() throws InterruptedException { - logger.info("begin normal test "); - normalTest(); - logger.info("begin errorGenesisBlockId test "); - errorGenesisBlockIdTest(); - logger.info("begin errorVersion test "); - errorVersionTest(); - logger.info("begin errorSolidBlockId test "); - errorSolidBlockIdTest(); - logger.info("begin repeatConnect test"); - repeatConnectTest(); - logger.info("begin unHandshake test"); - unHandshakeTest(); - logger.info("begin errorMsg test"); - errorMsgTest(); + for (int i = 0; i < 100; i++) { + logger.info("begin normal test "); + normalTest(); + logger.info("begin errorGenesisBlockId test "); + errorGenesisBlockIdTest(); + logger.info("begin errorVersion test "); + errorVersionTest(); + logger.info("begin errorSolidBlockId test "); + errorSolidBlockIdTest(); + logger.info("begin repeatConnect test"); + repeatConnectTest(); + logger.info("begin unHandshake test"); + unHandshakeTest(); + logger.info("begin errorMsg test"); + errorMsgTest(); + } } } From b40c8275157a12dc5d80faf1726145cfb0ec070b Mon Sep 17 00:00:00 2001 From: liangzhiyan Date: Wed, 30 May 2018 18:01:24 +0800 Subject: [PATCH 7/7] modify brock test --- src/test/java/org/tron/core/net/node/BroadTest.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/test/java/org/tron/core/net/node/BroadTest.java b/src/test/java/org/tron/core/net/node/BroadTest.java index 9ddb82d72e4..c116b334d06 100644 --- a/src/test/java/org/tron/core/net/node/BroadTest.java +++ b/src/test/java/org/tron/core/net/node/BroadTest.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.MapUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -18,6 +19,7 @@ import org.tron.common.overlay.client.PeerClient; import org.tron.common.overlay.discover.Node; import org.tron.common.overlay.message.Message; +import org.tron.common.overlay.server.Channel; import org.tron.common.overlay.server.ChannelManager; import org.tron.common.overlay.server.MessageQueue; import org.tron.common.overlay.server.SyncPool; @@ -27,6 +29,7 @@ import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.DefaultConfig; import org.tron.core.config.args.Args; +import org.tron.core.db.ByteArrayWrapper; import org.tron.core.db.Manager; import org.tron.core.net.message.BlockMessage; import org.tron.core.net.message.MessageTypes; @@ -234,7 +237,6 @@ public void run() { } } - private static int trys = 0; private void prepare() { try { ExecutorService advertiseLoopThread = ReflectUtils.getFieldValue(node, "broadPool"); @@ -251,7 +253,13 @@ public void run() { peerClient.connect(node.getHost(), node.getPort(), node.getHexId()); } }).start(); - Thread.sleep(5000); + Thread.sleep(2000); + Map activePeers = ReflectUtils + .getFieldValue(channelManager, "activePeers"); + int tryTimes = 0; + while (MapUtils.isEmpty(activePeers) && ++tryTimes < 10) { + Thread.sleep(1000); + } go = true; } catch (Exception e) { e.printStackTrace();