-
Notifications
You must be signed in to change notification settings - Fork 0
/
consensus_service.go
executable file
·701 lines (616 loc) · 23.4 KB
/
consensus_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
package consensus
import (
"math/big"
"sync/atomic"
"time"
"github.com/zennittians/intelchain/core"
"github.com/zennittians/intelchain/core/types"
"github.com/zennittians/intelchain/crypto/bls"
"github.com/zennittians/intelchain/multibls"
"github.com/zennittians/intelchain/webhooks"
"github.com/ethereum/go-ethereum/common"
protobuf "github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/rs/zerolog"
bls_core "github.com/zennittians/bls/ffi/go/bls"
msg_pb "github.com/zennittians/intelchain/api/proto/message"
consensus_engine "github.com/zennittians/intelchain/consensus/engine"
"github.com/zennittians/intelchain/consensus/quorum"
"github.com/zennittians/intelchain/consensus/signature"
bls_cosi "github.com/zennittians/intelchain/crypto/bls"
"github.com/zennittians/intelchain/crypto/hash"
"github.com/zennittians/intelchain/internal/chain"
"github.com/zennittians/intelchain/internal/utils"
"github.com/zennittians/intelchain/shard"
"github.com/zennittians/intelchain/shard/committee"
)
// WaitForNewRandomness listens to the RndChannel to receive new VDF randomness.
func (consensus *Consensus) WaitForNewRandomness() {
go func() {
for {
vdfOutput := <-consensus.RndChannel
consensus.pendingRnds = append(consensus.pendingRnds, vdfOutput)
}
}()
}
// GetNextRnd returns the oldest available randomness along with the hash of the block there randomness preimage is committed.
func (consensus *Consensus) GetNextRnd() ([vdFAndProofSize]byte, [32]byte, error) {
if len(consensus.pendingRnds) == 0 {
return [vdFAndProofSize]byte{}, [32]byte{}, errors.New("No available randomness")
}
vdfOutput := consensus.pendingRnds[0]
vdfBytes := [vdFAndProofSize]byte{}
seed := [32]byte{}
copy(vdfBytes[:], vdfOutput[:vdFAndProofSize])
copy(seed[:], vdfOutput[vdFAndProofSize:])
//pop the first vdfOutput from the list
consensus.pendingRnds = consensus.pendingRnds[1:]
return vdfBytes, seed, nil
}
var (
empty = []byte{}
)
// Signs the consensus message and returns the marshaled message.
func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Message,
priKey *bls_core.SecretKey) ([]byte, error) {
if err := consensus.signConsensusMessage(message, priKey); err != nil {
return empty, err
}
marshaledMessage, err := protobuf.Marshal(message)
if err != nil {
return empty, err
}
return marshaledMessage, nil
}
// UpdatePublicKeys updates the PublicKeys for
// quorum on current subcommittee, protected by a mutex
func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.updatePublicKeys(pubKeys, allowlist)
}
func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 {
consensus.decider.UpdateParticipants(pubKeys, allowlist)
consensus.getLogger().Info().Msg("My Committee updated")
for i := range pubKeys {
consensus.getLogger().Info().
Int("index", i).
Str("BLSPubKey", pubKeys[i].Bytes.Hex()).
Msg("Member")
}
allKeys := consensus.decider.Participants()
if len(allKeys) != 0 {
consensus.LeaderPubKey = &allKeys[0]
consensus.getLogger().Info().
Str("info", consensus.LeaderPubKey.Bytes.Hex()).Msg("Setting leader as first validator, because provided new keys")
} else {
consensus.getLogger().Error().
Msg("[UpdatePublicKeys] Participants is empty")
}
for i := range pubKeys {
consensus.getLogger().Info().
Int("index", i).
Str("BLSPubKey", pubKeys[i].Bytes.Hex()).
Msg("Member")
}
// reset states after update public keys
// TODO: incorporate bitmaps in the decider, so their state can't be inconsistent.
consensus.updateBitmaps()
consensus.resetState()
// do not reset view change state if it is in view changing mode
if !consensus.isViewChangingMode() {
consensus.resetViewChangeState()
}
return consensus.decider.ParticipantsCount()
}
// Sign on the hash of the message
func (consensus *Consensus) signMessage(message []byte, priKey *bls_core.SecretKey) []byte {
hash := hash.Keccak256(message)
signature := priKey.SignHash(hash[:])
return signature.Serialize()
}
// Sign on the consensus message signature field.
func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message,
priKey *bls_core.SecretKey) error {
message.Signature = nil
marshaledMessage, err := protobuf.Marshal(message)
if err != nil {
return err
}
// 64 byte of signature on previous data
signature := consensus.signMessage(marshaledMessage, priKey)
message.Signature = signature
return nil
}
// UpdateBitmaps update the bitmaps for prepare and commit phase
func (consensus *Consensus) updateBitmaps() {
consensus.getLogger().Debug().
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.decider.Participants()
prepareBitmap := bls_cosi.NewMask(members)
commitBitmap := bls_cosi.NewMask(members)
multiSigBitmap := bls_cosi.NewMask(members)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
consensus.multiSigBitmap = multiSigBitmap
}
// ResetState resets the state of the consensus
func (consensus *Consensus) resetState() {
consensus.switchPhase("ResetState", FBFTAnnounce)
consensus.blockHash = [32]byte{}
consensus.block = []byte{}
consensus.decider.ResetPrepareAndCommitVotes()
if consensus.prepareBitmap != nil {
consensus.prepareBitmap.Clear()
}
if consensus.commitBitmap != nil {
consensus.commitBitmap.Clear()
}
consensus.aggregatedPrepareSig = nil
consensus.aggregatedCommitSig = nil
}
// IsValidatorInCommittee returns whether the given validator BLS address is part of my committee
func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isValidatorInCommittee(pubKey)
}
func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool {
return consensus.decider.IndexOf(pubKey) != -1
}
// SetMode sets the mode of consensus
func (consensus *Consensus) SetMode(m Mode) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.setMode(m)
}
// SetMode sets the mode of consensus
func (consensus *Consensus) setMode(m Mode) {
if m == Normal && consensus.isBackup {
m = NormalBackup
}
consensus.getLogger().Debug().
Str("Mode", m.String()).
Msg("[SetMode]")
consensus.current.SetMode(m)
}
// SetIsBackup sets the mode of consensus
func (consensus *Consensus) SetIsBackup(isBackup bool) {
consensus.getLogger().Debug().
Bool("IsBackup", isBackup).
Msg("[SetIsBackup]")
consensus.isBackup = isBackup
consensus.current.SetIsBackup(isBackup)
}
// Mode returns the mode of consensus
func (consensus *Consensus) Mode() Mode {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.mode()
}
// mode returns the mode of consensus
func (consensus *Consensus) mode() Mode {
return consensus.current.Mode()
}
// RegisterPRndChannel registers the channel for receiving randomness preimage from DRG protocol
func (consensus *Consensus) RegisterPRndChannel(pRndChannel chan []byte) {
consensus.PRndChannel = pRndChannel
}
// RegisterRndChannel registers the channel for receiving final randomness from DRG protocol
func (consensus *Consensus) RegisterRndChannel(rndChannel chan [548]byte) {
consensus.RndChannel = rndChannel
}
// Check viewID, caller's responsibility to hold lock when change ignoreViewIDCheck
func (consensus *Consensus) checkViewID(msg *FBFTMessage) error {
// just ignore consensus check for the first time when node join
if consensus.IgnoreViewIDCheck.IsSet() {
//in syncing mode, node accepts incoming messages without viewID/leaderKey checking
//so only set mode to normal when new node enters consensus and need checking viewID
consensus.setMode(Normal)
consensus.setViewIDs(msg.ViewID)
if !msg.HasSingleSender() {
return errors.New("Leader message can not have multiple sender keys")
}
consensus.LeaderPubKey = msg.SenderPubkeys[0]
consensus.IgnoreViewIDCheck.UnSet()
consensus.consensusTimeout[timeoutConsensus].Start()
consensus.getLogger().Info().
Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()).
Msg("[checkViewID] Start consensus timer")
return nil
} else if msg.ViewID > consensus.getCurBlockViewID() {
return consensus_engine.ErrViewIDNotMatch
} else if msg.ViewID < consensus.getCurBlockViewID() {
return errors.New("view ID belongs to the past")
}
return nil
}
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) SetBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum)
}
// SetBlockNum sets the blockNum in consensus object, called at node bootstrap
func (consensus *Consensus) setBlockNum(blockNum uint64) {
atomic.StoreUint64(&consensus.blockNum, blockNum)
}
// ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading
func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) {
consensus.mutex.RLock()
members := consensus.decider.Participants()
consensus.mutex.RUnlock()
return consensus.readSignatureBitmapPayload(recvPayload, offset, members)
}
func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) {
if offset+bls.BLSSignatureSizeInBytes > len(recvPayload) {
return nil, nil, errors.New("payload not have enough length")
}
sigAndBitmapPayload := recvPayload[offset:]
// TODO(audit): keep a Mask in the Decider so it won't be reconstructed on the fly.
return chain.ReadSignatureBitmapByPublicKeys(
sigAndBitmapPayload, members,
)
}
// UpdateConsensusInformation will update shard information (epoch, publicKeys, blockNum, viewID)
// based on the local blockchain. It is called in two cases for now:
// 1. consensus object initialization. because of current dependency where chainreader is only available
// after node is initialized; node is only available after consensus is initialized
// we need call this function separately after create consensus object
// 2. after state syncing is finished
// It will return the mode:
// (a) node not in committed: Listening mode
// (b) node in committed but has any err during processing: Syncing mode
// (c) node in committed and everything looks good: Normal mode
func (consensus *Consensus) UpdateConsensusInformation() Mode {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.updateConsensusInformation()
}
func (consensus *Consensus) updateConsensusInformation() Mode {
curHeader := consensus.Blockchain().CurrentHeader()
curEpoch := curHeader.Epoch()
nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1)
// Overwrite nextEpoch if the shard state has a epoch number
if curHeader.IsLastBlockInEpoch() {
nextShardState, err := curHeader.GetShardState()
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("[UpdateConsensusInformation] Error retrieving current shard state in the first block")
return Syncing
}
if nextShardState.Epoch != nil {
nextEpoch = nextShardState.Epoch
}
}
consensus.BlockPeriod = 5 * time.Second
// Enable 2s block time at the twoSecondsEpoch
if consensus.Blockchain().Config().IsTwoSeconds(nextEpoch) {
consensus.BlockPeriod = 2 * time.Second
}
isFirstTimeStaking := consensus.Blockchain().Config().IsStaking(nextEpoch) &&
curHeader.IsLastBlockInEpoch() && !consensus.Blockchain().Config().IsStaking(curEpoch)
haventUpdatedDecider := consensus.Blockchain().Config().IsStaking(curEpoch) &&
consensus.decider.Policy() != quorum.SuperMajorityStake
// Only happens once, the flip-over to a new Decider policy
if isFirstTimeStaking || haventUpdatedDecider {
decider := quorum.NewDecider(quorum.SuperMajorityStake, consensus.ShardID)
consensus.decider = decider
}
var committeeToSet *shard.Committee
epochToSet := curEpoch
hasError := false
curShardState, err := committee.WithStakingEnabled.ReadFromDB(
curEpoch, consensus.Blockchain(),
)
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("[UpdateConsensusInformation] Error retrieving current shard state")
return Syncing
}
consensus.getLogger().Info().Msg("[UpdateConsensusInformation] Updating.....")
// genesis block is a special case that will have shard state and needs to skip processing
isNotGenesisBlock := curHeader.Number().Cmp(big.NewInt(0)) > 0
if curHeader.IsLastBlockInEpoch() && isNotGenesisBlock {
nextShardState, err := committee.WithStakingEnabled.ReadFromDB(
nextEpoch, consensus.Blockchain(),
)
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving nextEpoch shard state")
return Syncing
}
subComm, err := nextShardState.FindCommitteeByID(curHeader.ShardID())
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving nextEpoch shard state")
return Syncing
}
committeeToSet = subComm
epochToSet = nextEpoch
} else {
subComm, err := curShardState.FindCommitteeByID(curHeader.ShardID())
if err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error retrieving current shard state")
return Syncing
}
committeeToSet = subComm
}
if len(committeeToSet.Slots) == 0 {
consensus.getLogger().Warn().
Msg("[UpdateConsensusInformation] No members in the committee to update")
hasError = true
}
// update public keys in the committee
oldLeader := consensus.LeaderPubKey
pubKeys, _ := committeeToSet.BLSPublicKeys()
consensus.getLogger().Info().
Int("numPubKeys", len(pubKeys)).
Msg("[UpdateConsensusInformation] Successfully updated public keys")
consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist())
// Update voters in the committee
if _, err := consensus.decider.SetVoters(
committeeToSet, epochToSet,
); err != nil {
consensus.getLogger().Error().
Err(err).
Uint32("shard", consensus.ShardID).
Msg("Error when updating voters")
return Syncing
}
// take care of possible leader change during the epoch
// TODO: in a very rare case, when a M1 view change happened, the block contains coinbase for last leader
// but the new leader is actually recognized by most of the nodes. At this time, if a node sync to this
// exact block and set its leader, it will set with the failed leader as in the coinbase of the block.
// This is a very rare case scenario and not likely to cause any issue in mainnet. But we need to think about
// a solution to take care of this case because the coinbase of the latest block doesn't really represent the
// the real current leader in case of M1 view change.
if !curHeader.IsLastBlockInEpoch() && curHeader.Number().Uint64() != 0 {
leaderPubKey, err := chain.GetLeaderPubKeyFromCoinbase(consensus.Blockchain(), curHeader)
if err != nil || leaderPubKey == nil {
consensus.getLogger().Error().Err(err).
Msg("[UpdateConsensusInformation] Unable to get leaderPubKey from coinbase")
consensus.IgnoreViewIDCheck.Set()
hasError = true
} else {
consensus.getLogger().Info().
Str("leaderPubKey", leaderPubKey.Bytes.Hex()).
Msg("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain")
consensus.LeaderPubKey = leaderPubKey
}
}
for _, key := range pubKeys {
// in committee
myPubKeys := consensus.getPublicKeys()
if myPubKeys.Contains(key.Object) {
if hasError {
consensus.getLogger().Error().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] hasError")
return Syncing
}
// If the leader changed and I myself become the leader
if (oldLeader != nil && consensus.LeaderPubKey != nil &&
!consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.isLeader() {
go func() {
consensus.GetLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal(SyncProposal)
}()
}
return Normal
}
}
consensus.getLogger().Info().
Msg("[UpdateConsensusInformation] not in committee, Listening")
// not in committee
return Listening
}
// IsLeader check if the node is a leader or not by comparing the public key of
// the node with the leader public key
func (consensus *Consensus) IsLeader() bool {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.isLeader()
}
// isLeader check if the node is a leader or not by comparing the public key of
// the node with the leader public key. This function assume it runs under lock.
func (consensus *Consensus) isLeader() bool {
obj := consensus.LeaderPubKey.Object
for _, key := range consensus.priKey {
if key.Pub.Object.IsEqual(obj) {
return true
}
}
return false
}
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) SetViewIDs(height uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.setViewIDs(height)
}
// SetViewIDs set both current view ID and view changing ID to the height
// of the blockchain. It is used during client startup to recover the state
func (consensus *Consensus) setViewIDs(height uint64) {
consensus.setCurBlockViewID(height)
consensus.setViewChangingID(height)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
return consensus.setCurBlockViewID(viewID)
}
// SetCurBlockViewID set the current view ID
func (consensus *Consensus) setCurBlockViewID(viewID uint64) uint64 {
return consensus.current.SetCurBlockViewID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) SetViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// SetViewChangingID set the current view change ID
func (consensus *Consensus) setViewChangingID(viewID uint64) {
consensus.current.SetViewChangingID(viewID)
}
// StartFinalityCount set the finality counter to current time
func (consensus *Consensus) StartFinalityCount() {
consensus.finalityCounter.Store(time.Now().UnixNano())
}
// FinishFinalityCount calculate the current finality
func (consensus *Consensus) FinishFinalityCount() {
d := time.Now().UnixNano()
if prior, ok := consensus.finalityCounter.Load().(int64); ok {
consensus.finality = (d - prior) / 1000000
consensusFinalityHistogram.Observe(float64(consensus.finality))
}
}
// GetFinality returns the finality time in milliseconds of previous consensus
func (consensus *Consensus) GetFinality() int64 {
return consensus.finality
}
// switchPhase will switch FBFTPhase to desired phase.
func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) {
consensus.getLogger().Info().
Str("from:", consensus.phase.String()).
Str("to:", desired.String()).
Str("switchPhase:", subject)
consensus.phase = desired
}
var (
errGetPreparedBlock = errors.New("failed to get prepared block for self commit")
errReadBitmapPayload = errors.New("failed to read signature bitmap payload")
)
// selfCommit will create a commit message and commit it locally
// it is used by the new leadder of the view change routine
// when view change is succeeded and the new leader
// received prepared payload from other validators or from local
func (consensus *Consensus) selfCommit(payload []byte) error {
var blockHash [32]byte
copy(blockHash[:], payload[:32])
// Leader sign and add commit message
block := consensus.fBFTLog.GetBlockByHash(blockHash)
if block == nil {
return errGetPreparedBlock
}
aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.decider.Participants())
if err != nil {
return errReadBitmapPayload
}
// Have to keep the block hash so the leader can finish the commit phase of prepared block
consensus.resetState()
copy(consensus.blockHash[:], blockHash[:])
consensus.switchPhase("selfCommit", FBFTCommit)
consensus.aggregatedPrepareSig = aggSig
consensus.prepareBitmap = mask
commitPayload := signature.ConstructCommitPayload(consensus.ChainReader().Config(),
block.Epoch(), block.Hash(), block.NumberU64(), block.Header().ViewID().Uint64())
for i, key := range consensus.priKey {
if err := consensus.commitBitmap.SetKey(key.Pub.Bytes, true); err != nil {
consensus.getLogger().Error().
Err(err).
Int("Index", i).
Str("Key", key.Pub.Bytes.Hex()).
Msg("[selfCommit] New Leader commit bitmap set failed")
continue
}
if _, err := consensus.decider.AddNewVote(
quorum.Commit,
[]*bls_cosi.PublicKeyWrapper{key.Pub},
key.Pri.SignHash(commitPayload),
common.BytesToHash(consensus.blockHash[:]),
block.NumberU64(),
block.Header().ViewID().Uint64(),
); err != nil {
consensus.getLogger().Warn().
Err(err).
Int("Index", i).
Str("Key", key.Pub.Bytes.Hex()).
Msg("[selfCommit] submit vote on viewchange commit failed")
}
}
return nil
}
// NumSignaturesIncludedInBlock returns the number of signatures included in the block
func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uint32 {
count := uint32(0)
consensus.mutex.Lock()
members := consensus.decider.Participants()
pubKeys := consensus.getPublicKeys()
consensus.mutex.Unlock()
// TODO(audit): do not reconstruct the Mask
mask := bls.NewMask(members)
err := mask.SetMask(block.Header().LastCommitBitmap())
if err != nil {
return count
}
for _, key := range pubKeys {
if ok, err := mask.KeyEnabled(key.Bytes); err == nil && ok {
count++
}
}
return count
}
// GetLogger returns logger for consensus contexts added.
func (consensus *Consensus) GetLogger() *zerolog.Logger {
consensus.mutex.RLock()
defer consensus.mutex.RUnlock()
return consensus.getLogger()
}
// getLogger returns logger for consensus contexts added
func (consensus *Consensus) getLogger() *zerolog.Logger {
logger := utils.Logger().With().
Uint64("myBlock", consensus.blockNum).
Uint64("myViewID", consensus.getCurBlockViewID()).
Str("phase", consensus.phase.String()).
Str("mode", consensus.current.Mode().String()).
Logger()
return &logger
}
// BlockVerifier is called by consensus participants to verify the block (account model) they are
// running consensus on.
func (consensus *Consensus) BlockVerifier(newBlock *types.Block) error {
if err := consensus.Blockchain().ValidateNewBlock(newBlock, consensus.Beaconchain()); err != nil {
switch {
case errors.Is(err, core.ErrKnownBlock):
return nil
default:
}
if hooks := consensus.registry.GetWebHooks(); hooks != nil {
if p := hooks.ProtocolIssues; p != nil {
url := p.OnCannotCommit
go func() {
webhooks.DoPost(url, map[string]interface{}{
"bad-header": newBlock.Header(),
"reason": err.Error(),
})
}()
}
}
utils.Logger().Error().
Str("blockHash", newBlock.Hash().Hex()).
Int("numTx", len(newBlock.Transactions())).
Int("numStakingTx", len(newBlock.StakingTransactions())).
Err(err).
Msgf("[VerifyNewBlock] Cannot Verify New Block!!!, blockHeight %d, myHeight %d", newBlock.NumberU64(), consensus.Blockchain().CurrentHeader().NumberU64())
return errors.WithMessagef(err,
"[VerifyNewBlock] Cannot Verify New Block!!! block-hash %s txn-count %d",
newBlock.Hash().Hex(),
len(newBlock.Transactions()),
)
}
return nil
}