Skip to content

Commit

Permalink
Sending files
Browse files Browse the repository at this point in the history
  • Loading branch information
fireduck64 committed Sep 7, 2019
1 parent e19a161 commit aa8186c
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/ChannelBlockIngestor.java
Expand Up @@ -242,7 +242,8 @@ public void ingestChunk(ContentChunk chunk)

if (!ChunkMapUtils.doIWant(ctx, content_id))
{
throw new ValidationException(String.format("I don't want chunk: %s", content_id));
logger.info(String.format("I don't want chunk: %s", content_id));
return;
}

ContentInfo ci = ChannelSigUtil.quickPayload(db.getContentMap().get(content_id.getBytes())).getContentInfo();
Expand Down
68 changes: 62 additions & 6 deletions src/ChannelChunkGetter.java
Expand Up @@ -4,8 +4,12 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.LinkedList;
import java.util.Set;
import snowblossom.channels.proto.*;
import snowblossom.lib.ChainHash;
import java.util.Collections;
import java.util.BitSet;

public class ChannelChunkGetter extends PeriodicThread
{
Expand All @@ -15,6 +19,8 @@ public class ChannelChunkGetter extends PeriodicThread

private long earliest_time = 0L;

private static final int SENDS_PER_PASS = 4;

public ChannelChunkGetter(ChannelNode node)
{
super(500L);
Expand Down Expand Up @@ -63,13 +69,8 @@ public void runPass()
}
for(ChannelID cid : send_set)
{
startPulls(cid);

ChannelContext ctx = node.getChannelSubscriber().getContext(cid);
if (ctx != null)
{
//TODO -- do things
List<ChannelLink> links = ctx.getLinks();
}
synchronized(get_time_map)
{
get_time_map.put(cid, now + ChannelGlobals.CHANNEL_TIP_SEND_MS);
Expand All @@ -78,6 +79,61 @@ public void runPass()
}
}

private void startPulls(ChannelID cid)
{
ChannelContext ctx = node.getChannelSubscriber().getContext(cid);
if (ctx != null)
{
List<ChainHash> want_list = ChunkMapUtils.getWantList(ctx);
List<ChannelLink> links = ctx.getLinks();

if (links.size() == 0) return;

int sent = 0;

// TODO - do something smarter about link selection

for(ChainHash content_id : want_list)
{
Collections.shuffle(links);
ContentInfo ci = ctx.db.getContentInfo(content_id);
if (ci == null) continue;

BitSet bs = ChunkMapUtils.getSavedChunksSet(ctx, content_id);
int total_chunks = MiscUtils.getNumberOfChunks(ci);
List<Integer> chunk_want_list = new LinkedList<>();
for(int i=0; i<total_chunks; i++)
{
if (!bs.get(i))
{
chunk_want_list.add(i);
}
}
Collections.shuffle(chunk_want_list);

for(int w : chunk_want_list)
{
Collections.shuffle(links);
ChannelLink link = links.get(0);

link.writeMessage(
ChannelPeerMessage.newBuilder()
.setChannelId( cid.getBytes())
.setReqChunk( RequestChunk.newBuilder().setMessageId(content_id.getBytes()).setChunk(w).build() )
.build());

if (sent >= SENDS_PER_PASS) return;
}




if (sent >= SENDS_PER_PASS) return;
}

}
}

public void wakeFor(ChannelID cid)
{
synchronized(get_time_map)
Expand Down
3 changes: 0 additions & 3 deletions src/ChannelLink.java
Expand Up @@ -309,11 +309,8 @@ else if (msg.hasChunk())
{
BitSet bs = BitSet.valueOf(chunk.getChunkHaveBitmap().asReadOnlyByteBuffer());
peer_chunks.put( new ChainHash(chunk.getMessageId()), bs);


}


}
else
{
Expand Down
6 changes: 1 addition & 5 deletions src/ChannelValidation.java
Expand Up @@ -159,11 +159,7 @@ public static void validateContent(ContentInfo ci, MessageDigest md)

if (ci.getContentLength() > ChannelGlobals.CONTENT_DATA_BLOCK_SIZE)
{
int expected_len = (int) (ci.getContentLength() / ChannelGlobals.CONTENT_DATA_BLOCK_SIZE);
if (ci.getContentLength() % ChannelGlobals.CONTENT_DATA_BLOCK_SIZE != 0)
{
expected_len++;
}
int expected_len = MiscUtils.getNumberOfChunks(ci);
if (ci.getChunkHashCount() != expected_len)
{
throw new ValidationException("content is longer than data_block_size so must have chunk_hash defined");
Expand Down
16 changes: 16 additions & 0 deletions src/MiscUtils.java
@@ -0,0 +1,16 @@
package snowblossom.channels;

import snowblossom.channels.proto.ContentInfo;

public class MiscUtils
{
public static int getNumberOfChunks(ContentInfo ci)
{
int x = (int)(ci.getContentLength() / ChannelGlobals.CONTENT_DATA_BLOCK_SIZE);

if (ci.getContentLength() % ChannelGlobals.CONTENT_DATA_BLOCK_SIZE != 0) x++;
return x;

}

}
10 changes: 10 additions & 0 deletions src/SingleChannelDB.java
Expand Up @@ -13,6 +13,7 @@
import snowblossom.channels.proto.ChannelBlockSummary;
import snowblossom.channels.proto.LocalPeerInfo;
import snowblossom.channels.proto.SignedMessage;
import snowblossom.channels.proto.ContentInfo;
import snowblossom.lib.ChainHash;
import snowblossom.lib.db.DBMap;
import snowblossom.lib.db.DBProvider;
Expand Down Expand Up @@ -132,4 +133,13 @@ public void run()
}
}
}

public ContentInfo getContentInfo(ChainHash hash)
{
SignedMessage sm = getContentMap().get(hash.getBytes());
if (sm == null) return null;

return ChannelSigUtil.quickPayload(sm).getContentInfo();

}
}
7 changes: 5 additions & 2 deletions test/ChannelPeerTest.java
Expand Up @@ -135,6 +135,7 @@ public void testChannelPeerAndSync()

}

Assert.assertEquals(0, ChunkMapUtils.getWantList(ctx_a).size());
Assert.assertEquals(20, ctx_a.block_ingestor.getHead().getHeader().getBlockHeight());

for(int i=0; i<100; i++)
Expand All @@ -144,14 +145,16 @@ public void testChannelPeerAndSync()
}
Assert.assertEquals(1, ctx_a.getLinks().size());
Assert.assertEquals(1, ctx_b.getLinks().size());
for(int i=0; i<25; i++)
for(int i=0; i<200; i++)
{
Thread.sleep(100);
if (ctx_b.block_ingestor.getHead() != null)
if (ctx_b.block_ingestor.getHead().getHeader().getBlockHeight() == 20) break;
if (ctx_b.block_ingestor.getHead().getHeader().getBlockHeight() == 20)
if (ChunkMapUtils.getWantList(ctx_b).size() == 0) break;
}

Assert.assertEquals(20, ctx_b.block_ingestor.getHead().getHeader().getBlockHeight());
Assert.assertEquals(0, ChunkMapUtils.getWantList(ctx_b).size());

}

Expand Down
7 changes: 7 additions & 0 deletions test/ChunkMapUtilsTest.java
Expand Up @@ -47,6 +47,13 @@ public void testChunkMap()

ChainHash content_id = new ChainHash(b);

Assert.assertFalse( ChunkMapUtils.doIWant(ctx, content_id));
ChunkMapUtils.markWant(ctx, content_id);
Assert.assertTrue( ChunkMapUtils.doIWant(ctx, content_id));
ChunkMapUtils.markDone(ctx, content_id);
Assert.assertFalse( ChunkMapUtils.doIWant(ctx, content_id));


TreeSet<Integer> added=new TreeSet<>();
for(int i=0; i<100; i++)
{
Expand Down

0 comments on commit aa8186c

Please sign in to comment.