Skip to content

Commit

Permalink
Channel peering test passing
Browse files Browse the repository at this point in the history
  • Loading branch information
fireduck64 committed Aug 30, 2019
1 parent e571864 commit 1e10874
Show file tree
Hide file tree
Showing 10 changed files with 292 additions and 32 deletions.
18 changes: 17 additions & 1 deletion BUILD
Expand Up @@ -107,6 +107,22 @@ java_test(
],
)

java_test(
name = "channel_peer_test",
srcs = ["test/ChannelPeerTest.java", "test/TestUtil.java"],
test_class = "channels.ChannelPeerTest",
size="medium",
deps = [
"@junit_junit//jar",
"@snowblossom//lib:lib",
"@snowblossom//client:client",
"@duckutil//:duckutil_lib",
":channelslib",
":protolib",
],
)


java_test(
name = "channel_test",
srcs = ["test/ChannelTest.java"],
Expand All @@ -124,7 +140,7 @@ java_test(

java_test(
name = "channel_validation_test",
srcs = ["test/ChannelValidationTest.java"],
srcs = ["test/ChannelValidationTest.java", "test/TestUtil.java"],
test_class = "channels.ChannelValidationTest",
size="medium",
deps = [
Expand Down
2 changes: 1 addition & 1 deletion src/ChannelGlobals.java
Expand Up @@ -44,7 +44,7 @@ public class ChannelGlobals

public static final long BLOCK_CHUNK_HEADER_DOWNLOAD_SIZE=100L;

public static final long CHANNEL_TIP_SEND_MS = 30000L;
public static final long CHANNEL_TIP_SEND_MS = 3000L;
}


Expand Down
64 changes: 62 additions & 2 deletions src/ChannelLink.java
Expand Up @@ -51,6 +51,7 @@ public ChannelLink(PeerLink peer_link, ChannelID cid, ChannelContext ctx)
client_side = true;
this.peer_link = peer_link;
this.cid = cid;
this.ctx = ctx;

sink = peer_link.getChannelAsyncStub().subscribePeering(this);

Expand Down Expand Up @@ -129,6 +130,7 @@ public void onNext(ChannelPeerMessage msg)
}
else
{
logger.log(Level.INFO, String.format("Client asked to scribe to channel %s", cid.asString()));
ctx.addLink(this);
}
}
Expand All @@ -151,13 +153,60 @@ public void onNext(ChannelPeerMessage msg)
//TODO - Record that peer is actually sending a tip in peer db
//TODO - import peers from tip

ChannelBlockHeader header = ChannelValidation.checkBlockHeaderBasics(cid, tip.getBlockHeader());
considerChannelHeader(new ChainHash(tip.getBlockHeader().getMessageId()), header);
if (tip.getBlockHeader().getMessageId().size() > 0)
{

ChannelBlockHeader header = ChannelValidation.checkBlockHeaderBasics(cid, tip.getBlockHeader());
ChainHash hash = new ChainHash(tip.getBlockHeader().getMessageId());
logger.info(String.format("Channel %s got tip from remote %d %s ", cid.asString(), header.getBlockHeight(), hash.toString()));
considerChannelHeader(hash, header);
}
}
else if (msg.hasHeader())
{
ChannelBlockHeader header = ChannelValidation.checkBlockHeaderBasics(cid, msg.getHeader());
considerChannelHeader(new ChainHash(msg.getHeader().getMessageId()), header);
}
else if (msg.hasReqBlock())
{
ChainHash desired_hash = null;
RequestBlock req = msg.getReqBlock();
if (req.getBlockHash().size() == 0)
{
desired_hash = ctx.db.getBlockHashAtHeight(req.getBlockHeight());
}
else
{
desired_hash = new ChainHash(req.getBlockHash());
}
ChannelBlock blk = ctx.db.getBlockMap().get(desired_hash.getBytes());

writeMessage( ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setBlock(blk)
.build());

}

else if (msg.hasReqHeader())
{
ChainHash desired_hash = null;
RequestBlock req = msg.getReqHeader();
if (req.getBlockHash().size() == 0)
{
desired_hash = ctx.db.getBlockHashAtHeight(req.getBlockHeight());
}
else
{
desired_hash = new ChainHash(req.getBlockHash());
}
ChannelBlockSummary summary = ctx.db.getBlockSummaryMap().get(desired_hash.getBytes());

writeMessage( ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setHeader(summary.getSignedHeader())
.build());

}
else if (msg.hasBlock())
{
Expand All @@ -177,6 +226,7 @@ else if (msg.hasBlock())
if (ctx.block_ingestor.reserveBlock(target))
{
writeMessage( ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setReqBlock(
RequestBlock.newBuilder().setBlockHash(target.getBytes()).build())
.build());
Expand All @@ -192,6 +242,10 @@ else if (msg.hasBlock())
throw(ve);
}
}
else
{
logger.info("Unhandled message: " + msg);
}

}
catch(Throwable t)
Expand All @@ -214,6 +268,10 @@ private void considerChannelHeader(ChainHash block_hash, ChannelBlockHeader head
peer_block_map.put(header.getBlockHeight(), block_hash);
}
// if we don't have this block
if (ctx == null) throw new RuntimeException("ctx");
if (ctx.db == null) throw new RuntimeException("db");
if (ctx.db.getBlockSummaryMap() == null) throw new RuntimeException("map");
if (block_hash == null) throw new RuntimeException("block_hash");
if (ctx.db.getBlockSummaryMap().get(block_hash.getBytes())==null)
{
long height = header.getBlockHeight();
Expand All @@ -222,6 +280,7 @@ private void considerChannelHeader(ChainHash block_hash, ChannelBlockHeader head
if (ctx.block_ingestor.reserveBlock(block_hash))
{
writeMessage( ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setReqBlock(
RequestBlock.newBuilder().setBlockHash(block_hash.getBytes()).build())
.build());
Expand Down Expand Up @@ -251,6 +310,7 @@ private void considerChannelHeader(ChainHash block_hash, ChannelBlockHeader head
}

writeMessage( ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setReqHeader(
RequestBlock.newBuilder().setBlockHeight(next).build())
.build());
Expand Down
8 changes: 5 additions & 3 deletions src/ChannelNode.java
Expand Up @@ -46,6 +46,7 @@ public class ChannelNode
private ChannelPeerServer channel_peer_server;
private DHTCache dht_cache;
private DHTStratUtil dht_strat_util;
private ChannelTipSender channel_tip_sender;

private HashMap<ChannelID, SingleChannelDB> db_map;

Expand Down Expand Up @@ -92,8 +93,9 @@ public ChannelNode(Config config)
WalletUtil.saveWallet(wallet_db, wallet_path);
}


db = new ChannelsDB(config, new JRocksDB(config) );
db_map = new HashMap<>(16,0.5f);

net_ex = new NetworkExaminer(this);
peer_manager = new PeerManager(this);
dht_server = new DHTServer(this);
Expand All @@ -103,14 +105,14 @@ public ChannelNode(Config config)
channel_peer_server = new ChannelPeerServer(this);
dht_cache = new DHTCache(this);
dht_strat_util = new DHTStratUtil();

db_map = new HashMap<>(16,0.5f);
channel_tip_sender = new ChannelTipSender(this);

startServer();

dht_maintainer.start();
peer_manager.start();
channel_peer_maintainer.start();
channel_tip_sender.start();

String node_addr = AddressUtil.getAddressString(ChannelGlobals.NODE_TAG, getNodeID());
logger.info("My node address is: " + node_addr);
Expand Down
1 change: 1 addition & 0 deletions src/ChannelPeerMaintainer.java
Expand Up @@ -79,6 +79,7 @@ public void runPass() throws Exception

// TODO - save good peers in DB in case the DHT is trashed in some way
LinkedList<ChannelPeerInfo> possible_peers = getAllDHTPeers(cid, dht_element_lst, connected_set);
logger.info(String.format("Channel %s: connected %d possible %d", cid.asString(), links.size(), possible_peers.size()));
Collections.shuffle(possible_peers);

// TODO - be better about not hammering down/bad nodes with connects
Expand Down
31 changes: 19 additions & 12 deletions src/ChannelTipSender.java
Expand Up @@ -18,6 +18,8 @@ public class ChannelTipSender extends PeriodicThread
public ChannelTipSender(ChannelNode node)
{
super(500L);
this.setName("ChannelTipSender");
this.setDaemon(true);
this.node = node;

}
Expand Down Expand Up @@ -61,26 +63,31 @@ public void runPass()
}
for(ChannelID cid : send_set)
{

ChannelContext ctx = node.getChannelSubscriber().getContext(cid);
if (ctx != null)
{
ChannelBlockSummary head_sum = ctx.block_ingestor.getHead();
SignedMessage signed_header = null;
ChannelTip.Builder tip = ChannelTip.newBuilder();


if (head_sum != null)
{
List<ChannelLink> links = ctx.getLinks();
ChannelPeerMessage msg = ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setTip(ChannelTip.newBuilder()
.setBlockHeader( head_sum.getSignedHeader() )
.build())
.build();

for(ChannelLink link : links)
{
link.writeMessage(msg);
}
signed_header = head_sum.getSignedHeader();
tip.setBlockHeader( signed_header );
}

List<ChannelLink> links = ctx.getLinks();
ChannelPeerMessage msg = ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setTip(tip.build())
.build();

for(ChannelLink link : links)
{
link.writeMessage(msg);
}
}
synchronized(send_time_map)
{
Expand Down

0 comments on commit 1e10874

Please sign in to comment.