Skip to content

Commit

Permalink
trickle in mempool updates
Browse files Browse the repository at this point in the history
  • Loading branch information
fireduck64 committed Sep 17, 2019
1 parent c3f1919 commit 0281d09
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/ChannelGlobals.java
Expand Up @@ -2,7 +2,7 @@

public class ChannelGlobals
{
public static final String VERSION = "wakka";
public static final String VERSION = "bakka";

public static final String NODE_ADDRESS_STRING="node";
public static final String CHANNEL_ADDRESS_STRING="chan";
Expand Down
3 changes: 3 additions & 0 deletions src/ChannelNode.java
Expand Up @@ -48,6 +48,7 @@ public class ChannelNode
private DHTStratUtil dht_strat_util;
private ChannelTipSender channel_tip_sender;
private ChannelChunkGetter channel_chunk_getter;
private ChannelOutsiderSender channel_outsider_sender;

private HashMap<ChannelID, SingleChannelDB> db_map;

Expand Down Expand Up @@ -107,6 +108,7 @@ public ChannelNode(Config config)
dht_strat_util = new DHTStratUtil();
channel_tip_sender = new ChannelTipSender(this);
channel_chunk_getter = new ChannelChunkGetter(this);
channel_outsider_sender = new ChannelOutsiderSender(this);

startServer();

Expand All @@ -115,6 +117,7 @@ public ChannelNode(Config config)
channel_peer_maintainer.start();
channel_tip_sender.start();
channel_chunk_getter.start();
channel_outsider_sender.start();

channel_subscriber.loadFromDB();

Expand Down
130 changes: 130 additions & 0 deletions src/ChannelOutsiderSender.java
@@ -0,0 +1,130 @@
package snowblossom.channels;

import com.google.protobuf.ByteString;
import snowblossom.lib.ValidationException;
import duckutil.PeriodicThread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import snowblossom.channels.proto.*;

public class ChannelOutsiderSender extends PeriodicThread
{

private HashMap<ChannelID, Long> get_time_map = new HashMap<>(16,0.5f);
private ChannelNode node;

private long earliest_time = 0L;



public ChannelOutsiderSender(ChannelNode node)
{
super(500L);
this.setName("ChannelOutsiderSender");
this.setDaemon(true);
this.node = node;

}

@Override
public void runPass()
{
long now = System.currentTimeMillis();
synchronized(get_time_map)
{
if (now < earliest_time) return;
}

Set<ChannelID> chan_set = node.getChannelSubscriber().getChannelSet();

long next_earliest = now + ChannelGlobals.CHANNEL_TIP_SEND_MS;
Set<ChannelID> send_set = new HashSet<>();

synchronized(get_time_map)
{
for(ChannelID cid : chan_set)
{
long tm = 0L;
if (get_time_map.containsKey(cid))
{
tm = get_time_map.get(cid);
}
if (tm <= now)
{
send_set.add(cid);
}
else
{
next_earliest = Math.min(next_earliest, tm);
}

}
earliest_time = next_earliest;

}
for(ChannelID cid : send_set)
{
startSend(cid);

synchronized(get_time_map)
{
get_time_map.put(cid, now + ChannelGlobals.CHANNEL_TIP_SEND_MS);
}

}
}

private void startSend(ChannelID cid)
{
ChannelContext ctx = node.getChannelSubscriber().getContext(cid);
if (ctx != null)
{
//TODO - use random rather than just getting everything
ArrayList<SignedMessage> full_list = new ArrayList<>();

full_list.addAll(ctx.db.getOutsiderMap().getByPrefix(ByteString.EMPTY, 100000).values());
if (full_list.size() == 0) return;

Random rnd = new Random();

SignedMessage sm = full_list.get(rnd.nextInt(full_list.size()));

try
{
ChannelValidation.validateOutsiderContent(sm, ctx.block_ingestor.getHead());

ChannelPeerMessage m_out = ChannelPeerMessage.newBuilder()
.setChannelId(cid.getBytes())
.setContent(sm)
.build();

for(ChannelLink link : ctx.getLinks())
{
link.writeMessage(m_out);
}
}
catch(ValidationException e)
{
ctx.db.getOutsiderMap().remove( sm.getMessageId() );
}


}
}

public void wakeFor(ChannelID cid)
{
synchronized(get_time_map)
{
get_time_map.put(cid, 0L);
earliest_time = 0L;
}
this.wake();
}



}

0 comments on commit 0281d09

Please sign in to comment.