Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send multiple blocks in one message #276

Merged
merged 2 commits into from Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions CHANGES.md
Expand Up @@ -12,12 +12,18 @@ To be released.

### Behavioral changes

- `BlockChain<T>.GetNonce()` became to count staged transactions too during nonce
computation. [[#270]]
- `BlockChain<T>.GetNonce()` became to count staged transactions too during
nonce computation. [[#270]]
- A message `Swarm` became to have multiple blocks within it, which means
round trips on the network are now much reduced. [[#273], [#276]]
- `Message.Block` has been replaced by `Message.Blocks` and the magic number
has been changed to `0x0a`. [[#276]]

### Bug fixes

[#270]: https://github.com/planetarium/libplanet/pull/270
[#273]: https://github.com/planetarium/libplanet/issues/273
[#276]: https://github.com/planetarium/libplanet/pull/276


Version 0.3.0
Expand Down
64 changes: 64 additions & 0 deletions Libplanet.Tests/Net/SwarmTest.cs
Expand Up @@ -13,10 +13,12 @@
using Libplanet.Blocks;
using Libplanet.Crypto;
using Libplanet.Net;
using Libplanet.Net.Messages;
using Libplanet.Tests.Common.Action;
using Libplanet.Tests.Store;
using Libplanet.Tx;
using NetMQ;
using NetMQ.Sockets;
using Serilog;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -441,6 +443,68 @@ public async Task CanGetBlock()
}
}

[Fact(Timeout = Timeout)]
public async Task GetMultipleBlocksAtOnce()
{
var privateKey = new PrivateKey();
Swarm swarmA = _swarms[0];
Swarm swarmB = new Swarm(
privateKey,
1,
host: IPAddress.Loopback.ToString());

BlockChain<DumbAction> chainA = _blockchains[0];
BlockChain<DumbAction> chainB = _blockchains[1];

Block<DumbAction> genesis = chainA.MineBlock(_fx1.Address1);
chainB.Append(genesis); // chainA and chainB shares genesis block.
chainA.MineBlock(_fx1.Address1);
chainA.MineBlock(_fx1.Address1);

try
{
await StartAsync(swarmA, chainA);
await StartAsync(swarmB, chainA);

var peer = swarmA.AsPeer;

await swarmB.AddPeersAsync(new[] { peer });

IEnumerable<HashDigest<SHA256>> hashes =
await swarmB.GetBlockHashesAsync(
peer,
new BlockLocator(new[] { genesis.Hash }),
null);

var netMQAddress = $"tcp://{peer.EndPoint.Host}:{peer.EndPoint.Port}";
using (var socket = new DealerSocket(netMQAddress))
{
var request = new GetBlocks(hashes, 2);
await socket.SendMultipartMessageAsync(
request.ToNetMQMessage(privateKey));

NetMQMessage response = await socket.ReceiveMultipartMessageAsync();
Message parsedMessage = Message.Parse(response, true);
Libplanet.Net.Messages.Blocks blockMessage =
(Libplanet.Net.Messages.Blocks)parsedMessage;

Assert.Equal(2, blockMessage.Payloads.Count);

response = await socket.ReceiveMultipartMessageAsync();
parsedMessage = Message.Parse(response, true);
blockMessage = (Libplanet.Net.Messages.Blocks)parsedMessage;

Assert.Single(blockMessage.Payloads);
}
}
finally
{
await Task.WhenAll(
swarmA.StopAsync(),
swarmB.StopAsync());
}
}

[Fact(Timeout = Timeout)]
public async Task GetTx()
{
Expand Down
30 changes: 0 additions & 30 deletions Libplanet/Net/Messages/Block.cs

This file was deleted.

48 changes: 48 additions & 0 deletions Libplanet/Net/Messages/Blocks.cs
@@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Linq;
using NetMQ;

namespace Libplanet.Net.Messages
{
internal class Blocks : Message
{
public Blocks(IEnumerable<byte[]> payloads)
earlbread marked this conversation as resolved.
Show resolved Hide resolved
{
if (payloads.Count() > int.MaxValue)
{
throw new ArgumentOutOfRangeException(
nameof(payloads),
$"The number of payloads can't exceed {int.MaxValue}.");
}

Payloads = payloads.ToList();
}

public Blocks(NetMQFrame[] body)
{
int payloadCount = body[0].ConvertToInt32();
Payloads = body.Skip(1).Take(payloadCount)
.Select(f => f.ToByteArray())
.ToList();
}

public List<byte[]> Payloads { get; }

protected override MessageType Type => MessageType.Blocks;

protected override IEnumerable<NetMQFrame> DataFrames
{
get
{
yield return new NetMQFrame(
NetworkOrderBitsConverter.GetBytes(Payloads.Count));

foreach (var payload in Payloads)
{
yield return new NetMQFrame(payload);
}
}
}
}
}
19 changes: 18 additions & 1 deletion Libplanet/Net/Messages/GetBlocks.cs
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
Expand All @@ -7,9 +8,19 @@ namespace Libplanet.Net.Messages
{
internal class GetBlocks : Message
{
public GetBlocks(IEnumerable<HashDigest<SHA256>> hashes)
public GetBlocks(
IEnumerable<HashDigest<SHA256>> hashes,
int chunkSize = 500)
{
if (chunkSize <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(chunkSize),
"Chunk size must be greater than 0.");
}

BlockHashes = hashes;
ChunkSize = chunkSize;
}

public GetBlocks(NetMQFrame[] frames)
Expand All @@ -19,10 +30,13 @@ public GetBlocks(NetMQFrame[] frames)
.Skip(1).Take(hashCount)
.Select(f => f.ConvertToHashDigest<SHA256>())
.ToList();
ChunkSize = frames[1 + hashCount].ConvertToInt32();
}

public IEnumerable<HashDigest<SHA256>> BlockHashes { get; }

public int ChunkSize { get; }

protected override MessageType Type => MessageType.GetBlocks;

protected override IEnumerable<NetMQFrame> DataFrames
Expand All @@ -36,6 +50,9 @@ protected override IEnumerable<NetMQFrame> DataFrames
{
yield return new NetMQFrame(hash.ToByteArray());
}

yield return new NetMQFrame(
NetworkOrderBitsConverter.GetBytes(ChunkSize));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions Libplanet/Net/Messages/Message.cs
Expand Up @@ -51,9 +51,9 @@ internal enum MessageType : byte
GetTxs = 0x08,

/// <summary>
/// Message containing serialized block.
/// Message containing serialized blocks.
/// </summary>
Block = 0x09,
Blocks = 0x0a,

/// <summary>
/// Message containing serialized transaction.
Expand Down Expand Up @@ -100,7 +100,7 @@ public static Message Parse(NetMQMessage raw, bool reply)
{ MessageType.TxIds, typeof(TxIds) },
{ MessageType.GetBlocks, typeof(GetBlocks) },
{ MessageType.GetTxs, typeof(GetTxs) },
{ MessageType.Block, typeof(Block) },
{ MessageType.Blocks, typeof(Blocks) },
{ MessageType.Tx, typeof(Tx) },
};

Expand Down
33 changes: 27 additions & 6 deletions Libplanet/Net/Swarm.cs
Expand Up @@ -682,12 +682,14 @@ public void BroadcastTxs<T>(IEnumerable<Transaction<T>> txs)
await socket.ReceiveMultipartMessageAsync(
cancellationToken: token);
Message parsedMessage = Message.Parse(response, true);
if (parsedMessage is Block blockMessage)
if (parsedMessage is Messages.Blocks blockMessage)
{
Block<T> block = Block<T>.FromBencodex(
blockMessage.Payload);
await yield.ReturnAsync(block);
hashCount--;
foreach (byte[] payload in blockMessage.Payloads)
{
Block<T> block = Block<T>.FromBencodex(payload);
await yield.ReturnAsync(block);
hashCount--;
}
}
else
{
Expand Down Expand Up @@ -1337,18 +1339,37 @@ private void TransferTxs<T>(BlockChain<T> blockChain, GetTxs getTxs)
where T : IAction, new()
{
_logger.Debug("Trying to transfer blocks...");

var blocks = new List<byte[]>();

foreach (HashDigest<SHA256> hash in getData.BlockHashes)
{
if (blockChain.Blocks.TryGetValue(hash, out Block<T> block))
{
Message response = new Block(block.ToBencodex(true, true))
byte[] payload = block.ToBencodex(true, true);
blocks.Add(payload);
}

if (blocks.Count == getData.ChunkSize)
{
var response = new Messages.Blocks(blocks)
{
Identity = getData.Identity,
};
_replyQueue.Enqueue(response);
blocks.Clear();
}
}

if (blocks.Any())
{
var response = new Messages.Blocks(blocks)
{
Identity = getData.Identity,
};
_replyQueue.Enqueue(response);
}

_logger.Debug("Transfer complete.");
}

Expand Down