Skip to content

Commit

Permalink
wip: bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
OnedgeLee committed Jun 26, 2023
1 parent a051767 commit 8feecd7
Show file tree
Hide file tree
Showing 20 changed files with 996 additions and 69 deletions.
77 changes: 77 additions & 0 deletions Libplanet.Net/Consensus/ConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public ConsensusStep Step
}
}

/// <summary>
/// Represents if <see cref="ConsensusContext"/> is bootstrapping or not.
/// </summary>
internal bool Bootstrapping => _bootstrapping;

/// <summary>
/// A dictionary of <see cref="Context"/> for each heights. Each key represents the
/// height of value, and value is the <see cref="Context"/>.
Expand Down Expand Up @@ -304,6 +309,78 @@ public IEnumerable<ConsensusMsg> HandleVoteSetBits(VoteSetBits voteSetBits)
return Array.Empty<ConsensusMsg>();
}

/// <summary>
/// Handles a received <see cref="ConsensusBootstrapMsg"/>
/// and return <see cref="VotesRecall"/> to send as a reply.
/// </summary>
/// <param name="bootstrapMsg">The <see cref="ConsensusBootstrapMsg"/>
/// received from bootstrapping validator.
/// </param>
/// <returns>
/// A nullable <see cref="VotesRecall"/> to reply back.
/// </returns>
public VotesRecall? HandleBootstrap(ConsensusBootstrapMsg bootstrapMsg)
{
long height = bootstrapMsg.Bootstrap.Height;
int round = bootstrapMsg.Bootstrap.Round;
if (height < Height)
{
_logger.Debug(
"Ignore a received Bootstrap as its height " +
"#{Height} is lower than the current context's height #{ContextHeight}",
height,
Height);
}
else
{
lock (_contextLock)
{
if (_contexts.ContainsKey(height))
{
return _contexts[height]
.GetVotesRecall(round);
}
}
}

return null;
}

/// <summary>
/// Handles a received <see cref="ConsensusVotesRecallMsg"/>.
/// </summary>
/// <param name="votesRecallMsg">The <see cref="ConsensusVotesRecallMsg"/>
/// received from running validator.
/// </param>
/// <returns>
/// <see langword="true"/> if <paramref name="votesRecallMsg"/> is dispatched to
/// a <see cref="Context"/>, <see langword="false"/> otherwise.
/// </returns>
public bool HandleVotesRecall(ConsensusVotesRecallMsg votesRecallMsg)
{
long height = votesRecallMsg.Height;
if (height < Height)
{
_logger.Debug(
"Discarding a received message as its height #{MessageHeight} " +
"is lower than the current context's height #{ContextHeight}",
height,
Height);
return false;
}

lock (_contextLock)
{
if (!_contexts.ContainsKey(height))
{
_contexts[height] = CreateContext(height);
}

_contexts[height].ProduceMessage(votesRecallMsg);
return true;
}
}

/// <summary>
/// Returns the summary for <see cref="ConsensusContext"/>.
/// </summary>
Expand Down
34 changes: 33 additions & 1 deletion Libplanet.Net/Consensus/ConsensusReactor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private void ProcessMessage(MessageContent content)
VoteSetBits voteSetBits = _consensusContext.Contexts[maj23Msg.Height]
.GetVoteSetBits(
maj23Msg.Round,
maj23Msg.BlockHash,
maj23Msg.Maj23.BlockHash,
maj23Msg.Maj23.Flag);
var sender = _gossip.Peers.First(
peer => peer.PublicKey.Equals(maj23Msg.ValidatorPublicKey));
Expand All @@ -210,6 +210,38 @@ private void ProcessMessage(MessageContent content)

break;

case ConsensusBootstrapMsg bootstrapMsg:
try
{
_consensusContext.HandleMessage(bootstrapMsg);
var sender = _gossip.Peers.First(
peer => peer.PublicKey.Equals(bootstrapMsg.ValidatorPublicKey));
var reply = _consensusContext.HandleBootstrap(bootstrapMsg);
if (reply is null)
{
// Reply is not needed. Ignore the message.
break;
}

_gossip.PublishMessage(
new ConsensusVotesRecallMsg(reply),
new[] { sender });
}
catch (InvalidOperationException)
{
_logger.Debug(
"Cannot respond received ConsensusBootstrapMsg message " +
"{Message} since there is no corresponding peer in the table",
bootstrapMsg);
}

break;

case ConsensusVotesRecallMsg votesRecallMsg:
// Note: ConsensusVoteSetBitsMsg will not be stored to context's message log.
_consensusContext.HandleVotesRecall(votesRecallMsg);
break;

case ConsensusMsg consensusMsg:
_consensusContext.HandleMessage(consensusMsg);
break;
Expand Down
30 changes: 1 addition & 29 deletions Libplanet.Net/Consensus/Context.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ public void Start(BlockCommit? lastCommit = null, bool bootstrapping = false)
Height,
lastCommit);
_lastCommit = lastCommit;
_bootstrapping = bootstrapping;
ProduceMutation(() => StartRound(0));

// FIXME: Exceptions inside tasks should be handled properly.
_ = MessageConsumerTask(_cancellationTokenSource.Token);
_ = MutationConsumerTask(_cancellationTokenSource.Token);

if (bootstrapping)
{
_ = BootstrappingTask(_cancellationTokenSource.Token);
}
}

/// <summary>
Expand Down Expand Up @@ -99,30 +95,6 @@ internal async Task MutationConsumerTask(CancellationToken cancellationToken)
}
}

internal async Task BootstrappingTask(CancellationToken cancellationToken)
{
while (true)
{
try
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
catch (OperationCanceledException oce)
{
_logger.Debug(oce, "Cancellation was requested");
ExceptionOccurred?.Invoke(this, oce);
throw;
}
#pragma warning disable S125
/*if (_heightVoteSet.GetRandomMessage() is { } message)
{
BroadcastMessage(message)
}*/
#pragma warning restore S125
}
}

/// <summary>
/// Adds <paramref name="message"/> to the message queue.
/// </summary>
Expand Down
78 changes: 54 additions & 24 deletions Libplanet.Net/Consensus/Context.Mutate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ private void StartRound(int round)
ToString());
Round = round;
_heightVoteSet.SetRound(round);
if (_bootstrapping)
{
Bootstrap bootstrap = new BootstrapMetadata(
Height, round, DateTimeOffset.UtcNow, _privateKey.PublicKey).Sign(_privateKey);
BroadcastMessage(
new ConsensusBootstrapMsg(bootstrap));
}

Proposal = null;
Step = ConsensusStep.Propose;
if (_validatorSet.GetProposer(Height, Round).PublicKey == _privateKey.PublicKey)
Expand Down Expand Up @@ -101,33 +109,55 @@ private bool AddMessage(ConsensusMsg message)
message);
}

switch (message)
if (message is ConsensusVoteMsg voteMsg)
{
case ConsensusProposalMsg proposal:
AddProposal(proposal.Proposal);
break;
case ConsensusPreVoteMsg preVote:
_heightVoteSet.AddVote(preVote.PreVote);
break;
case ConsensusPreCommitMsg preCommit:
_heightVoteSet.AddVote(preCommit.PreCommit);
break;
case ConsensusMaj23Msg maj23:
_heightVoteSet.SetPeerMaj23(maj23.Maj23);
break;
switch (voteMsg)
{
case ConsensusProposalMsg proposal:
AddProposal(proposal.Proposal);
break;
case ConsensusPreVoteMsg preVote:
_heightVoteSet.AddVote(preVote.PreVote);
break;
case ConsensusPreCommitMsg preCommit:
_heightVoteSet.AddVote(preCommit.PreCommit);
break;
}

_logger.Debug(
"{FName}: Message: {Message} => Height: {Height}, Round: {Round}, " +
"Validator Address: {VAddress}, " +
"Hash: {BlockHash}. (context: {Context})",
nameof(AddMessage),
voteMsg,
voteMsg.Height,
voteMsg.Round,
voteMsg.ValidatorPublicKey.ToAddress(),
voteMsg.BlockHash,
ToString());
}
else
{
switch (message)
{
case ConsensusMaj23Msg maj23:
_heightVoteSet.SetPeerMaj23(maj23.Maj23);
break;
case ConsensusVotesRecallMsg votesRecall:
CatchupWithVotesRecall(votesRecall.VotesRecall);
break;
}

_logger.Debug(
"{FName}: Message: {Message} => Height: {Height}, Round: {Round}, " +
"Validator Address: {VAddress}, " +
"Hash: {BlockHash}. (context: {Context})",
nameof(AddMessage),
message,
message.Height,
message.Round,
message.ValidatorPublicKey.ToAddress(),
message.BlockHash,
ToString());
_logger.Debug(
"{FName}: Message: {Message} => Height: {Height}, Round: {Round}, " +
"Validator Address: {VAddress}. (context: {Context})",
nameof(AddMessage),
message,
message.Height,
message.Round,
message.ValidatorPublicKey.ToAddress(),
ToString());
}

return true;
}
Expand Down
44 changes: 44 additions & 0 deletions Libplanet.Net/Consensus/Context.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private readonly
private Block? _decision;
private int _committedRound;
private BlockCommit? _lastCommit;
private bool _bootstrapping;

/// <summary>
/// Initializes a new instance of the <see cref="Context"/> class.
Expand Down Expand Up @@ -178,6 +179,7 @@ private readonly
_validRound = -1;
_decision = null;
_committedRound = -1;
_bootstrapping = false;
_blockChain = blockChain;
_codec = new Codec();
_messageRequests = Channel.CreateUnbounded<ConsensusMsg>();
Expand Down Expand Up @@ -299,6 +301,48 @@ public IEnumerable<ConsensusMsg> GetVoteSetBitsResponse(VoteSetBits voteSetBits)
});
}

public VotesRecall GetVotesRecall(int round)
{
ImmutableHashSet<Vote> votes;
if (round < Round)
{
round++;
}

votes = _heightVoteSet.PreVotes(round).List().Concat(
_heightVoteSet.PreCommits(round).List()).ToImmutableHashSet();

return new VotesRecallMetadata(
Height,
round,
DateTimeOffset.UtcNow,
_privateKey.PublicKey,
votes).Sign(_privateKey);
}

public void CatchupWithVotesRecall(VotesRecall votesRecall)
{
foreach (Vote vote in votesRecall.Votes)
{
if (vote.Height != Height)
{
continue;
}

if (vote.Round > Round + 1)
{
continue;
}

if (!vote.Verify())
{
continue;
}

_heightVoteSet.AddVote(vote);
}
}

/// <summary>
/// Returns the summary of context in JSON-formatted string.
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion Libplanet.Net/Consensus/HeightVoteSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void SetRound(int round)
{
lock (_lock)
{
// FIXME: This shouldn't be _round + 1?
var newRound = _round + 1;
if (_round != 0 && (round < newRound))
{
Expand Down
Loading

0 comments on commit 8feecd7

Please sign in to comment.