Skip to content

Commit

Permalink
Testing channel peering a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
fireduck64 committed Aug 29, 2019
1 parent aa3bfe3 commit 73a2f9c
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 32 deletions.
33 changes: 30 additions & 3 deletions protolib/channels.proto
Expand Up @@ -20,11 +20,39 @@ service ChannelService {
}

message ChannelPeerMessage
{
bytes channel_id = 1;
oneof z {
ChannelTip tip = 2;
RequestBlock req_block = 3;
ChannelBlock block = 4;

RequestBlock req_header = 5;
SignedMessage header = 6;

RequestContent req_content = 7;
ContentInfo content = 8;
}
}

message ChannelTip
{
SignedMessage block_header = 1;
// This allows us to gossip channel peers without depending on DHT
repeated ChannelPeerInfo peers = 2;
}

message RequestBlock
{
oneof z {
SignedMessage signed_update = 1;
bytes block_hash = 2;
int32 block_height = 3;
}
bytes channel_id = 2;
}

message RequestContent
{
bytes message_id = 1;
}

message StoreDHTRequest
Expand Down Expand Up @@ -194,7 +222,6 @@ message ContentInfo {
// A signed ChannelSettings is how a channel is founded
message ChannelSettings
{
bool active = 1;
string display_name = 2;

// These are folks who can create new blocks and do general mod functions like that
Expand Down
3 changes: 3 additions & 0 deletions src/ChannelGlobals.java
Expand Up @@ -38,6 +38,9 @@ public class ChannelGlobals
// 2500 channels, 100 elements each
public static final int DHT_CACHE_ELEMENTS = 2500 * 100;
public static final long DHT_CACHE_EXPIRE = 120000L;

// TODO - move this way up once we have saving wait for DHT peering to be ready
public static final long DHT_SAVE_CACHE_EXPIRE = 120000L;


}
Expand Down
13 changes: 13 additions & 0 deletions src/ChannelPeerMaintainer.java
Expand Up @@ -12,6 +12,7 @@
import java.util.logging.Logger;
import snowblossom.channels.proto.*;
import snowblossom.lib.AddressSpecHash;
import snowblossom.lib.ChainHash;
import snowblossom.lib.ValidationException;

/**
Expand All @@ -28,6 +29,8 @@ public class ChannelPeerMaintainer extends PeriodicThread
private static final int DESIRED_CHANNEL_PEERS = 5;
private static final int CONNECT_CHANNEL_PEERS_PER_PASS = 2;

private boolean first_pass_done;

public ChannelPeerMaintainer(ChannelNode node)
{
super(5000L);
Expand All @@ -44,6 +47,11 @@ public ChannelPeerMaintainer(ChannelNode node)
@Override
public void runPass() throws Exception
{
if (!first_pass_done)
{
Thread.sleep(7500);
first_pass_done=true;
}
ChannelPeerInfo my_info = node.getNetworkExaminer().createPeerInfo();

for(ChannelID cid : node.getChannelSubscriber().getChannelSet())
Expand All @@ -59,13 +67,17 @@ public void runPass() throws Exception
// TODO - actually get head settings
ChannelSettings settings = null;
List<ByteString> dht_element_lst = node.getDHTStratUtil().getDHTLocations(cid, settings);

// TODO - only bother doing the save once the DHT peering is up and running reasonably
saveDHT(cid, dht_element_lst, my_info);

if (links.size() < DESIRED_CHANNEL_PEERS)
{
Set<AddressSpecHash> connected_set = getConnectedNodeSet(links);
connected_set.add(node.getNodeID());


// TODO - save good peers in DB in case the DHT is trashed in some way
LinkedList<ChannelPeerInfo> possible_peers = getAllDHTPeers(cid, dht_element_lst, connected_set);
Collections.shuffle(possible_peers);

Expand Down Expand Up @@ -117,6 +129,7 @@ private void saveDHT(ChannelID cid, List<ByteString> dht_element_lst, ChannelPee
.setDesiredResultCount(0)
.setSignedDhtData(sm)
.build());
logger.info(String.format("DHT Saved %s for %s", new ChainHash(element_id), cid.asString()));

node.getDHTCache().markWrite(element_id);
}
Expand Down
1 change: 1 addition & 0 deletions src/ChannelSubscriber.java
Expand Up @@ -40,6 +40,7 @@ public ChannelContext openChannel(ChannelID cid)
if (doOpen)
{
cc.setResult(openChannelInternal(cid));
node.getChannelPeerMaintainer().wake();
}

return cc.get();
Expand Down
48 changes: 44 additions & 4 deletions src/ChannelValidation.java
Expand Up @@ -12,6 +12,7 @@
import snowblossom.channels.proto.ContentInfo;
import snowblossom.channels.proto.ContentReference;
import snowblossom.channels.proto.SignedMessage;
import snowblossom.channels.proto.DHTDataSet;
import snowblossom.channels.proto.SignedMessagePayload;
import snowblossom.lib.AddressSpecHash;
import snowblossom.lib.AddressUtil;
Expand Down Expand Up @@ -46,7 +47,7 @@ public static void checkBlockBasics(ChannelID chan_id, ChannelBlock blk, boolean
throw new ValidationException("Message id from initial settings must be channel id");
}

if (header.getSettings().getActive())
if (header.hasSettings())
{
throw new ValidationException("Must not have initial_settings and settings on block zero");
}
Expand Down Expand Up @@ -200,7 +201,7 @@ public static ChannelBlockSummary deepBlockValidation(ChannelID chan_id, Channel
{
throw new ValidationException("Must have prev block on non-zero block");
}
if (header.getSettings().getActive())
if (header.hasSettings())
{
throw new ValidationException("Block zero must not have settings");
}
Expand Down Expand Up @@ -228,7 +229,7 @@ public static ChannelBlockSummary deepBlockValidation(ChannelID chan_id, Channel
throw new ValidationException("Block signer not on signer list");
}

if (header.getSettings().getActive())
if (header.hasSettings())
{
if (!admin_signers.contains(signer.getBytes()))
{
Expand All @@ -252,7 +253,7 @@ public static ChannelBlockSummary deepBlockValidation(ChannelID chan_id, Channel

BigInteger work_sum = prev_work_sum.add( BigInteger.valueOf( header.getWeight() + 1L ) );

if (header.getSettings().getActive())
if (header.hasSettings())
{
settings = header.getSettings();
}
Expand All @@ -265,5 +266,44 @@ public static ChannelBlockSummary deepBlockValidation(ChannelID chan_id, Channel
return sum.build();

}


public static SignedMessagePayload validateDHTData(SignedMessage dht_data)
throws ValidationException
{
SignedMessagePayload payload = ChannelSigUtil.validateSignedMessage(dht_data);

if (!payload.hasDhtData())
{
throw new ValidationException("Payload is not dht_data");
}

AddressSpecHash signed_hash = AddressUtil.getHashForSpec(payload.getClaim());

if (!signed_hash.equals(payload.getDhtData().getPeerInfo().getAddressSpecHash()))
{
throw new ValidationException("Signer of DHT data does not match peer info");
}
if (payload.getDhtData().getElementId().size() != ChannelGlobals.DHT_ELEMENT_SIZE)
{
throw new ValidationException("Element id wrong length");
}

return payload;

}

public static void validateDHTDataSet(DHTDataSet ds, ByteString element_id)
throws ValidationException
{
for(SignedMessage sm : ds.getDhtDataList())
{
SignedMessagePayload payload = validateDHTData(sm);
if (!element_id.equals(payload.getDhtData().getElementId()))
{
throw new ValidationException("Not requested element_id");
}
}
}
}

19 changes: 2 additions & 17 deletions src/DHTCache.java
Expand Up @@ -24,7 +24,7 @@ public DHTCache(ChannelNode node)
{
this.node = node;
get_cache = new ExpiringLRUCache<>(ChannelGlobals.DHT_CACHE_ELEMENTS, ChannelGlobals.DHT_CACHE_EXPIRE);
save_cache = new ExpiringLRUCache<>(ChannelGlobals.DHT_CACHE_ELEMENTS, ChannelGlobals.DHT_CACHE_EXPIRE);
save_cache = new ExpiringLRUCache<>(ChannelGlobals.DHT_CACHE_ELEMENTS, ChannelGlobals.DHT_SAVE_CACHE_EXPIRE);
}

public boolean haveWritten(ByteString element_id)
Expand Down Expand Up @@ -92,22 +92,7 @@ public void onNext(DHTDataSet ds)
{
try
{
for(SignedMessage sm : ds.getDhtDataList())
{
SignedMessagePayload payload = ChannelSigUtil.validateSignedMessage(sm);

if (!element_id.equals(payload.getDhtData().getElementId()))
{
throw new ValidationException("Not requested element_id");
}
AddressSpecHash signed_hash = AddressUtil.getHashForSpec(payload.getClaim());

if (!signed_hash.equals(payload.getDhtData().getPeerInfo().getAddressSpecHash()))
{
throw new ValidationException("Signer of DHT data does not match peer info");
}

}
ChannelValidation.validateDHTDataSet(ds, element_id);
}
catch(ValidationException e)
{
Expand Down
9 changes: 3 additions & 6 deletions src/DHTServer.java
Expand Up @@ -149,17 +149,14 @@ public void storeDHTDataAsyncTrusted(StoreDHTRequest req)
public DHTDataSet storeDHTData(StoreDHTRequest req)
throws ValidationException
{
SignedMessagePayload payload = ChannelSigUtil.validateSignedMessage(req.getSignedDhtData());
SignedMessagePayload payload = ChannelValidation.validateDHTData(req.getSignedDhtData());

if (payload.getTimestamp() + ChannelGlobals.MAX_DHT_DATA_AGE < System.currentTimeMillis())
{
throw new ValidationException("Request too old");
}
if (payload.getTimestamp() > System.currentTimeMillis() + ChannelGlobals.ALLOWED_CLOCK_SKEW)
{
throw new ValidationException("Request in future");
}
DHTData data = payload.getDhtData();

DHTData data = payload.getDhtData();
ByteString target = data.getElementId();
PeerLink next_peer = findClosestPeer(target);

Expand Down
1 change: 0 additions & 1 deletion test/ChannelValidationTest.java
Expand Up @@ -131,7 +131,6 @@ public void testChannelValidateGood()
settings.addBlockSignerSpecHashes( getAddr(block_db).getBytes() );
settings.addBlockSignerSpecHashes( getAddr(block_db2).getBytes() );
settings.addAdminSignerSpecHashes( getAddr(admin_db).getBytes() );
settings.setActive(true);

ChannelBlockHeader.Builder header = ChannelBlockHeader.newBuilder();
header.setBlockHeight(sum.getHeader().getBlockHeight() + 1L);
Expand Down
2 changes: 1 addition & 1 deletion test/DHTTest.java
Expand Up @@ -57,7 +57,7 @@ public void testDHTReadWrite()


Random rnd = new Random();
byte[] id_bytes = new byte[16];
byte[] id_bytes = new byte[32];

int b_match = 0;
for(int i=0; i<100; i++)
Expand Down

0 comments on commit 73a2f9c

Please sign in to comment.