Skip to content

Commit

Permalink
Send multiple blocks in one message
Browse files Browse the repository at this point in the history
  • Loading branch information
earlbread committed Jun 5, 2019
1 parent c9d9a82 commit 00cac59
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 41 deletions.
9 changes: 7 additions & 2 deletions CHANGES.md
Expand Up @@ -12,12 +12,17 @@ To be released.

### Behavioral changes

- `BlockChain<T>.GetNonce()` became to count staged transactions too during nonce
computation. [[#270]]
- `BlockChain<T>.GetNonce()` became to count staged transactions too during
nonce computation. [[#270]]
- `Swarm` became to send and receive multiple blocks with one message instead
of one block per message. [[#273], [#276]]


### Bug fixes

[#270]: https://github.com/planetarium/libplanet/pull/270
[#273]: https://github.com/planetarium/libplanet/issues/273
[#276]: https://github.com/planetarium/libplanet/pull/276


Version 0.3.0
Expand Down
64 changes: 64 additions & 0 deletions Libplanet.Tests/Net/SwarmTest.cs
Expand Up @@ -13,10 +13,12 @@
using Libplanet.Blocks;
using Libplanet.Crypto;
using Libplanet.Net;
using Libplanet.Net.Messages;
using Libplanet.Tests.Common.Action;
using Libplanet.Tests.Store;
using Libplanet.Tx;
using NetMQ;
using NetMQ.Sockets;
using Serilog;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -441,6 +443,68 @@ public async Task CanGetBlock()
}
}

[Fact(Timeout = Timeout)]
public async Task GetMultipleBlocksAtOnce()
{
var privateKey = new PrivateKey();
Swarm swarmA = _swarms[0];
Swarm swarmB = new Swarm(
privateKey,
1,
host: IPAddress.Loopback.ToString());

BlockChain<DumbAction> chainA = _blockchains[0];
BlockChain<DumbAction> chainB = _blockchains[1];

Block<DumbAction> genesis = chainA.MineBlock(_fx1.Address1);
chainB.Append(genesis); // chainA and chainB shares genesis block.
chainA.MineBlock(_fx1.Address1);
chainA.MineBlock(_fx1.Address1);

try
{
await StartAsync(swarmA, chainA);
await StartAsync(swarmB, chainA);

var peer = swarmA.AsPeer;

await swarmB.AddPeersAsync(new[] { peer });

IEnumerable<HashDigest<SHA256>> hashes =
await swarmB.GetBlockHashesAsync(
peer,
new BlockLocator(new[] { genesis.Hash }),
null);

var netMQAddress = $"tcp://{peer.EndPoint.Host}:{peer.EndPoint.Port}";
using (var socket = new DealerSocket(netMQAddress))
{
var request = new GetBlocks(hashes, 2);
await socket.SendMultipartMessageAsync(
request.ToNetMQMessage(privateKey));

NetMQMessage response = await socket.ReceiveMultipartMessageAsync();
Message parsedMessage = Message.Parse(response, true);
Libplanet.Net.Messages.Blocks blockMessage =
(Libplanet.Net.Messages.Blocks)parsedMessage;

Assert.Equal(2, blockMessage.Payloads.Count());

response = await socket.ReceiveMultipartMessageAsync();
parsedMessage = Message.Parse(response, true);
blockMessage = (Libplanet.Net.Messages.Blocks)parsedMessage;

Assert.Single(blockMessage.Payloads);
}
}
finally
{
await Task.WhenAll(
swarmA.StopAsync(),
swarmB.StopAsync());
}
}

[Fact(Timeout = Timeout)]
public async Task GetTx()
{
Expand Down
30 changes: 0 additions & 30 deletions Libplanet/Net/Messages/Block.cs

This file was deleted.

40 changes: 40 additions & 0 deletions Libplanet/Net/Messages/Blocks.cs
@@ -0,0 +1,40 @@
using System.Collections.Generic;
using System.Linq;
using NetMQ;

namespace Libplanet.Net.Messages
{
internal class Blocks : Message
{
public Blocks(IEnumerable<byte[]> payloads)
{
Payloads = payloads;
}

public Blocks(NetMQFrame[] body)
{
int blockCount = body[0].ConvertToInt32();
Payloads = body.Skip(1).Take(blockCount)
.Select(f => f.ToByteArray())
.ToList();
}

public IEnumerable<byte[]> Payloads { get; }

protected override MessageType Type => MessageType.Blocks;

protected override IEnumerable<NetMQFrame> DataFrames
{
get
{
yield return new NetMQFrame(
NetworkOrderBitsConverter.GetBytes(Payloads.Count()));

foreach (var payload in Payloads)
{
yield return new NetMQFrame(payload);
}
}
}
}
}
6 changes: 3 additions & 3 deletions Libplanet/Net/Messages/Message.cs
Expand Up @@ -51,9 +51,9 @@ internal enum MessageType : byte
GetTxs = 0x08,

/// <summary>
/// Message containing serialized block.
/// Message containing serialized blocks.
/// </summary>
Block = 0x09,
Blocks = 0x09,

/// <summary>
/// Message containing serialized transaction.
Expand Down Expand Up @@ -100,7 +100,7 @@ public static Message Parse(NetMQMessage raw, bool reply)
{ MessageType.TxIds, typeof(TxIds) },
{ MessageType.GetBlocks, typeof(GetBlocks) },
{ MessageType.GetTxs, typeof(GetTxs) },
{ MessageType.Block, typeof(Block) },
{ MessageType.Blocks, typeof(Blocks) },
{ MessageType.Tx, typeof(Tx) },
};

Expand Down
34 changes: 28 additions & 6 deletions Libplanet/Net/Swarm.cs
Expand Up @@ -682,12 +682,15 @@ public void BroadcastTxs<T>(IEnumerable<Transaction<T>> txs)
await socket.ReceiveMultipartMessageAsync(
cancellationToken: token);
Message parsedMessage = Message.Parse(response, true);
if (parsedMessage is Block blockMessage)
if (parsedMessage is Messages.Blocks blockMessage)
{
Block<T> block = Block<T>.FromBencodex(
blockMessage.Payload);
await yield.ReturnAsync(block);
hashCount--;
var payloads = blockMessage.Payloads.ToArray();
foreach (var payload in payloads)
{
Block<T> block = Block<T>.FromBencodex(payload);
await yield.ReturnAsync(block);
hashCount--;
}
}
else
{
Expand Down Expand Up @@ -1337,18 +1340,37 @@ private void TransferTxs<T>(BlockChain<T> blockChain, GetTxs getTxs)
where T : IAction, new()
{
_logger.Debug("Trying to transfer blocks...");

var blocks = new List<byte[]>();

foreach (HashDigest<SHA256> hash in getData.BlockHashes)
{
if (blockChain.Blocks.TryGetValue(hash, out Block<T> block))
{
Message response = new Block(block.ToBencodex(true, true))
var payload = block.ToBencodex(true, true);
blocks.Add(payload);
}

if (blocks.Count == getData.ChunkSize)
{
Message response = new Messages.Blocks(blocks.ToArray())
{
Identity = getData.Identity,
};
_replyQueue.Enqueue(response);
blocks.Clear();
}
}

if (blocks.Any())
{
Message response = new Messages.Blocks(blocks.ToArray())
{
Identity = getData.Identity,
};
_replyQueue.Enqueue(response);
}

_logger.Debug("Transfer complete.");
}

Expand Down

0 comments on commit 00cac59

Please sign in to comment.