Skip to content

Commit

Permalink
Merge pull request #155 from semuxgo/p2p-block-parts
Browse files Browse the repository at this point in the history
Net: implement the missing BLOCK_PARTS handlers
  • Loading branch information
semuxgo committed Jun 30, 2019
2 parents a7d2534 + a2b5450 commit e823836
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 22 deletions.
42 changes: 33 additions & 9 deletions src/main/java/org/semux/consensus/SemuxSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.semux.net.msg.Message;
import org.semux.net.msg.ReasonCode;
import org.semux.net.msg.consensus.BlockMessage;
import org.semux.net.msg.consensus.BlockPartsMessage;
import org.semux.net.msg.consensus.GetBlockMessage;
import org.semux.util.ByteArray;
import org.semux.util.TimeUtil;
Expand Down Expand Up @@ -206,6 +207,16 @@ public boolean isRunning() {
return isRunning.get();
}

protected void addBlock(Block block, Channel channel) {
synchronized (lock) {
if (toDownload.remove(block.getNumber())) {
growToDownloadQueue();
}
toComplete.remove(block.getNumber());
toProcess.add(Pair.of(block, channel));
}
}

@Override
public void onMessage(Channel channel, Message msg) {
if (!isRunning()) {
Expand All @@ -216,19 +227,32 @@ public void onMessage(Channel channel, Message msg) {
case BLOCK: {
BlockMessage blockMsg = (BlockMessage) msg;
Block block = blockMsg.getBlock();
synchronized (lock) {
if (toDownload.remove(block.getNumber())) {
growToDownloadQueue();
}
toComplete.remove(block.getNumber());
toProcess.add(Pair.of(block, channel));
}
addBlock(block, channel);
break;
}
case BLOCK_HEADER: {
// TODO implement block header
case BLOCK_PARTS: {
// try re-construct a block
BlockPartsMessage blockPartsMsg = (BlockPartsMessage) msg;
List<Block.BlockPart> parts = Block.BlockPart.decode(blockPartsMsg.getParts());
List<byte[]> data = blockPartsMsg.getData();
if (parts.size() != data.size()) {
logger.debug("Parts id and data do not match");
break;
}

// We need header, transactions, and votes
if (parts.get(0) != Block.BlockPart.HEADER || parts.get(1) != Block.BlockPart.TRANSACTIONS
|| parts.get(0) != Block.BlockPart.VOTES) {
try {
Block block = Block.fromComponents(data.get(0), data.get(1), null, data.get(2));
addBlock(block, channel);
} catch (Exception e) {
logger.debug("Failed to parse a block from components", e);
}
}
break;
}
case BLOCK_HEADER: // deprecated
default: {
break;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/semux/core/Block.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,23 @@
public class Block {

public enum BlockPart {
HEADER(1 << 0), TRANSACTIONS(1 << 1), RECEIPTS(1 << 2), VOTES(1 << 3);
HEADER(1 << 0), TRANSACTIONS(1 << 1), RESULTS(1 << 2), VOTES(1 << 3);

private int code;

BlockPart(int code) {
this.code = code;
}

public static int parts(BlockPart... parts) {
public static int encode(BlockPart... parts) {
int result = 0;
for (BlockPart part : parts) {
result |= part.code;
}
return result;
}

public List<BlockPart> parts(int parts) {
public static List<BlockPart> decode(int parts) {
List<BlockPart> result = new ArrayList<>();
// NOTE: values() returns an array containing all of the values of the enum type
// in the order they are declared.
Expand Down
45 changes: 36 additions & 9 deletions src/main/java/org/semux/net/SemuxP2pHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.semux.net.msg.ReasonCode;
import org.semux.net.msg.consensus.BlockHeaderMessage;
import org.semux.net.msg.consensus.BlockMessage;
import org.semux.net.msg.consensus.BlockPartsMessage;
import org.semux.net.msg.consensus.GetBlockHeaderMessage;
import org.semux.net.msg.consensus.GetBlockMessage;
import org.semux.net.msg.consensus.GetBlockPartsMessage;
import org.semux.net.msg.consensus.NewHeightMessage;
import org.semux.net.msg.p2p.DisconnectMessage;
import org.semux.net.msg.p2p.GetNodesMessage;
Expand Down Expand Up @@ -244,6 +246,8 @@ public void channelRead0(final ChannelHandlerContext ctx, Message msg) throws In
case BLOCK:
case GET_BLOCK_HEADER:
case BLOCK_HEADER:
case GET_BLOCK_PARTS:
case BLOCK_PARTS:
onSync(msg);
break;

Expand Down Expand Up @@ -436,24 +440,47 @@ protected void onSync(Message msg) {
channel.getMessageQueue().sendMessage(new BlockMessage(block));
break;
}
case BLOCK: {
sync.onMessage(channel, msg);
break;
}
case GET_BLOCK_HEADER: {
GetBlockHeaderMessage m = (GetBlockHeaderMessage) msg;
BlockHeader header = chain.getBlockHeader(m.getNumber());
channel.getMessageQueue().sendMessage(new BlockHeaderMessage(header));
break;
}
case BLOCK_HEADER: {
sync.onMessage(channel, msg);
case GET_BLOCK_PARTS: {
GetBlockPartsMessage m = (GetBlockPartsMessage) msg;
long number = m.getNumber();
int parts = m.getParts();

List<byte[]> partsSerilized = new ArrayList<>();
Block block = chain.getBlock(number);
for (Block.BlockPart part : Block.BlockPart.decode(parts)) {
switch (part) {
case HEADER:
partsSerilized.add(block.getEncodedHeader());
break;
case TRANSACTIONS:
partsSerilized.add(block.getEncodedTransactions());
break;
case RESULTS:
partsSerilized.add(block.getEncodedResults());
break;
case VOTES:
partsSerilized.add(block.getEncodedVotes());
break;
default:
throw new UnreachableException();
}
}

channel.getMessageQueue().sendMessage(new BlockPartsMessage(number, parts, partsSerilized));
break;
}
case GET_BLOCK_PARTS:
case BLOCK_PARTS:
// TODO: add handler
case BLOCK:
case BLOCK_HEADER:
case BLOCK_PARTS: {
sync.onMessage(channel, msg);
break;
}
default:
throw new UnreachableException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class BlockPartsMessage extends Message {
private final int parts;
private final List<byte[]> data;

public BlockPartsMessage(int number, int parts, List<byte[]> data) {
public BlockPartsMessage(long number, int parts, List<byte[]> data) {
super(MessageCode.BLOCK_PARTS, null);

this.number = number;
Expand Down

0 comments on commit e823836

Please sign in to comment.