Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions src/main/java/org/tron/common/overlay/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import org.tron.common.utils.Sha256Hash;
import org.tron.core.net.message.MessageTypes;


public abstract class Message {

protected static final Logger logger = LoggerFactory.getLogger("Net");
protected static final Logger logger = LoggerFactory.getLogger("Message");

protected boolean unpacked;
protected byte[] data;
Expand All @@ -32,7 +31,6 @@ public Message(byte type, byte[] packed) {
unpacked = false;
}


public ByteBuf getSendData(){
return Unpooled.wrappedBuffer(ArrayUtils.add(this.getData(), 0 ,type));
}
Expand All @@ -41,13 +39,25 @@ public Sha256Hash getMessageId() {
return Sha256Hash.of(getData());
}

public abstract byte[] getData();
public byte[] getData(){
return this.data;
}

public MessageTypes getType(){
return MessageTypes.fromByte(this.type);
}

public abstract Class<?> getAnswerMessage();

@Override
public String toString() {
return "[Message Type: " + getType() + ", Message Hash: " + getMessageId() + "]";
}

public abstract Class<?> getAnswerMessage();
@Override
public int hashCode() {
return Arrays.hashCode(data);
}

@Override
public boolean equals(Object o) {
Expand All @@ -60,12 +70,4 @@ public boolean equals(Object o) {
Message message = (Message) o;
return Arrays.equals(data, message.data);
}

@Override
public int hashCode() {
return Arrays.hashCode(data);
}

public abstract MessageTypes getType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

public abstract class MessageFactory {

public static String ERR_NO_SUCH_MSG = "No such message";
public static String ERR_PARSE_FAILED = "parse message failed";

protected abstract Message create(byte[] data) throws Exception;

}
65 changes: 14 additions & 51 deletions src/main/java/org/tron/core/net/message/TransactionsMessage.java
Original file line number Diff line number Diff line change
@@ -1,78 +1,41 @@
package org.tron.core.net.message;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.List;
import org.tron.protos.Protocol.Items;

import org.tron.core.exception.P2pException;
import org.tron.protos.Protocol;
import org.tron.protos.Protocol.Transaction;

public class TransactionsMessage extends TronMessage {

private List<Transaction> trxs = new ArrayList<Transaction>();
private Protocol.Transactions transactions;

public TransactionsMessage(List<Transaction> trxs) {
this.trxs = trxs;
unpacked = true;
this.type = MessageTypes.TRXS.asByte();
}

public TransactionsMessage(byte[] packed) {
super(packed);
Protocol.Transactions.Builder builder = Protocol.Transactions.newBuilder();
trxs.forEach(trx -> builder.addTransactions(trx));
this.transactions = builder.build();
this.type = MessageTypes.TRXS.asByte();
this.data = this.transactions.toByteArray();
}

public TransactionsMessage() {
public TransactionsMessage(byte[] data) throws Exception{
this.type = MessageTypes.TRXS.asByte();
this.data = data;
this.transactions = Protocol.Transactions.parseFrom(data);
}

@Override
public byte[] getData() {
if (data == null) {
pack();
}
return data;
public Protocol.Transactions getTransactions() {
return transactions;
}

@Override
public String toString() {
return null;
return "trx_size:" + this.transactions.getTransactionsList().size();
}

@Override
public Class<?> getAnswerMessage() {
return null;
}

@Override
public MessageTypes getType() {
return MessageTypes.fromByte(this.type);
}

public List<Transaction> getTransactions() {
unPack();
return trxs;
}

private void pack() {
Items.Builder itemsBuilder = Items.newBuilder();
itemsBuilder.setType(Items.ItemType.TRX);
itemsBuilder.addAllTransactions(this.trxs);
this.data = itemsBuilder.build().toByteArray();
}

private synchronized void unPack() {
if (unpacked) {
return;
}
try {
Items items = Items.parseFrom(data);
if (items.getType() == Items.ItemType.TRX) {
trxs = items.getTransactionsList();
}
} catch (InvalidProtocolBufferException e) {
logger.debug(e.getMessage());
}

unpacked = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public TronMessage create(byte[] data) throws Exception{
private TronMessage create(byte type, byte[] packed) throws Exception{
MessageTypes receivedTypes = MessageTypes.fromByte(type);
if (receivedTypes == null){
throw new RuntimeException(MessageFactory.ERR_NO_SUCH_MSG + ", type=" + type);
throw new P2pException(P2pException.TypeEnum.NO_SUCH_MESSAGE, "type=" + type);
}
switch (receivedTypes) {
case TRX:
Expand Down
61 changes: 41 additions & 20 deletions src/main/java/org/tron/core/net/node/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.netty.util.internal.ConcurrentSet;

import java.util.*;
Expand Down Expand Up @@ -40,24 +41,17 @@
import org.tron.core.config.Parameter.ChainConstant;
import org.tron.core.config.Parameter.NetConstants;
import org.tron.core.config.Parameter.NodeConstant;
import org.tron.core.db.api.pojo.Transaction;
import org.tron.core.exception.BadBlockException;
import org.tron.core.exception.BadTransactionException;
import org.tron.core.exception.StoreException;
import org.tron.core.exception.TraitorPeerException;
import org.tron.core.exception.TronException;
import org.tron.core.exception.UnLinkedBlockException;
import org.tron.core.net.message.BlockInventoryMessage;
import org.tron.core.net.message.BlockMessage;
import org.tron.core.net.message.ChainInventoryMessage;
import org.tron.core.net.message.FetchInvDataMessage;
import org.tron.core.net.message.InventoryMessage;
import org.tron.core.net.message.ItemNotFound;
import org.tron.core.net.message.MessageTypes;
import org.tron.core.net.message.SyncBlockChainMessage;
import org.tron.core.net.message.TransactionMessage;
import org.tron.core.net.message.TronMessage;
import org.tron.core.net.message.*;
import org.tron.core.net.peer.PeerConnection;
import org.tron.core.net.peer.PeerConnectionDelegate;
import org.tron.protos.Protocol;
import org.tron.protos.Protocol.Inventory.InventoryType;

@Slf4j
Expand All @@ -67,14 +61,18 @@ public class NodeImpl extends PeerConnectionDelegate implements Node {
@Autowired
private SyncPool pool;

Cache<Sha256Hash, TransactionMessage> TrxCache = CacheBuilder.newBuilder()
private Cache<Sha256Hash, TransactionMessage> TrxCache = CacheBuilder.newBuilder()
.maximumSize(10000).expireAfterWrite(600, TimeUnit.SECONDS)
.recordStats().build();

Cache<Sha256Hash, BlockMessage> BlockCache = CacheBuilder.newBuilder()
private Cache<Sha256Hash, BlockMessage> BlockCache = CacheBuilder.newBuilder()
.maximumSize(10).expireAfterWrite(60, TimeUnit.SECONDS)
.recordStats().build();

private int maxTrxsSize = 1_000_000;

private int maxTrxsCnt = 100;

class InvToSend {

private HashMap<PeerConnection, HashMap<InventoryType, LinkedList<Sha256Hash>>> send
Expand Down Expand Up @@ -118,8 +116,6 @@ void sendFetch() {
}
}



private ScheduledExecutorService logExecutor = Executors.newSingleThreadScheduledExecutor();

//public
Expand Down Expand Up @@ -213,6 +209,8 @@ public void onMessage(PeerConnection peer, TronMessage msg) {
break;
case TRX:
onHandleTransactionMessage(peer, (TransactionMessage) msg);
case TRXS:
onHandleTransactionsMessage(peer, (TransactionsMessage) msg);
break;
case SYNC_BLOCK_CHAIN:
onHandleSyncBlockChainMessage(peer, (SyncBlockChainMessage) msg);
Expand Down Expand Up @@ -768,6 +766,13 @@ private void onHandleTransactionMessage(PeerConnection peer, TransactionMessage
}
}

private void onHandleTransactionsMessage(PeerConnection peer, TransactionsMessage msg){
logger.info("onHandleTransactionsMessage, size = {}, peer {}",
msg.getTransactions().getTransactionsList().size(), peer.getNode().getHost());
msg.getTransactions().getTransactionsList().forEach(transaction ->
onHandleTransactionMessage(peer, new TransactionMessage(transaction)));
}

private void onHandleSyncBlockChainMessage(PeerConnection peer, SyncBlockChainMessage syncMsg) {
//logger.info("on handle sync block chain message");
peer.setTronState(TronState.SYNCING);
Expand Down Expand Up @@ -818,6 +823,10 @@ private void onHandleFetchDataMessage(PeerConnection peer, FetchInvDataMessage f

BlockCapsule block = null;

List<Protocol.Transaction> transactions = Lists.newArrayList();

int size = 0;

for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {

Message msg;
Expand All @@ -832,20 +841,32 @@ private void onHandleFetchDataMessage(PeerConnection peer, FetchInvDataMessage f
msg = del.getData(hash, type);
}

if (msg != null) {
if (type.equals(MessageTypes.BLOCK)) {
block = ((BlockMessage) msg).getBlockCapsule();
}
peer.sendMessage(msg);
} else {
if (msg == null){
logger.error("fetch message {} {} failed.", type, hash);
peer.sendMessage(new ItemNotFound());
return;
}

if (type.equals(MessageTypes.BLOCK)) {
block = ((BlockMessage) msg).getBlockCapsule();
peer.sendMessage(msg);
}else {
transactions.add(((TransactionMessage)msg).getTransaction());
size += ((TransactionMessage)msg).getTransaction().getSerializedSize();
if (transactions.size() % maxTrxsCnt == 0 || size > maxTrxsSize) {
peer.sendMessage(new TransactionsMessage(transactions));
transactions = Lists.newArrayList();
size = 0;
}
}
}

if (block != null) {
updateBlockWeBothHave(peer, block);
}
if (transactions.size() > 0){
peer.sendMessage(new TransactionsMessage(transactions));
}
}

private void banTraitorPeer(PeerConnection peer, ReasonCode reason) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/protos/core/Tron.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ message Transaction {
repeated Result ret = 5;
}

message Transactions {
repeated Transaction transactions = 1;
}

message BlockHeader {
message raw {
int64 timestamp = 1;
Expand Down