From 4784f694054d3fce2c37d55f1bc6e23b6bd00787 Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 10 May 2018 11:06:25 +0800 Subject: [PATCH 1/4] add trxs msg --- .../tron/common/overlay/message/Message.java | 28 ++++----- .../core/net/message/TransactionsMessage.java | 60 ++++++------------- .../java/org/tron/core/net/node/NodeImpl.java | 2 - src/main/protos/core/Tron.proto | 4 ++ 4 files changed, 36 insertions(+), 58 deletions(-) diff --git a/src/main/java/org/tron/common/overlay/message/Message.java b/src/main/java/org/tron/common/overlay/message/Message.java index e3a6b50ae55..b1417ad4fa5 100644 --- a/src/main/java/org/tron/common/overlay/message/Message.java +++ b/src/main/java/org/tron/common/overlay/message/Message.java @@ -9,11 +9,8 @@ import org.tron.common.utils.Sha256Hash; import org.tron.core.net.message.MessageTypes; - public abstract class Message { - protected static final Logger logger = LoggerFactory.getLogger("Net"); - protected boolean unpacked; protected byte[] data; protected byte type; @@ -32,7 +29,6 @@ public Message(byte type, byte[] packed) { unpacked = false; } - public ByteBuf getSendData(){ return Unpooled.wrappedBuffer(ArrayUtils.add(this.getData(), 0 ,type)); } @@ -41,13 +37,25 @@ public Sha256Hash getMessageId() { return Sha256Hash.of(getData()); } - public abstract byte[] getData(); + public byte[] getData(){ + return this.data; + } + + public MessageTypes getType(){ + return MessageTypes.fromByte(this.type); + } + public abstract Class getAnswerMessage(); + + @Override public String toString() { return "[Message Type: " + getType() + ", Message Hash: " + getMessageId() + "]"; } - public abstract Class getAnswerMessage(); + @Override + public int hashCode() { + return Arrays.hashCode(data); + } @Override public boolean equals(Object o) { @@ -60,12 +68,4 @@ public boolean equals(Object o) { Message message = (Message) o; return Arrays.equals(data, message.data); } - - @Override - public int hashCode() { - return Arrays.hashCode(data); - } - - public abstract MessageTypes getType(); - } \ No newline at end of file diff --git a/src/main/java/org/tron/core/net/message/TransactionsMessage.java b/src/main/java/org/tron/core/net/message/TransactionsMessage.java index 690b7de9dba..947608f77b9 100644 --- a/src/main/java/org/tron/core/net/message/TransactionsMessage.java +++ b/src/main/java/org/tron/core/net/message/TransactionsMessage.java @@ -1,35 +1,39 @@ package org.tron.core.net.message; -import com.google.protobuf.InvalidProtocolBufferException; -import java.util.ArrayList; import java.util.List; -import org.tron.protos.Protocol.Items; + +import org.tron.core.exception.P2pException; +import org.tron.protos.Protocol; import org.tron.protos.Protocol.Transaction; public class TransactionsMessage extends TronMessage { - private List trxs = new ArrayList(); + private Protocol.Transactions transactions; public TransactionsMessage(List trxs) { - this.trxs = trxs; - unpacked = true; + Protocol.Transactions.Builder builder = Protocol.Transactions.newBuilder(); + trxs.forEach(trx -> builder.addTransactions(trx)); + this.transactions = builder.build(); this.type = MessageTypes.TRXS.asByte(); + this.data = this.transactions.toByteArray(); } - public TransactionsMessage(byte[] packed) { - super(packed); - this.type = MessageTypes.TRXS.asByte(); + public TransactionsMessage(byte[] data) throws Exception{ + try { + this.type = MessageTypes.TRXS.asByte(); + this.data = data; + this.transactions = Protocol.Transactions.parseFrom(data); + }catch (Exception e){ + throw new P2pException(P2pException.TypeEnum.PARSE_MESSAGE_FAILED); + } } - public TransactionsMessage() { - this.type = MessageTypes.TRXS.asByte(); + public Protocol.Transactions getTransactions() { + return transactions; } @Override public byte[] getData() { - if (data == null) { - pack(); - } return data; } @@ -47,32 +51,4 @@ public Class getAnswerMessage() { public MessageTypes getType() { return MessageTypes.fromByte(this.type); } - - public List getTransactions() { - unPack(); - return trxs; - } - - private void pack() { - Items.Builder itemsBuilder = Items.newBuilder(); - itemsBuilder.setType(Items.ItemType.TRX); - itemsBuilder.addAllTransactions(this.trxs); - this.data = itemsBuilder.build().toByteArray(); - } - - private synchronized void unPack() { - if (unpacked) { - return; - } - try { - Items items = Items.parseFrom(data); - if (items.getType() == Items.ItemType.TRX) { - trxs = items.getTransactionsList(); - } - } catch (InvalidProtocolBufferException e) { - logger.debug(e.getMessage()); - } - - unpacked = true; - } } diff --git a/src/main/java/org/tron/core/net/node/NodeImpl.java b/src/main/java/org/tron/core/net/node/NodeImpl.java index 9162af06788..4090dabfc35 100644 --- a/src/main/java/org/tron/core/net/node/NodeImpl.java +++ b/src/main/java/org/tron/core/net/node/NodeImpl.java @@ -118,8 +118,6 @@ void sendFetch() { } } - - private ScheduledExecutorService logExecutor = Executors.newSingleThreadScheduledExecutor(); //public diff --git a/src/main/protos/core/Tron.proto b/src/main/protos/core/Tron.proto index 3f032cb7339..2fe64478dbf 100644 --- a/src/main/protos/core/Tron.proto +++ b/src/main/protos/core/Tron.proto @@ -166,6 +166,10 @@ message Transaction { repeated Result ret = 5; } +message Transactions { + repeated Transaction transactions = 1; +} + message BlockHeader { message raw { int64 timestamp = 1; From ed1ad2de4e43a17d05016e49a6fc5239812a5a70 Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 10 May 2018 12:07:20 +0800 Subject: [PATCH 2/4] add trx msg handle --- .../tron/common/overlay/message/Message.java | 2 + .../java/org/tron/core/net/node/NodeImpl.java | 59 +++++++++++++------ 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/tron/common/overlay/message/Message.java b/src/main/java/org/tron/common/overlay/message/Message.java index b1417ad4fa5..33c20bad39e 100644 --- a/src/main/java/org/tron/common/overlay/message/Message.java +++ b/src/main/java/org/tron/common/overlay/message/Message.java @@ -11,6 +11,8 @@ public abstract class Message { + protected static final Logger logger = LoggerFactory.getLogger("Message"); + protected boolean unpacked; protected byte[] data; protected byte type; diff --git a/src/main/java/org/tron/core/net/node/NodeImpl.java b/src/main/java/org/tron/core/net/node/NodeImpl.java index 4090dabfc35..263113bd0a4 100644 --- a/src/main/java/org/tron/core/net/node/NodeImpl.java +++ b/src/main/java/org/tron/core/net/node/NodeImpl.java @@ -8,6 +8,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import io.netty.util.internal.ConcurrentSet; import java.util.*; @@ -40,24 +41,17 @@ import org.tron.core.config.Parameter.ChainConstant; import org.tron.core.config.Parameter.NetConstants; import org.tron.core.config.Parameter.NodeConstant; +import org.tron.core.db.api.pojo.Transaction; import org.tron.core.exception.BadBlockException; import org.tron.core.exception.BadTransactionException; import org.tron.core.exception.StoreException; import org.tron.core.exception.TraitorPeerException; import org.tron.core.exception.TronException; import org.tron.core.exception.UnLinkedBlockException; -import org.tron.core.net.message.BlockInventoryMessage; -import org.tron.core.net.message.BlockMessage; -import org.tron.core.net.message.ChainInventoryMessage; -import org.tron.core.net.message.FetchInvDataMessage; -import org.tron.core.net.message.InventoryMessage; -import org.tron.core.net.message.ItemNotFound; -import org.tron.core.net.message.MessageTypes; -import org.tron.core.net.message.SyncBlockChainMessage; -import org.tron.core.net.message.TransactionMessage; -import org.tron.core.net.message.TronMessage; +import org.tron.core.net.message.*; import org.tron.core.net.peer.PeerConnection; import org.tron.core.net.peer.PeerConnectionDelegate; +import org.tron.protos.Protocol; import org.tron.protos.Protocol.Inventory.InventoryType; @Slf4j @@ -67,14 +61,18 @@ public class NodeImpl extends PeerConnectionDelegate implements Node { @Autowired private SyncPool pool; - Cache TrxCache = CacheBuilder.newBuilder() + private Cache TrxCache = CacheBuilder.newBuilder() .maximumSize(10000).expireAfterWrite(600, TimeUnit.SECONDS) .recordStats().build(); - Cache BlockCache = CacheBuilder.newBuilder() + private Cache BlockCache = CacheBuilder.newBuilder() .maximumSize(10).expireAfterWrite(60, TimeUnit.SECONDS) .recordStats().build(); + private int maxTrxsSize = 1_000_000; + + private int maxTrxsCnt = 100; + class InvToSend { private HashMap>> send @@ -211,6 +209,8 @@ public void onMessage(PeerConnection peer, TronMessage msg) { break; case TRX: onHandleTransactionMessage(peer, (TransactionMessage) msg); + case TRXS: + onHandleTransactionsMessage(peer, (TransactionsMessage) msg); break; case SYNC_BLOCK_CHAIN: onHandleSyncBlockChainMessage(peer, (SyncBlockChainMessage) msg); @@ -766,6 +766,13 @@ private void onHandleTransactionMessage(PeerConnection peer, TransactionMessage } } + private void onHandleTransactionsMessage(PeerConnection peer, TransactionsMessage msg){ + logger.info("onHandleTransactionsMessage, size = {}, peer {}", + msg.getTransactions().getTransactionsList().size(), peer.getNode().getHost()); + msg.getTransactions().getTransactionsList().forEach(transaction -> + onHandleTransactionMessage(peer, new TransactionMessage(transaction))); + } + private void onHandleSyncBlockChainMessage(PeerConnection peer, SyncBlockChainMessage syncMsg) { //logger.info("on handle sync block chain message"); peer.setTronState(TronState.SYNCING); @@ -816,6 +823,10 @@ private void onHandleFetchDataMessage(PeerConnection peer, FetchInvDataMessage f BlockCapsule block = null; + List transactions = Lists.newArrayList(); + + int size = 0; + for (Sha256Hash hash : fetchInvDataMsg.getHashList()) { Message msg; @@ -830,20 +841,32 @@ private void onHandleFetchDataMessage(PeerConnection peer, FetchInvDataMessage f msg = del.getData(hash, type); } - if (msg != null) { - if (type.equals(MessageTypes.BLOCK)) { - block = ((BlockMessage) msg).getBlockCapsule(); - } - peer.sendMessage(msg); - } else { + if (msg == null){ logger.error("fetch message {} {} failed.", type, hash); peer.sendMessage(new ItemNotFound()); + return; + } + + if (type.equals(MessageTypes.BLOCK)) { + block = ((BlockMessage) msg).getBlockCapsule(); + peer.sendMessage(msg); + }else { + transactions.add(((TransactionMessage)msg).getTransaction()); + size += ((TransactionMessage)msg).getTransaction().getSerializedSize(); + if (transactions.size() % maxTrxsCnt == 0 || size > maxTrxsSize) { + peer.sendMessage(new TransactionsMessage(transactions)); + transactions = Lists.newArrayList(); + size = 0; + } } } if (block != null) { updateBlockWeBothHave(peer, block); } + if (transactions.size() > 0){ + peer.sendMessage(new TransactionsMessage(transactions)); + } } private void banTraitorPeer(PeerConnection peer, ReasonCode reason) { From 03d3e3c290774069364bd1e72c28ccb10fa7a091 Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 10 May 2018 12:29:52 +0800 Subject: [PATCH 3/4] mdf TransactionsMessage --- .../tron/core/net/message/TransactionsMessage.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/main/java/org/tron/core/net/message/TransactionsMessage.java b/src/main/java/org/tron/core/net/message/TransactionsMessage.java index 947608f77b9..9ea8aac50ad 100644 --- a/src/main/java/org/tron/core/net/message/TransactionsMessage.java +++ b/src/main/java/org/tron/core/net/message/TransactionsMessage.java @@ -32,14 +32,9 @@ public Protocol.Transactions getTransactions() { return transactions; } - @Override - public byte[] getData() { - return data; - } - @Override public String toString() { - return null; + return "trx_size:" + this.transactions.getTransactionsList().size(); } @Override @@ -47,8 +42,4 @@ public Class getAnswerMessage() { return null; } - @Override - public MessageTypes getType() { - return MessageTypes.fromByte(this.type); - } } From 227e894fec2ee09b768c804a8d6484ccdd8c546b Mon Sep 17 00:00:00 2001 From: wubin01 Date: Thu, 10 May 2018 14:54:15 +0800 Subject: [PATCH 4/4] mdf tron msg --- .../tron/common/overlay/message/MessageFactory.java | 3 --- .../org/tron/core/net/message/TransactionsMessage.java | 10 +++------- .../org/tron/core/net/message/TronMessageFactory.java | 2 +- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/tron/common/overlay/message/MessageFactory.java b/src/main/java/org/tron/common/overlay/message/MessageFactory.java index 431d227ce65..bb02f375980 100644 --- a/src/main/java/org/tron/common/overlay/message/MessageFactory.java +++ b/src/main/java/org/tron/common/overlay/message/MessageFactory.java @@ -16,9 +16,6 @@ public abstract class MessageFactory { - public static String ERR_NO_SUCH_MSG = "No such message"; - public static String ERR_PARSE_FAILED = "parse message failed"; - protected abstract Message create(byte[] data) throws Exception; } diff --git a/src/main/java/org/tron/core/net/message/TransactionsMessage.java b/src/main/java/org/tron/core/net/message/TransactionsMessage.java index 9ea8aac50ad..5e7417c9fb4 100644 --- a/src/main/java/org/tron/core/net/message/TransactionsMessage.java +++ b/src/main/java/org/tron/core/net/message/TransactionsMessage.java @@ -19,13 +19,9 @@ public TransactionsMessage(List trxs) { } public TransactionsMessage(byte[] data) throws Exception{ - try { - this.type = MessageTypes.TRXS.asByte(); - this.data = data; - this.transactions = Protocol.Transactions.parseFrom(data); - }catch (Exception e){ - throw new P2pException(P2pException.TypeEnum.PARSE_MESSAGE_FAILED); - } + this.type = MessageTypes.TRXS.asByte(); + this.data = data; + this.transactions = Protocol.Transactions.parseFrom(data); } public Protocol.Transactions getTransactions() { diff --git a/src/main/java/org/tron/core/net/message/TronMessageFactory.java b/src/main/java/org/tron/core/net/message/TronMessageFactory.java index 38bf1737f59..4d9aba5ec80 100644 --- a/src/main/java/org/tron/core/net/message/TronMessageFactory.java +++ b/src/main/java/org/tron/core/net/message/TronMessageFactory.java @@ -27,7 +27,7 @@ public TronMessage create(byte[] data) throws Exception{ private TronMessage create(byte type, byte[] packed) throws Exception{ MessageTypes receivedTypes = MessageTypes.fromByte(type); if (receivedTypes == null){ - throw new RuntimeException(MessageFactory.ERR_NO_SUCH_MSG + ", type=" + type); + throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, "type=" + type); } switch (receivedTypes) { case TRX: