Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollback #1759 #1798

Merged
merged 6 commits into from Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.md
Expand Up @@ -8,8 +8,14 @@ To be released.

- Fixed *@planetarium/cli* npm package's bug where installation had not
worked since 0.27.0. [[#1790], [#1799]]
- (Libplanet.Net) Return the fixed races between `Swarm<T>`'s dual systems
for block synchronization (polling and event-driven in parallel)
on updating chain by serializing the post-download process
which is common for both synchronization mechanisms. [[#1731], [#1798]]


[#1790]: https://github.com/planetarium/libplanet/issues/1790
[#1798]: https://github.com/planetarium/libplanet/pull/1798
[#1799]: https://github.com/planetarium/libplanet/pull/1799


Expand Down
17 changes: 11 additions & 6 deletions Libplanet.Net.Tests/SwarmTest.Broadcast.cs
Expand Up @@ -801,13 +801,15 @@ public async Task IgnoreExistingBlocks()
[Fact(Timeout = Timeout)]
public async Task PullBlocks()
{
var swarmOptions = new SwarmOptions();
swarmOptions.Type = SwarmOptions.TransportType.NetMQTransport;
var keyA = new PrivateKey();
var keyB = new PrivateKey();
var keyC = new PrivateKey();

var swarmA = CreateSwarm(keyA);
var swarmB = CreateSwarm(keyB);
var swarmC = CreateSwarm(keyC);
var swarmA = CreateSwarm(keyA, options: swarmOptions);
var swarmB = CreateSwarm(keyB, options: swarmOptions);
var swarmC = CreateSwarm(keyC, options: swarmOptions);

BlockChain<DumbAction> chainA = swarmA.BlockChain;
BlockChain<DumbAction> chainB = swarmB.BlockChain;
Expand Down Expand Up @@ -840,6 +842,7 @@ public async Task PullBlocks()
await BootstrapAsync(swarmC, swarmA.AsPeer);

await swarmC.PullBlocksAsync(TimeSpan.FromSeconds(5), int.MaxValue, default);
await swarmC.BlockAppended.WaitAsync();
Assert.Equal(chainC.Tip, chainATip);
}
finally
Expand All @@ -857,6 +860,8 @@ public async Task PullBlocks()
[Fact(Timeout = Timeout)]
public async Task PullBlocksByDifficulty()
{
var swarmOptions = new SwarmOptions();
swarmOptions.Type = SwarmOptions.TransportType.NetMQTransport;
var policy = new BlockPolicy<DumbAction>(new MinerReward(1));
var chain1 = TestUtils.MakeBlockChain(
policy,
Expand All @@ -870,8 +875,8 @@ public async Task PullBlocksByDifficulty()
var key1 = new PrivateKey();
var key2 = new PrivateKey();

var miner1 = CreateSwarm(chain1, key1);
var miner2 = CreateSwarm(chain2, key2);
var miner1 = CreateSwarm(chain1, key1, options: swarmOptions);
var miner2 = CreateSwarm(chain2, key2, options: swarmOptions);

await chain1.MineBlock(key1);
await chain1.MineBlock(key2);
Expand All @@ -897,7 +902,7 @@ public async Task PullBlocksByDifficulty()
await BootstrapAsync(miner2, miner1.AsPeer);

await miner1.PullBlocksAsync(TimeSpan.FromSeconds(5), int.MaxValue, default);

await miner1.BlockAppended.WaitAsync();
Assert.Equal(miner2.BlockChain.Count, miner1.BlockChain.Count);
Assert.Equal(miner2.BlockChain.Tip, miner1.BlockChain.Tip);
}
Expand Down
7 changes: 4 additions & 3 deletions Libplanet.Net/Swarm.BlockCandidate.cs
Expand Up @@ -174,7 +174,8 @@ public partial class Swarm<T>
);
}

if (!(workspace.Tip is null))
if (!(workspace.Tip is null) &&
!workspace.Tip.Hash.Equals(blocks.First().PreviousHash))
{
blocks = blocks.Skip(1).ToList();
}
Expand Down Expand Up @@ -238,7 +239,7 @@ private Block<T> FindBranchpoint(Block<T> oldTip, Block<T> newTip, List<Block<T>
{
newTip = newBlocks.Single(x => x.Hash.Equals(bPrev));
}
catch (ArgumentNullException)
catch (InvalidOperationException)
{
newTip = BlockChain[bPrev];
}
Expand Down Expand Up @@ -267,7 +268,7 @@ private Block<T> FindBranchpoint(Block<T> oldTip, Block<T> newTip, List<Block<T>
{
newTip = newBlocks.Single(x => x.Hash.Equals(bPrev));
}
catch (ArgumentNullException)
catch (InvalidOperationException)
{
newTip = BlockChain[bPrev];
}
Expand Down
176 changes: 134 additions & 42 deletions Libplanet.Net/Swarm.BlockSync.cs
Expand Up @@ -74,28 +74,123 @@ public partial class Swarm<T>
return;
}

var totalBlocksToDownload = 0L;
Block<T> tempTip = BlockChain.Tip;
var blocks = new List<Block<T>>();

try
{
_logger.Debug(
$"The chain before {nameof(PullBlocksAsync)}() : " +
"{Id} #{Index} {Hash}",
BlockChain.Id,
BlockChain.Tip.Index,
BlockChain.Tip.Hash);
System.Action renderSwap = await CompleteBlocksAsync(
peersWithBlockExcerpt,
var blockCompletion = new BlockCompletion<BoundPeer, T>(
completionPredicate: BlockChain.Store.ContainsBlock,
window: InitialBlockDownloadWindow
);
var demandBlockHashes = GetDemandBlockHashes(
BlockChain,
peersWithBlockExcerpt,
null,
preload: false,
render: true,
cancellationToken: cancellationToken);
BroadcastBlock(BlockChain.Tip);
renderSwap();

// FIXME: Should add new event handler that indicates rendering has finished,
// and move these events before BroadcastBlock()
BlockReceived.Set();
BlockAppended.Set();
cancellationToken
).WithCancellation(cancellationToken);
await foreach ((long index, BlockHash hash) in demandBlockHashes)
{
cancellationToken.ThrowIfCancellationRequested();

if (index == 0 && !hash.Equals(BlockChain.Genesis.Hash))
{
// FIXME: This behavior can unexpectedly terminate the swarm
// (and the game app) if it encounters a peer having a different
// blockchain, and therefore can be exploited to remotely shut
// down other nodes as well.
// Since the intention of this behavior is to prevent mistakes
// to try to connect incorrect seeds (by a user),
// this behavior should be limited for only seed peers.
// FIXME: ChainStatus message became to contain hash value of
// the genesis block, so this exception will not be happened.
var msg =
$"Since the genesis block is fixed to {BlockChain.Genesis} " +
"protocol-wise, the blockchain which does not share " +
"any mutual block is not acceptable.";
var e = new InvalidGenesisBlockException(
hash,
BlockChain.Genesis.Hash,
msg);
throw new AggregateException(msg, e);
}

_logger.Verbose(
"Enqueue #{BlockIndex} {BlockHash} to demands queue...",
index,
hash
);
if (blockCompletion.Demand(hash))
{
totalBlocksToDownload++;
}
}

if (totalBlocksToDownload == 0)
{
_logger.Debug("No any blocks to fetch.");
return;
}

IAsyncEnumerable<Tuple<Block<T>, BoundPeer>> completedBlocks =
blockCompletion.Complete(
peers: peersWithBlockExcerpt.Select(pair => pair.Item1).ToList(),
blockFetcher: GetBlocksAsync,
singleSessionTimeout: Options.MaxTimeout,
cancellationToken: cancellationToken
);

await foreach (
(Block<T> block, BoundPeer sourcePeer)
in completedBlocks.WithCancellation(cancellationToken))
{
_logger.Verbose(
"Got #{BlockIndex} {BlockHash} from {Peer}.",
block.Index,
block.Hash,
sourcePeer
);
cancellationToken.ThrowIfCancellationRequested();

if (block.Index == 0 && !block.Hash.Equals(BlockChain.Genesis.Hash))
{
// FIXME: This behavior can unexpectedly terminate the swarm
// (and the game app) if it encounters a peer having a different
// blockchain, and therefore can be exploited to remotely shut
// down other nodes as well.
// Since the intention of this behavior is to prevent mistakes
// to try to connect incorrect seeds (by a user),
// this behavior should be limited for only seed peers.
var msg =
$"Since the genesis block is fixed to {BlockChain.Genesis} " +
"protocol-wise, the blockchain which does not share " +
"any mutual block is not acceptable.";

// Although it's actually not aggregated, but to be consistent with
// above code throwing InvalidGenesisBlockException, makes this
// to wrap an exception with AggregateException... Not sure if
// it show be wrapped from the very beginning.
throw new AggregateException(
msg,
new InvalidGenesisBlockException(
block.Hash,
BlockChain.Genesis.Hash,
msg
)
);
}

block.ValidateTimestamp();
blocks.Add(block);
if (tempTip is null ||
BlockChain.Policy.CanonicalChainComparer.Compare(
BlockChain.PerceiveBlock(block),
BlockChain.PerceiveBlock(tempTip)) > 0)
{
tempTip = block;
}
}
}
catch (Exception e)
{
Expand All @@ -106,6 +201,12 @@ public partial class Swarm<T>
}
finally
{
if (blocks.Count != 0)
{
BlockCandidateTable.Add(BlockChain.Tip.Header, blocks);
BlockReceived.Set();
}

ProcessFillBlocksFinished.Set();
_logger.Debug($"{nameof(PullBlocksAsync)}() has finished successfully.");
}
Expand Down Expand Up @@ -185,10 +286,8 @@ CancellationToken cancellationToken
IList<(BoundPeer, IBlockExcerpt)> peersWithExcerpt,
BlockChain<T> workspace,
IProgress<PreloadState> progress,
bool preload,
bool render,
CancellationToken cancellationToken
)
CancellationToken cancellationToken)
{
// As preloading takes long, the blockchain data can corrupt if a program suddenly
// terminates during preloading is going on. In order to make preloading done
Expand Down Expand Up @@ -422,17 +521,14 @@ in completedBlocks.WithCancellation(blockDownloadCts.Token))
if (bottomBlock.PreviousHash is { } bp)
{
branchpoint = workspace[bp];
if (preload || workspace[-1] != branchpoint)
{
_logger.Debug(
"Branchpoint block is #{Index} {Hash}.",
branchpoint.Index,
branchpoint.Hash);
workspace = workspace.Fork(bp, inheritRenderers: true);
chainIds.Add(workspace.Id);
renderBlocks = false;
renderActions = false;
}
_logger.Debug(
"Branchpoint block is #{Index} {Hash}.",
branchpoint.Index,
branchpoint.Hash);
workspace = workspace.Fork(bp, inheritRenderers: true);
chainIds.Add(workspace.Id);
renderBlocks = false;
renderActions = false;

try
{
Expand All @@ -447,7 +543,7 @@ in completedBlocks.WithCancellation(blockDownloadCts.Token))
deltaBlock.Hash);
workspace.Append(
deltaBlock,
evaluateActions: !preload,
evaluateActions: false,
renderBlocks: renderBlocks,
renderActions: renderActions
);
Expand Down Expand Up @@ -482,15 +578,11 @@ in completedBlocks.WithCancellation(blockDownloadCts.Token))
}
}

if (preload)
{
ExecuteActions(
workspace,
branchpoint,
progress,
cancellationToken);
}

ExecuteActions(
workspace,
branchpoint,
progress,
cancellationToken);
complete = true;
}
finally
Expand Down
1 change: 0 additions & 1 deletion Libplanet.Net/Swarm.cs
Expand Up @@ -636,7 +636,6 @@ CancellationToken cancellationToken
peersWithExcerpts,
workspace,
progress,
preload: true,
render: render,
cancellationToken: cancellationToken);
renderSwap();
Expand Down