Skip to content

Commit

Permalink
Merge pull request #1871 from ElrondNetwork/make-sure-broadcast-happens
Browse files Browse the repository at this point in the history
Make sure broadcast happens
  • Loading branch information
LucianMincu committed Jun 21, 2020
2 parents de8d042 + 734ca3a commit ba8995b
Show file tree
Hide file tree
Showing 49 changed files with 3,504 additions and 503 deletions.
112 changes: 102 additions & 10 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,61 @@
package broadcast

import (
"strings"
"time"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/consensus"
"github.com/ElrondNetwork/elrond-go/consensus/spos"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/core/partitioning"
"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/hashing"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
)

var log = logger.GetOrCreate("consensus/broadcast")

// delayedBroadcaster exposes functionality for handling the consensus members broadcasting of delay data
type delayedBroadcaster interface {
SetLeaderData(data *delayedBroadcastData) error
SetValidatorData(data *delayedBroadcastData) error
SetHeaderForValidator(vData *validatorHeaderBroadcastData) error
SetBroadcastHandlers(
mbBroadcast func(mbData map[uint32][]byte) error,
txBroadcast func(txData map[string][][]byte) error,
headerBroadcast func(header data.HeaderHandler) error,
) error
Close()
}

type commonMessenger struct {
marshalizer marshal.Marshalizer
messenger consensus.P2PMessenger
privateKey crypto.PrivateKey
shardCoordinator sharding.Coordinator
singleSigner crypto.SingleSigner
marshalizer marshal.Marshalizer
hasher hashing.Hasher
messenger consensus.P2PMessenger
privateKey crypto.PrivateKey
shardCoordinator sharding.Coordinator
singleSigner crypto.SingleSigner
delayedBlockBroadcaster delayedBroadcaster
}

// CommonMessengerArgs holds the arguments for creating commonMessenger instance
type CommonMessengerArgs struct {
Marshalizer marshal.Marshalizer
Messenger consensus.P2PMessenger
PrivateKey crypto.PrivateKey
ShardCoordinator sharding.Coordinator
SingleSigner crypto.SingleSigner
Marshalizer marshal.Marshalizer
Hasher hashing.Hasher
Messenger consensus.P2PMessenger
PrivateKey crypto.PrivateKey
ShardCoordinator sharding.Coordinator
SingleSigner crypto.SingleSigner
HeadersSubscriber consensus.HeadersPoolSubscriber
InterceptorsContainer process.InterceptorsContainer
MaxDelayCacheSize uint32
MaxValidatorDelayCacheSize uint32
}

func checkCommonMessengerNilParameters(
Expand All @@ -38,6 +64,9 @@ func checkCommonMessengerNilParameters(
if check.IfNil(args.Marshalizer) {
return spos.ErrNilMarshalizer
}
if check.IfNil(args.Hasher) {
return spos.ErrNilHasher
}
if check.IfNil(args.Messenger) {
return spos.ErrNilMessenger
}
Expand All @@ -50,6 +79,15 @@ func checkCommonMessengerNilParameters(
if check.IfNil(args.SingleSigner) {
return spos.ErrNilSingleSigner
}
if check.IfNil(args.InterceptorsContainer) {
return spos.ErrNilInterceptorsContainer
}
if check.IfNil(args.HeadersSubscriber) {
return spos.ErrNilHeadersSubscriber
}
if args.MaxDelayCacheSize == 0 || args.MaxValidatorDelayCacheSize == 0 {
return spos.ErrInvalidCacheSize
}

return nil
}
Expand Down Expand Up @@ -138,3 +176,57 @@ func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byt

return nil
}

// BroadcastBlockData broadcasts the miniblocks and transactions
func (cm *commonMessenger) BroadcastBlockData(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
extraDelayForBroadcast time.Duration,
) {
time.Sleep(extraDelayForBroadcast)

if len(miniBlocks) > 0 {
err := cm.BroadcastMiniBlocks(miniBlocks)
if err != nil {
log.Warn("broadcast.BroadcastMiniBlocks", "error", err.Error())
}
}

if len(transactions) > 0 {
err := cm.BroadcastTransactions(transactions)
if err != nil {
log.Warn("broadcast.BroadcastTransactions", "error", err.Error())
}
}
}

func (cm *commonMessenger) extractMetaMiniBlocksAndTransactions(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
) (map[uint32][]byte, map[string][][]byte) {

metaMiniBlocks := make(map[uint32][]byte, 0)
metaTransactions := make(map[string][][]byte, 0)

for shardID, mbsMarshalized := range miniBlocks {
if shardID != core.MetachainShardId {
continue
}

metaMiniBlocks[shardID] = mbsMarshalized
delete(miniBlocks, shardID)
}

identifier := cm.shardCoordinator.CommunicationIdentifier(core.MetachainShardId)

for broadcastTopic, txsMarshalized := range transactions {
if !strings.Contains(broadcastTopic, identifier) {
continue
}

metaTransactions[broadcastTopic] = txsMarshalized
delete(transactions, broadcastTopic)
}

return metaMiniBlocks, metaTransactions
}
107 changes: 107 additions & 0 deletions consensus/broadcast/commonMessenger_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package broadcast_test

import (
"sync"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/consensus"
"github.com/ElrondNetwork/elrond-go/consensus/broadcast"
"github.com/ElrondNetwork/elrond-go/consensus/mock"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data/block"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func newTestBlockBody() *block.Body {
Expand Down Expand Up @@ -126,3 +130,106 @@ func TestCommonMessenger_SignMessageShouldErrWhenSignFail(t *testing.T) {
_, err2 := cm.SignMessage(msg)
assert.Equal(t, err, err2)
}

func TestSubroundEndRound_ExtractMiniBlocksAndTransactionsShouldWork(t *testing.T) {
t.Parallel()

miniBlocks := make(map[uint32][]byte, 0)
transactions := make(map[string][][]byte, 0)

miniBlocks[1] = []byte("mbs_shard_1")
miniBlocks[core.MetachainShardId] = []byte("mbs_shard_meta")
miniBlocks[2] = []byte("mbs_shard_2")

transactions["transactions_0_1"] = [][]byte{
[]byte("tx1_shard_1"),
[]byte("tx2_shard_1"),
[]byte("tx3_shard_1"),
}

transactions["transactions_0_META"] = [][]byte{
[]byte("tx1_shard_meta"),
[]byte("tx2_shard_meta"),
[]byte("tx3_shard_meta"),
}

transactions["transactions_0_2"] = [][]byte{
[]byte("tx1_shard_2"),
[]byte("tx2_shard_2"),
[]byte("tx3_shard_2"),
}

marshalizerMock := &mock.MarshalizerMock{}
messengerMock := &mock.MessengerStub{
BroadcastCalled: func(topic string, buff []byte) {
},
}
privateKeyMock := &mock.PrivateKeyMock{}
shardCoordinatorMock := &mock.ShardCoordinatorMock{}
singleSignerMock := &mock.SingleSignerMock{
SignStub: func(private crypto.PrivateKey, msg []byte) ([]byte, error) {
return []byte(""), nil
},
}

cm, _ := broadcast.NewCommonMessenger(
marshalizerMock,
messengerMock,
privateKeyMock,
shardCoordinatorMock,
singleSignerMock,
)

metaMiniBlocks, metaTransactions := cm.ExtractMetaMiniBlocksAndTransactions(miniBlocks, transactions)

require.Equal(t, 2, len(miniBlocks))
require.Equal(t, 2, len(transactions))
require.Equal(t, 1, len(metaMiniBlocks))
require.Equal(t, 1, len(metaTransactions))

assert.Nil(t, miniBlocks[core.MetachainShardId])
assert.Nil(t, transactions["transactions_0_META"])
assert.NotNil(t, metaMiniBlocks[core.MetachainShardId])
assert.NotNil(t, metaTransactions["transactions_0_META"])
}

func TestCommonMessenger_BroadcastBlockData(t *testing.T) {
marshalizerMock := &mock.MarshalizerMock{}
countersBroadcast := make(map[string]int)
mutCounters := &sync.Mutex{}

messengerMock := &mock.MessengerStub{
BroadcastCalled: func(topic string, buff []byte) {
mutCounters.Lock()
countersBroadcast[topic]++
mutCounters.Unlock()
},
}
privateKeyMock := &mock.PrivateKeyMock{}
shardCoordinatorMock := &mock.ShardCoordinatorMock{}
singleSignerMock := &mock.SingleSignerMock{
SignStub: func(private crypto.PrivateKey, msg []byte) ([]byte, error) {
return []byte(""), nil
},
}

cm, _ := broadcast.NewCommonMessenger(
marshalizerMock,
messengerMock,
privateKeyMock,
shardCoordinatorMock,
singleSignerMock,
)

miniBlocks := map[uint32][]byte{0: []byte("mbs data1"), 1: []byte("mbs data2")}
transactions := map[string][][]byte{"topic1": {[]byte("txdata1"), []byte("txdata2")}, "topic2": {[]byte("txdata3")}}
delay := time.Millisecond * 10
cm.BroadcastBlockData(miniBlocks, transactions, delay)
time.Sleep(delay * 2)

mutCounters.Lock()
defer mutCounters.Unlock()

assert.Equal(t, len(miniBlocks), countersBroadcast["txBlockBodies_0"]+countersBroadcast["txBlockBodies_0_1"])
assert.Equal(t, len(transactions), countersBroadcast["topic1"]+countersBroadcast["topic2"])
}

0 comments on commit ba8995b

Please sign in to comment.