Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrated multikey in consensus #4463

Merged
merged 5 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 34 additions & 16 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type delayedBroadcaster interface {
SetValidatorData(data *delayedBroadcastData) error
SetHeaderForValidator(vData *validatorHeaderBroadcastData) error
SetBroadcastHandlers(
mbBroadcast func(mbData map[uint32][]byte) error,
txBroadcast func(txData map[string][][]byte) error,
headerBroadcast func(header data.HeaderHandler) error,
mbBroadcast func(mbData map[uint32][]byte, pkBytes []byte) error,
txBroadcast func(txData map[string][][]byte, pkBytes []byte) error,
headerBroadcast func(header data.HeaderHandler, pkBytes []byte) error,
) error
Close()
}
Expand All @@ -39,25 +39,25 @@ type commonMessenger struct {
marshalizer marshal.Marshalizer
hasher hashing.Hasher
messenger consensus.P2PMessenger
privateKey crypto.PrivateKey
shardCoordinator sharding.Coordinator
peerSignatureHandler crypto.PeerSignatureHandler
delayedBlockBroadcaster delayedBroadcaster
keysHandler consensus.KeysHandler
}

// CommonMessengerArgs holds the arguments for creating commonMessenger instance
type CommonMessengerArgs struct {
Marshalizer marshal.Marshalizer
Hasher hashing.Hasher
Messenger consensus.P2PMessenger
PrivateKey crypto.PrivateKey
ShardCoordinator sharding.Coordinator
PeerSignatureHandler crypto.PeerSignatureHandler
HeadersSubscriber consensus.HeadersPoolSubscriber
InterceptorsContainer process.InterceptorsContainer
MaxDelayCacheSize uint32
MaxValidatorDelayCacheSize uint32
AlarmScheduler core.TimersScheduler
KeysHandler consensus.KeysHandler
}

func checkCommonMessengerNilParameters(
Expand All @@ -72,9 +72,6 @@ func checkCommonMessengerNilParameters(
if check.IfNil(args.Messenger) {
return spos.ErrNilMessenger
}
if check.IfNil(args.PrivateKey) {
return spos.ErrNilPrivateKey
}
if check.IfNil(args.ShardCoordinator) {
return spos.ErrNilShardCoordinator
}
Expand All @@ -93,13 +90,17 @@ func checkCommonMessengerNilParameters(
if args.MaxDelayCacheSize == 0 || args.MaxValidatorDelayCacheSize == 0 {
return spos.ErrInvalidCacheSize
}
if check.IfNil(args.KeysHandler) {
return ErrNilKeysHandler
}

return nil
}

// BroadcastConsensusMessage will send on consensus topic the consensus message
func (cm *commonMessenger) BroadcastConsensusMessage(message *consensus.Message) error {
signature, err := cm.peerSignatureHandler.GetPeerSignature(cm.privateKey, message.OriginatorPid)
privateKey := cm.keysHandler.GetHandledPrivateKey(message.PubKey)
signature, err := cm.peerSignatureHandler.GetPeerSignature(privateKey, message.OriginatorPid)
if err != nil {
return err
}
Expand All @@ -114,18 +115,18 @@ func (cm *commonMessenger) BroadcastConsensusMessage(message *consensus.Message)
consensusTopic := common.ConsensusTopic +
cm.shardCoordinator.CommunicationIdentifier(cm.shardCoordinator.SelfId())

cm.messenger.Broadcast(consensusTopic, buff)
cm.broadcast(consensusTopic, buff, message.PubKey)

return nil
}

// BroadcastMiniBlocks will send on miniblocks topic the cross-shard miniblocks
func (cm *commonMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte) error {
func (cm *commonMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte, pkBytes []byte) error {
for k, v := range miniBlocks {
miniBlocksTopic := factory.MiniBlocksTopic +
cm.shardCoordinator.CommunicationIdentifier(k)

cm.messenger.Broadcast(miniBlocksTopic, v)
cm.broadcast(miniBlocksTopic, v, pkBytes)
}

if len(miniBlocks) > 0 {
Expand All @@ -138,7 +139,7 @@ func (cm *commonMessenger) BroadcastMiniBlocks(miniBlocks map[uint32][]byte) err
}

// BroadcastTransactions will send on transaction topic the transactions
func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byte) error {
func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byte, pkBytes []byte) error {
dataPacker, err := partitioning.NewSimpleDataPacker(cm.marshalizer)
if err != nil {
return err
Expand All @@ -155,7 +156,7 @@ func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byt
}

for _, buff := range packets {
cm.messenger.Broadcast(topic, buff)
cm.broadcast(topic, buff, pkBytes)
}
}

Expand All @@ -172,12 +173,13 @@ func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byt
func (cm *commonMessenger) BroadcastBlockData(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
pkBytes []byte,
extraDelayForBroadcast time.Duration,
) {
time.Sleep(extraDelayForBroadcast)

if len(miniBlocks) > 0 {
err := cm.BroadcastMiniBlocks(miniBlocks)
err := cm.BroadcastMiniBlocks(miniBlocks, pkBytes)
if err != nil {
log.Warn("commonMessenger.BroadcastBlockData: broadcast miniblocks", "error", err.Error())
}
Expand All @@ -186,7 +188,7 @@ func (cm *commonMessenger) BroadcastBlockData(
time.Sleep(common.ExtraDelayBetweenBroadcastMbsAndTxs)

if len(transactions) > 0 {
err := cm.BroadcastTransactions(transactions)
err := cm.BroadcastTransactions(transactions, pkBytes)
if err != nil {
log.Warn("commonMessenger.BroadcastBlockData: broadcast transactions", "error", err.Error())
}
Expand Down Expand Up @@ -223,3 +225,19 @@ func (cm *commonMessenger) extractMetaMiniBlocksAndTransactions(

return metaMiniBlocks, metaTransactions
}

func (cm *commonMessenger) broadcast(topic string, data []byte, pkBytes []byte) {
if cm.keysHandler.IsOriginalPublicKeyOfTheNode(pkBytes) {
cm.messenger.Broadcast(topic, data)
return
}

skBytes, pid, err := cm.keysHandler.GetP2PIdentity(pkBytes)
if err != nil {
log.Error("setup error in commonMessenger.broadcast - public key is managed but does not contain p2p sign info",
"pk", pkBytes, "error", err)
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking if it is better to call cm.messenger.Broadcast(topic, data) before return. Exactly as the approach used in getPrivateKey method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought about this. I think part of the messages will be discarded because it will broadcast data as an observer on topics on which data needs to be sent only by validators.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you need to broadcast on behalf of someone else, but you cannot do it since you don't have the required p2p data, so I think it is fine to simply return. Otherwise the message would anyway fail on verification and be dropped.

}

cm.messenger.BroadcastUsingPrivateKey(topic, data, pid, skBytes)
}