Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add commit phase to consensus algorithm #534

Merged
merged 10 commits into from
Jan 14, 2019
3 changes: 1 addition & 2 deletions neo.UnitTests/UT_Consensus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ public void ConsensusService_Primary_Sends_PrepareRequest_After_OnStart()
Nonce = mockConsensusContext.Object.Nonce,
NextConsensus = mockConsensusContext.Object.NextConsensus,
TransactionHashes = new UInt256[0],
MinerTransaction = minerTx, //(MinerTransaction)Transactions[TransactionHashes[0]],
Signature = new byte[64]//Signatures[MyIndex]
MinerTransaction = minerTx //(MinerTransaction)Transactions[TransactionHashes[0]],
};

ConsensusMessage mprep = prep;
Expand Down
25 changes: 25 additions & 0 deletions neo/Consensus/Commit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System.IO;

namespace Neo.Consensus
{
internal class Commit : ConsensusMessage
{
public byte[] Signature;

public override int Size => base.Size + Signature.Length;

public Commit() : base(ConsensusMessageType.Commit) { }

public override void Deserialize(BinaryReader reader)
{
base.Deserialize(reader);
Signature = reader.ReadBytes(64);
}

public override void Serialize(BinaryWriter writer)
{
base.Serialize(writer);
writer.Write(Signature);
}
}
}
45 changes: 23 additions & 22 deletions neo/Consensus/ConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ internal class ConsensusContext : IConsensusContext
public UInt160 NextConsensus { get; set; }
public UInt256[] TransactionHashes { get; set; }
public Dictionary<UInt256, Transaction> Transactions { get; set; }
public byte[][] Signatures { get; set; }
public bool[] Preparations { get; set; }
public byte[][] Commits { get; set; }
public byte[] ExpectedView { get; set; }
private Snapshot snapshot;
private KeyPair keyPair;
Expand All @@ -46,14 +47,12 @@ public ConsensusContext(Wallet wallet)

public void ChangeView(byte view_number)
{
State &= ConsensusState.SignatureSent;
State = ConsensusState.Initial;
ViewNumber = view_number;
PrimaryIndex = GetPrimaryIndex(view_number);
if (State == ConsensusState.Initial)
{
TransactionHashes = null;
Signatures = new byte[Validators.Length][];
}
TransactionHashes = null;
Preparations = new bool[Validators.Length];
Commits = new byte[Validators.Length][];
if (MyIndex >= 0)
ExpectedView[MyIndex] = view_number;
_header = null;
Expand All @@ -66,9 +65,9 @@ public Block CreateBlock()
Contract contract = Contract.CreateMultiSigContract(M, Validators);
ContractParametersContext sc = new ContractParametersContext(block);
for (int i = 0, j = 0; i < Validators.Length && j < M; i++)
if (Signatures[i] != null)
if (Commits[i] != null)
{
sc.AddSignature(contract, Validators[i], Signatures[i]);
sc.AddSignature(contract, Validators[i], Commits[i]);
j++;
}
sc.Verifiable.Witnesses = sc.GetWitnesses();
Expand All @@ -95,6 +94,16 @@ public ConsensusPayload MakeChangeView()
});
}

public ConsensusPayload MakeCommit()
{
if (Commits[MyIndex] == null)
Commits[MyIndex] = MakeHeader()?.Sign(keyPair);
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
return MakeSignedPayload(new Commit
{
Signature = Commits[MyIndex]
});
}

private Block _header = null;
public Block MakeHeader()
{
Expand Down Expand Up @@ -132,11 +141,6 @@ private ConsensusPayload MakeSignedPayload(ConsensusMessage message)
return payload;
}

public void SignHeader()
{
Signatures[MyIndex] = MakeHeader()?.Sign(keyPair);
}

private void SignPayload(ConsensusPayload payload)
{
ContractParametersContext sc;
Expand All @@ -159,17 +163,13 @@ public ConsensusPayload MakePrepareRequest()
Nonce = Nonce,
NextConsensus = NextConsensus,
TransactionHashes = TransactionHashes,
MinerTransaction = (MinerTransaction)Transactions[TransactionHashes[0]],
Signature = Signatures[MyIndex]
MinerTransaction = (MinerTransaction)Transactions[TransactionHashes[0]]
});
}

public ConsensusPayload MakePrepareResponse(byte[] signature)
public ConsensusPayload MakePrepareResponse()
{
return MakeSignedPayload(new PrepareResponse
{
Signature = signature
});
return MakeSignedPayload(new PrepareResponse());
}

public void Reset()
Expand All @@ -184,7 +184,8 @@ public void Reset()
MyIndex = -1;
PrimaryIndex = BlockIndex % (uint)Validators.Length;
TransactionHashes = null;
Signatures = new byte[Validators.Length][];
Preparations = new bool[Validators.Length];
Commits = new byte[Validators.Length][];
ExpectedView = new byte[Validators.Length];
keyPair = null;
for (int i = 0; i < Validators.Length; i++)
Expand Down
2 changes: 2 additions & 0 deletions neo/Consensus/ConsensusMessageType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ internal enum ConsensusMessageType : byte
PrepareRequest = 0x20,
[ReflectionCache(typeof(PrepareResponse))]
PrepareResponse = 0x21,
[ReflectionCache(typeof(Commit))]
Commit = 0x30,
}
}
108 changes: 66 additions & 42 deletions neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ private bool AddTransaction(Transaction tx, bool verify)
if (context.VerifyRequest())
{
Log($"send prepare response");
context.State |= ConsensusState.SignatureSent;
context.SignHeader();
localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse(context.Signatures[context.MyIndex]) });
CheckSignatures();
context.State |= ConsensusState.ResponseSent;
context.Preparations[context.MyIndex] = true;
localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse() });
CheckPreparations();
}
else
{
Expand All @@ -83,6 +83,17 @@ private void ChangeTimer(TimeSpan delay)
}, ActorRefs.NoSender);
}

private void CheckCommits()
{
if (context.Commits.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p)))
{
Block block = context.CreateBlock();
Log($"relay block: {block.Hash}");
localNode.Tell(new LocalNode.Relay { Inventory = block });
context.State |= ConsensusState.BlockSent;
}
}

private void CheckExpectedView(byte view_number)
{
if (context.ViewNumber == view_number) return;
Expand All @@ -92,14 +103,15 @@ private void CheckExpectedView(byte view_number)
}
}

private void CheckSignatures()
private void CheckPreparations()
{
if (context.Signatures.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p)))
if (context.Preparations.Count(p => p) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p)))
{
Block block = context.CreateBlock();
Log($"relay block: {block.Hash}");
localNode.Tell(new LocalNode.Relay { Inventory = block });
context.State |= ConsensusState.BlockSent;
ConsensusPayload payload = context.MakeCommit();
Log($"send commit");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra information is also nice here, such as view and height,. good for drawing logs.

localNode.Tell(new LocalNode.SendDirectly { Inventory = payload });
context.State |= ConsensusState.CommitSent;
CheckCommits();
}
}

Expand Down Expand Up @@ -136,13 +148,31 @@ private void Log(string message, LogLevel level = LogLevel.Info)

private void OnChangeViewReceived(ConsensusPayload payload, ChangeView message)
{
if (context.State.HasFlag(ConsensusState.CommitSent))
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have recover message. What's would happen if half of the CN was restarted when others are in this step

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it has to have the regeneration message. I noticed the same thing as well, but believe they want to make the change in a separate PR on top of this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it will come in other PR, it's fine

Copy link
Member

@vncoelho vncoelho Jan 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still prefer PR #426, @erikzhang.

That PR is more complete and already tested.
The only different is, basically, the partial signature merged with Payload Witness.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will save the state of the commit phase on the hard disk in the next pr. If the node restarts and finds the state of the commit phase last time, it will automatically recover from this.

Copy link
Member

@vncoelho vncoelho Jan 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Erik, the Regeneration phase is also important for replying to nodes that asks for change_view (or any other payload that is not expected during commit phase). Those that receive the Regeneration Payload should be able to verify that commit was already achieved.

Copy link
Member

@vncoelho vncoelho Jan 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This point was not resolved here. However, it will be further discussed in the next PR to be merged into #547.

if (message.NewViewNumber <= context.ExpectedView[payload.ValidatorIndex])
return;
Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}");
context.ExpectedView[payload.ValidatorIndex] = message.NewViewNumber;
CheckExpectedView(message.NewViewNumber);
}

private void OnCommitReceived(ConsensusPayload payload, Commit commit)
{
if (context.Commits[payload.ValidatorIndex] != null) return;
Log($"{nameof(OnCommitReceived)}: height={payload.BlockIndex} view={commit.ViewNumber} index={payload.ValidatorIndex}");
byte[] hashData = context.MakeHeader()?.GetHashData();
if (hashData == null)
{
jsolman marked this conversation as resolved.
Show resolved Hide resolved
context.Commits[payload.ValidatorIndex] = commit.Signature;
}
else if (Crypto.Default.VerifySignature(hashData, commit.Signature, context.Validators[payload.ValidatorIndex].EncodePoint(false)))
{
context.Commits[payload.ValidatorIndex] = commit.Signature;
CheckCommits();
}
}

private void OnConsensusPayload(ConsensusPayload payload)
{
if (context.State.HasFlag(ConsensusState.BlockSent)) return;
Expand All @@ -169,16 +199,19 @@ private void OnConsensusPayload(ConsensusPayload payload)
}
if (message.ViewNumber != context.ViewNumber && message.Type != ConsensusMessageType.ChangeView)
return;
switch (message.Type)
switch (message)
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
{
case ConsensusMessageType.ChangeView:
OnChangeViewReceived(payload, (ChangeView)message);
case ChangeView view:
OnChangeViewReceived(payload, view);
break;
case ConsensusMessageType.PrepareRequest:
OnPrepareRequestReceived(payload, (PrepareRequest)message);
case PrepareRequest request:
OnPrepareRequestReceived(payload, request);
break;
case ConsensusMessageType.PrepareResponse:
OnPrepareResponseReceived(payload, (PrepareResponse)message);
case PrepareResponse response:
OnPrepareResponseReceived(payload, response);
break;
case Commit commit:
OnCommitReceived(payload, commit);
break;
}
}
Expand Down Expand Up @@ -212,13 +245,12 @@ private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest m
context.NextConsensus = message.NextConsensus;
context.TransactionHashes = message.TransactionHashes;
context.Transactions = new Dictionary<UInt256, Transaction>();
context.Preparations[payload.ValidatorIndex] = true;
byte[] hashData = context.MakeHeader().GetHashData();
if (!Crypto.Default.VerifySignature(hashData, message.Signature, context.Validators[payload.ValidatorIndex].EncodePoint(false))) return;
for (int i = 0; i < context.Signatures.Length; i++)
if (context.Signatures[i] != null)
if (!Crypto.Default.VerifySignature(hashData, context.Signatures[i], context.Validators[i].EncodePoint(false)))
context.Signatures[i] = null;
context.Signatures[payload.ValidatorIndex] = message.Signature;
for (int i = 0; i < context.Commits.Length; i++)
if (context.Commits[i] != null)
if (!Crypto.Default.VerifySignature(hashData, context.Commits[i], context.Validators[i].EncodePoint(false)))
vncoelho marked this conversation as resolved.
Show resolved Hide resolved
context.Commits[i] = null;
Dictionary<UInt256, Transaction> mempool = Blockchain.Singleton.GetMemoryPool().ToDictionary(p => p.Hash);
List<Transaction> unverified = new List<Transaction>();
foreach (UInt256 hash in context.TransactionHashes.Skip(1))
Expand Down Expand Up @@ -251,18 +283,12 @@ private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest m

private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message)
{
if (context.Signatures[payload.ValidatorIndex] != null) return;
if (context.Preparations[payload.ValidatorIndex]) return;
Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}");
byte[] hashData = context.MakeHeader()?.GetHashData();
if (hashData == null)
{
context.Signatures[payload.ValidatorIndex] = message.Signature;
}
else if (Crypto.Default.VerifySignature(hashData, message.Signature, context.Validators[payload.ValidatorIndex].EncodePoint(false)))
{
context.Signatures[payload.ValidatorIndex] = message.Signature;
CheckSignatures();
}
if (context.State.HasFlag(ConsensusState.CommitSent)) return;
context.Preparations[payload.ValidatorIndex] = true;
if (context.State.HasFlag(ConsensusState.RequestSent) || context.State.HasFlag(ConsensusState.RequestReceived))
CheckPreparations();
}

protected override void OnReceive(object message)
Expand Down Expand Up @@ -304,13 +330,10 @@ private void OnTimer(Timer timer)
if (context.State.HasFlag(ConsensusState.Primary) && !context.State.HasFlag(ConsensusState.RequestSent))
{
Log($"send prepare request: height={timer.Height} view={timer.ViewNumber}");
context.State |= ConsensusState.RequestSent;
if (!context.State.HasFlag(ConsensusState.SignatureSent))
{
context.Fill();
context.SignHeader();
}
context.Fill();
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest() });
context.State |= ConsensusState.RequestSent;
context.Preparations[context.MyIndex] = true;
if (context.TransactionHashes.Length > 1)
{
foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes.Skip(1).ToArray()))
Expand All @@ -320,14 +343,15 @@ private void OnTimer(Timer timer)
}
else if ((context.State.HasFlag(ConsensusState.Primary) && context.State.HasFlag(ConsensusState.RequestSent)) || context.State.HasFlag(ConsensusState.Backup))
{
RequestChangeView();
if (!context.State.HasFlag(ConsensusState.CommitSent))
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
RequestChangeView();
}
}

private void OnTransaction(Transaction transaction)
{
if (transaction.Type == TransactionType.MinerTransaction) return;
if (!context.State.HasFlag(ConsensusState.Backup) || !context.State.HasFlag(ConsensusState.RequestReceived) || context.State.HasFlag(ConsensusState.SignatureSent) || context.State.HasFlag(ConsensusState.ViewChanging) || context.State.HasFlag(ConsensusState.BlockSent))
if (!context.State.HasFlag(ConsensusState.Backup) || !context.State.HasFlag(ConsensusState.RequestReceived) || context.State.HasFlag(ConsensusState.ResponseSent) || context.State.HasFlag(ConsensusState.ViewChanging) || context.State.HasFlag(ConsensusState.BlockSent))
return;
if (context.Transactions.ContainsKey(transaction.Hash)) return;
if (!context.TransactionHashes.Contains(transaction.Hash)) return;
Expand Down
7 changes: 4 additions & 3 deletions neo/Consensus/ConsensusState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ public enum ConsensusState : byte
Backup = 0x02,
RequestSent = 0x04,
RequestReceived = 0x08,
SignatureSent = 0x10,
BlockSent = 0x20,
ViewChanging = 0x40,
ResponseSent = 0x10,
CommitSent = 0x20,
BlockSent = 0x40,
ViewChanging = 0x80,
}
}
9 changes: 5 additions & 4 deletions neo/Consensus/IConsensusContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public interface IConsensusContext : IDisposable
UInt160 NextConsensus { get; set; }
UInt256[] TransactionHashes { get; set; }
Dictionary<UInt256, Transaction> Transactions { get; set; }
byte[][] Signatures { get; set; }
bool[] Preparations { get; set; }
byte[][] Commits { get; set; }
byte[] ExpectedView { get; set; }

int M { get; }
Expand All @@ -40,13 +41,13 @@ public interface IConsensusContext : IDisposable

ConsensusPayload MakeChangeView();

Block MakeHeader();
ConsensusPayload MakeCommit();

void SignHeader();
Block MakeHeader();

ConsensusPayload MakePrepareRequest();

ConsensusPayload MakePrepareResponse(byte[] signature);
ConsensusPayload MakePrepareResponse();

void Reset();

Expand Down
Loading