diff --git a/src/test/java/org/tron/core/net/node/BaseNetTest.java b/src/test/java/org/tron/core/net/node/BaseNetTest.java new file mode 100644 index 00000000000..787d9466108 --- /dev/null +++ b/src/test/java/org/tron/core/net/node/BaseNetTest.java @@ -0,0 +1,161 @@ +package org.tron.core.net.node; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.DefaultMessageSizeEstimator; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.junit.After; +import org.junit.Before; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.tron.common.application.Application; +import org.tron.common.application.ApplicationFactory; +import org.tron.common.overlay.client.PeerClient; +import org.tron.common.overlay.server.ChannelManager; +import org.tron.common.overlay.server.SyncPool; +import org.tron.common.utils.FileUtil; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.core.db.Manager; +import org.tron.core.services.RpcApiService; +import org.tron.core.services.WitnessService; + +@Slf4j +public abstract class BaseNetTest { + + protected static AnnotationConfigApplicationContext context; + protected NodeImpl node; + protected RpcApiService rpcApiService; + protected PeerClient peerClient; + protected ChannelManager channelManager; + protected SyncPool pool; + protected Manager manager; + private Application appT; + + private String dbPath; + private String dbDirectory; + private String indexDirectory; + + private int port; + + private ExecutorService executorService = Executors.newFixedThreadPool(1); + + public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory, int port) { + this.dbPath = dbPath; + this.dbDirectory = dbDirectory; + this.indexDirectory = indexDirectory; + this.port = port; + } + + @Before + public void init() { + executorService.execute(new Runnable() { + @Override + public void run() { + logger.info("Full node running."); + Args.setParam( + new String[]{ + "--output-directory", dbPath, + "--storage-db-directory", dbDirectory, + "--storage-index-directory", indexDirectory + }, + "config.conf" + ); + Args cfgArgs = Args.getInstance(); + cfgArgs.setNodeListenPort(port); + cfgArgs.setNodeDiscoveryEnable(false); + cfgArgs.getSeedNode().getIpList().clear(); + cfgArgs.setNeedSyncCheck(false); + cfgArgs.setNodeExternalIp("127.0.0.1"); + + context = new AnnotationConfigApplicationContext(DefaultConfig.class); + + if (cfgArgs.isHelp()) { + logger.info("Here is the help message."); + return; + } + appT = ApplicationFactory.create(context); + rpcApiService = context.getBean(RpcApiService.class); + appT.addService(rpcApiService); + if (cfgArgs.isWitness()) { + appT.addService(new WitnessService(appT)); + } + appT.initServices(cfgArgs); + appT.startServices(); + + node = context.getBean(NodeImpl.class); + peerClient = context.getBean(PeerClient.class); + channelManager = context.getBean(ChannelManager.class); + pool = context.getBean(SyncPool.class); + manager = context.getBean(Manager.class); + NodeDelegate nodeDelegate = new NodeDelegateImpl(manager); + node.setNodeDelegate(nodeDelegate); + pool.init(node); + + appT.startup(); + rpcApiService.blockUntilShutdown(); + } + }); + int tryTimes = 1; + while (tryTimes <= 30 && (node == null || peerClient == null + || channelManager == null || pool == null)) { + try { + logger.info("node:{},peerClient:{},channelManager:{},pool:{}", node, peerClient, + channelManager, pool); + Thread.sleep(1000 * tryTimes); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + ++tryTimes; + } + } + } + + protected Channel createClient(ByteToMessageDecoder decoder) + throws InterruptedException { + NioEventLoopGroup group = new NioEventLoopGroup(1); + Bootstrap b = new Bootstrap(); + b.group(group).channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + // limit the size of receiving buffer to 1024 + ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(256 * 1024)); + ch.config().setOption(ChannelOption.SO_RCVBUF, 256 * 1024); + ch.config().setOption(ChannelOption.SO_BACKLOG, 1024); + ch.pipeline() + .addLast("readTimeoutHandler", new ReadTimeoutHandler(600, TimeUnit.SECONDS)) + .addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS)); + ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender()); + ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder()); + ch.pipeline().addLast("handshakeHandler", decoder); + + // be aware of channel closing + ch.closeFuture(); + } + }).option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000) + .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT); + return b.connect("127.0.0.1", port).sync().channel(); + } + + @After + public void destroy() { + executorService.shutdownNow(); + Args.clearParam(); + FileUtil.deleteDir(new File(dbPath)); + } +} diff --git a/src/test/java/org/tron/core/net/node/BroadTest.java b/src/test/java/org/tron/core/net/node/BroadTest.java index be9f4e6f458..c116b334d06 100644 --- a/src/test/java/org/tron/core/net/node/BroadTest.java +++ b/src/test/java/org/tron/core/net/node/BroadTest.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.MapUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -18,6 +19,7 @@ import org.tron.common.overlay.client.PeerClient; import org.tron.common.overlay.discover.Node; import org.tron.common.overlay.message.Message; +import org.tron.common.overlay.server.Channel; import org.tron.common.overlay.server.ChannelManager; import org.tron.common.overlay.server.MessageQueue; import org.tron.common.overlay.server.SyncPool; @@ -27,6 +29,7 @@ import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.DefaultConfig; import org.tron.core.config.args.Args; +import org.tron.core.db.ByteArrayWrapper; import org.tron.core.db.Manager; import org.tron.core.net.message.BlockMessage; import org.tron.core.net.message.MessageTypes; @@ -242,10 +245,6 @@ private void prepare() { ReflectUtils.setFieldValue(node, "isAdvertiseActive", false); ReflectUtils.setFieldValue(node, "isFetchActive", false); -// ScheduledExecutorService mainWorker = ReflectUtils -// .getFieldValue(channelManager, "mainWorker"); -// mainWorker.shutdownNow(); - Node node = new Node( "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); new Thread(new Runnable() { @@ -254,30 +253,13 @@ public void run() { peerClient.connect(node.getHost(), node.getPort(), node.getHexId()); } }).start(); - Thread.sleep(5000); -// List newChanelList = ReflectUtils.getFieldValue(channelManager, "newPeers"); -// int tryTimes = 0; -// while (CollectionUtils.isEmpty(newChanelList) && ++tryTimes < 10) { -// Thread.sleep(1000); -// } -// logger.info("newChanelList size : {}", newChanelList.size()); - -// Field activePeersField = channelManager.getClass().getDeclaredField("activePeers"); -// activePeersField.setAccessible(true); -// Map activePeersMap = (Map) activePeersField -// .get(channelManager); -// -// Field apField = pool.getClass().getDeclaredField("activePeers"); -// apField.setAccessible(true); -// List activePeers = (List) apField.get(pool); - -// for (Channel channel : newChanelList) { -// activePeersMap.put(channel.getNodeIdWrapper(), channel); -// activePeers.add((PeerConnection) channel); -// } -// apField.set(pool, activePeers); -// activePeersField.set(channelManager, activePeersMap); - // + Thread.sleep(2000); + Map activePeers = ReflectUtils + .getFieldValue(channelManager, "activePeers"); + int tryTimes = 0; + while (MapUtils.isEmpty(activePeers) && ++tryTimes < 10) { + Thread.sleep(1000); + } go = true; } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/org/tron/core/net/node/TcpNetTest.java b/src/test/java/org/tron/core/net/node/TcpNetTest.java new file mode 100644 index 00000000000..b48eeb6de86 --- /dev/null +++ b/src/test/java/org/tron/core/net/node/TcpNetTest.java @@ -0,0 +1,279 @@ +package org.tron.core.net.node; + +import static org.tron.core.net.message.MessageTypes.P2P_DISCONNECT; +import static org.tron.core.net.message.MessageTypes.P2P_HELLO; +import static org.tron.protos.Protocol.ReasonCode.DUPLICATE_PEER; +import static org.tron.protos.Protocol.ReasonCode.FORKED; +import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_CHAIN; +import static org.tron.protos.Protocol.ReasonCode.INCOMPATIBLE_PROTOCOL; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.map.LRUMap; +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Assert; +import org.junit.Test; +import org.tron.common.overlay.discover.Node; +import org.tron.common.overlay.message.DisconnectMessage; +import org.tron.common.overlay.message.HelloMessage; +import org.tron.common.overlay.message.Message; +import org.tron.common.overlay.message.P2pMessage; +import org.tron.common.overlay.message.P2pMessageFactory; +import org.tron.common.utils.ReflectUtils; +import org.tron.core.capsule.BlockCapsule; +import org.tron.core.capsule.BlockCapsule.BlockId; +import org.tron.core.config.args.Args; +import org.tron.core.net.message.BlockMessage; +import org.tron.core.net.peer.PeerConnection; +import org.tron.protos.Protocol.Block; + +@Slf4j +public class TcpNetTest extends BaseNetTest { + + private static final String dbPath = "output-nodeImplTest/tcpNet"; + private static final String dbDirectory = "db_tcp_test"; + private static final String indexDirectory = "index_tcp_test"; + public static final int sleepTime = 1000; + private boolean finish = false; + private final static int tryTimes = 10; + private final static int port = 17899; + + Node node = new Node( + "enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889"); + + + public TcpNetTest() { + super(dbPath, dbDirectory, indexDirectory, port); + } + + private enum TestType { + normal, errorGenesisBlock, errorVersion, errorSolid, repeatConnect + } + + private class HandshakeHandler extends ByteToMessageDecoder { + + private P2pMessageFactory messageFactory = new P2pMessageFactory(); + + private TestType testType; + + public HandshakeHandler(TestType testType) { + this.testType = testType; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) + throws Exception { + byte[] encoded = new byte[buffer.readableBytes()]; + buffer.readBytes(encoded); + P2pMessage msg = messageFactory.create(encoded); + + logger.info("Handshake Receive from {}, {}", ctx.channel().remoteAddress(), msg); + switch (msg.getType()) { + case P2P_HELLO: + logger.info("HandshakeHandler success"); + break; + case P2P_DISCONNECT: + logger.info("getReasonCode : {}", ((DisconnectMessage) msg).getReasonCode()); + break; + default: + return; + } + + switch (testType) { + case normal: + Assert.assertEquals(msg.getType(), P2P_HELLO); + break; + case errorGenesisBlock: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), INCOMPATIBLE_CHAIN); + break; + case errorVersion: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), INCOMPATIBLE_PROTOCOL); + break; + case errorSolid: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), FORKED); + break; + case repeatConnect: + Assert.assertEquals(msg.getType(), P2P_DISCONNECT); + Assert.assertEquals(((DisconnectMessage) msg).getReasonCode(), DUPLICATE_PEER); + break; + default: + break; + } + + finish = true; + } + } + + //Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1)) + + // @Test + public void normalTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultCloseConnect(channel); + } + + // @Test + public void errorGenesisBlockIdTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.errorGenesisBlock)); + BlockId genesisBlockId = new BlockId(); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), genesisBlockId, + manager.getSolidBlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + + validResultCloseConnect(channel); + } + + // @Test + public void errorVersionTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.errorVersion)); + Args.getInstance().setNodeP2pVersion(1); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + Args.getInstance().setNodeP2pVersion(2); + sendMessage(channel, message); + + validResultCloseConnect(channel); + } + + // @Test + public void errorSolidBlockIdTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.errorSolid)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), new BlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultCloseConnect(channel); + } + + // @Test + public void repeatConnectTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultUnCloseConnect(); + Channel repeatChannel = createClient(new HandshakeHandler(TestType.repeatConnect)); + sendMessage(repeatChannel, message); + validResultCloseConnect(repeatChannel); + clearConnect(channel); + } + + // @Test + public void unHandshakeTest() throws InterruptedException { + List beforeActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int beforeSize = beforeActivePeers.size(); + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + BlockMessage message = new BlockMessage(new BlockCapsule(Block.getDefaultInstance())); + sendMessage(channel, message); + List afterActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int afterSize = afterActivePeers.size(); + Assert.assertEquals(beforeSize, afterSize); + clearConnect(channel); + } + + // @Test + public void errorMsgTest() throws InterruptedException { + Channel channel = createClient(new HandshakeHandler(TestType.normal)); + HelloMessage message = new HelloMessage(node, System.currentTimeMillis(), + manager.getGenesisBlockId(), manager.getSolidBlockId(), manager.getHeadBlockId()); + sendMessage(channel, message); + validResultUnCloseConnect(); + List beforeActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int beforeSize = beforeActivePeers.size(); + logger.info("beforeSize : {}", beforeSize); + channel.writeAndFlush(Unpooled.wrappedBuffer(ArrayUtils.add("nihao".getBytes(), 0, (byte) 1))) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.info("send msg success"); + } else { + logger.error("send msg fail", future.cause()); + } + }); + Thread.sleep(2000); + List afterActivePeers = ReflectUtils.getFieldValue(pool, "activePeers"); + int afterSize = afterActivePeers.size(); + logger.info("afterSize : {}", afterSize); + Assert.assertEquals(beforeSize, afterSize + 1); + clearConnect(channel); + } + + private void sendMessage(Channel channel, Message message) { + channel.writeAndFlush(message.getSendData()) + .addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.info("send msg success"); + } else { + logger.error("send msg fail", future.cause()); + } + }); + } + + private void validResultCloseConnect(Channel channel) throws InterruptedException { + int trys = 0; + while (!finish && ++trys < tryTimes) { + Thread.sleep(sleepTime); + } + Assert.assertEquals(finish, true); + finish = false; + channel.close(); + Thread.sleep(sleepTime); + ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections + .synchronizedMap(new LRUMap(500))); + ReflectUtils.setFieldValue(pool, "activePeers", + Collections.synchronizedList(new ArrayList())); + ReflectUtils.setFieldValue(channelManager, "activePeers", new ConcurrentHashMap<>()); + } + + private void validResultUnCloseConnect() throws InterruptedException { + int trys = 0; + while (!finish && ++trys < tryTimes) { + Thread.sleep(sleepTime); + } + Assert.assertEquals(finish, true); + finish = false; + } + + private void clearConnect(Channel channel) throws InterruptedException { + channel.close(); + Thread.sleep(sleepTime); + ReflectUtils.setFieldValue(channelManager, "recentlyDisconnected", Collections + .synchronizedMap(new LRUMap(500))); + ReflectUtils.setFieldValue(pool, "activePeers", + Collections.synchronizedList(new ArrayList())); + ReflectUtils.setFieldValue(channelManager, "activePeers", new ConcurrentHashMap<>()); + } + + @Test + public void testAll() throws InterruptedException { + logger.info("begin normal test "); + normalTest(); + logger.info("begin errorGenesisBlockId test "); + errorGenesisBlockIdTest(); + logger.info("begin errorVersion test "); + errorVersionTest(); + logger.info("begin errorSolidBlockId test "); + errorSolidBlockIdTest(); + logger.info("begin repeatConnect test"); + repeatConnectTest(); + logger.info("begin unHandshake test"); + unHandshakeTest(); + logger.info("begin errorMsg test"); + errorMsgTest(); + } +}