Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions src/test/java/org/tron/core/net/node/BaseNetTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package org.tron.core.net.node;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultMessageSizeEstimator;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.tron.common.application.Application;
import org.tron.common.application.ApplicationFactory;
import org.tron.common.overlay.client.PeerClient;
import org.tron.common.overlay.server.ChannelManager;
import org.tron.common.overlay.server.SyncPool;
import org.tron.common.utils.FileUtil;
import org.tron.core.config.DefaultConfig;
import org.tron.core.config.args.Args;
import org.tron.core.db.Manager;
import org.tron.core.services.RpcApiService;
import org.tron.core.services.WitnessService;

@Slf4j
public abstract class BaseNetTest {

protected static AnnotationConfigApplicationContext context;
protected NodeImpl node;
protected RpcApiService rpcApiService;
protected PeerClient peerClient;
protected ChannelManager channelManager;
protected SyncPool pool;
protected Manager manager;
private Application appT;

private String dbPath;
private String dbDirectory;
private String indexDirectory;

private int port;

private ExecutorService executorService = Executors.newFixedThreadPool(1);

public BaseNetTest(String dbPath, String dbDirectory, String indexDirectory, int port) {
this.dbPath = dbPath;
this.dbDirectory = dbDirectory;
this.indexDirectory = indexDirectory;
this.port = port;
}

@Before
public void init() {
executorService.execute(new Runnable() {
@Override
public void run() {
logger.info("Full node running.");
Args.setParam(
new String[]{
"--output-directory", dbPath,
"--storage-db-directory", dbDirectory,
"--storage-index-directory", indexDirectory
},
"config.conf"
);
Args cfgArgs = Args.getInstance();
cfgArgs.setNodeListenPort(port);
cfgArgs.setNodeDiscoveryEnable(false);
cfgArgs.getSeedNode().getIpList().clear();
cfgArgs.setNeedSyncCheck(false);
cfgArgs.setNodeExternalIp("127.0.0.1");

context = new AnnotationConfigApplicationContext(DefaultConfig.class);

if (cfgArgs.isHelp()) {
logger.info("Here is the help message.");
return;
}
appT = ApplicationFactory.create(context);
rpcApiService = context.getBean(RpcApiService.class);
appT.addService(rpcApiService);
if (cfgArgs.isWitness()) {
appT.addService(new WitnessService(appT));
}
appT.initServices(cfgArgs);
appT.startServices();

node = context.getBean(NodeImpl.class);
peerClient = context.getBean(PeerClient.class);
channelManager = context.getBean(ChannelManager.class);
pool = context.getBean(SyncPool.class);
manager = context.getBean(Manager.class);
NodeDelegate nodeDelegate = new NodeDelegateImpl(manager);
node.setNodeDelegate(nodeDelegate);
pool.init(node);

appT.startup();
rpcApiService.blockUntilShutdown();
}
});
int tryTimes = 1;
while (tryTimes <= 30 && (node == null || peerClient == null
|| channelManager == null || pool == null)) {
try {
logger.info("node:{},peerClient:{},channelManager:{},pool:{}", node, peerClient,
channelManager, pool);
Thread.sleep(1000 * tryTimes);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
++tryTimes;
}
}
}

protected Channel createClient(ByteToMessageDecoder decoder)
throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// limit the size of receiving buffer to 1024
ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(256 * 1024));
ch.config().setOption(ChannelOption.SO_RCVBUF, 256 * 1024);
ch.config().setOption(ChannelOption.SO_BACKLOG, 1024);
ch.pipeline()
.addLast("readTimeoutHandler", new ReadTimeoutHandler(600, TimeUnit.SECONDS))
.addLast("writeTimeoutHandler", new WriteTimeoutHandler(600, TimeUnit.SECONDS));
ch.pipeline().addLast("protoPender", new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast("lengthDecode", new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast("handshakeHandler", decoder);

// be aware of channel closing
ch.closeFuture();
}
}).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
.option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, DefaultMessageSizeEstimator.DEFAULT);
return b.connect("127.0.0.1", port).sync().channel();
}

@After
public void destroy() {
executorService.shutdownNow();
Args.clearParam();
FileUtil.deleteDir(new File(dbPath));
}
}
38 changes: 10 additions & 28 deletions src/test/java/org/tron/core/net/node/BroadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -18,6 +19,7 @@
import org.tron.common.overlay.client.PeerClient;
import org.tron.common.overlay.discover.Node;
import org.tron.common.overlay.message.Message;
import org.tron.common.overlay.server.Channel;
import org.tron.common.overlay.server.ChannelManager;
import org.tron.common.overlay.server.MessageQueue;
import org.tron.common.overlay.server.SyncPool;
Expand All @@ -27,6 +29,7 @@
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.config.DefaultConfig;
import org.tron.core.config.args.Args;
import org.tron.core.db.ByteArrayWrapper;
import org.tron.core.db.Manager;
import org.tron.core.net.message.BlockMessage;
import org.tron.core.net.message.MessageTypes;
Expand Down Expand Up @@ -242,10 +245,6 @@ private void prepare() {
ReflectUtils.setFieldValue(node, "isAdvertiseActive", false);
ReflectUtils.setFieldValue(node, "isFetchActive", false);

// ScheduledExecutorService mainWorker = ReflectUtils
// .getFieldValue(channelManager, "mainWorker");
// mainWorker.shutdownNow();

Node node = new Node(
"enode://e437a4836b77ad9d9ffe73ee782ef2614e6d8370fcf62191a6e488276e23717147073a7ce0b444d485fff5a0c34c4577251a7a990cf80d8542e21b95aa8c5e6c@127.0.0.1:17889");
new Thread(new Runnable() {
Expand All @@ -254,30 +253,13 @@ public void run() {
peerClient.connect(node.getHost(), node.getPort(), node.getHexId());
}
}).start();
Thread.sleep(5000);
// List<Channel> newChanelList = ReflectUtils.getFieldValue(channelManager, "newPeers");
// int tryTimes = 0;
// while (CollectionUtils.isEmpty(newChanelList) && ++tryTimes < 10) {
// Thread.sleep(1000);
// }
// logger.info("newChanelList size : {}", newChanelList.size());

// Field activePeersField = channelManager.getClass().getDeclaredField("activePeers");
// activePeersField.setAccessible(true);
// Map<ByteArrayWrapper, Channel> activePeersMap = (Map<ByteArrayWrapper, Channel>) activePeersField
// .get(channelManager);
//
// Field apField = pool.getClass().getDeclaredField("activePeers");
// apField.setAccessible(true);
// List<PeerConnection> activePeers = (List<PeerConnection>) apField.get(pool);

// for (Channel channel : newChanelList) {
// activePeersMap.put(channel.getNodeIdWrapper(), channel);
// activePeers.add((PeerConnection) channel);
// }
// apField.set(pool, activePeers);
// activePeersField.set(channelManager, activePeersMap);
//
Thread.sleep(2000);
Map<ByteArrayWrapper, Channel> activePeers = ReflectUtils
.getFieldValue(channelManager, "activePeers");
int tryTimes = 0;
while (MapUtils.isEmpty(activePeers) && ++tryTimes < 10) {
Thread.sleep(1000);
}
go = true;
} catch (Exception e) {
e.printStackTrace();
Expand Down
Loading