Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure broadcast happens #1871

Merged
merged 26 commits into from
Jun 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d53def6
core: alarm implementation
AdoAdoAdo May 28, 2020
1b80cad
core: add unit tests for alarm
AdoAdoAdo May 28, 2020
692f10a
consensus, process, core, node: add validator delayed broadcast when …
AdoAdoAdo May 29, 2020
91145fb
consensus, integrationTests: fix unit tests
AdoAdoAdo Jun 1, 2020
db0ae3e
Merge branch 'development' into make-sure-broadcast-happens
AdoAdoAdo Jun 1, 2020
5fea955
fix validator broadcast preparation and integratin tests
AdoAdoAdo Jun 1, 2020
6516122
consensus: unit tests and fixes
AdoAdoAdo Jun 3, 2020
2291c59
consensus: add more unit tests for validator broadcast delay data
AdoAdoAdo Jun 4, 2020
624500d
consensus: refactor delayed broadcast into subcomponent
AdoAdoAdo Jun 5, 2020
0ad97e9
consensus: fix shardChainMessenger init
AdoAdoAdo Jun 5, 2020
a60da3b
consensus: fix race in tests
AdoAdoAdo Jun 5, 2020
f3a1bea
Merge branch 'development' into make-sure-broadcast-happens
AdoAdoAdo Jun 5, 2020
edb0718
process: add topic on interceptor processor save
AdoAdoAdo Jun 10, 2020
3473b01
consesus: add validator header broadcast as fallback
AdoAdoAdo Jun 10, 2020
ed6d31c
consensus, integrationTests: fix unit/integration tests
AdoAdoAdo Jun 11, 2020
71dd083
consensus: add more unit tests
AdoAdoAdo Jun 12, 2020
e9689e4
Merge branch 'development' into make-sure-broadcast-happens
AdoAdoAdo Jun 15, 2020
48b1160
fix after merge
AdoAdoAdo Jun 15, 2020
10aa71e
consensus: fixes after system test
AdoAdoAdo Jun 16, 2020
7ba309d
Merge branch 'development' into make-sure-broadcast-happens
AdoAdoAdo Jun 16, 2020
ccb8b01
consensus, process: set alarm for header broadcast only once
AdoAdoAdo Jun 16, 2020
9222365
consensus: fixes for validator fallback broadcast
AdoAdoAdo Jun 17, 2020
b25e2ba
consensus: fix after review
AdoAdoAdo Jun 18, 2020
6688c39
consensus: fix index out of bounds
AdoAdoAdo Jun 18, 2020
1795815
Merge branch 'development' into make-sure-broadcast-happens
AdoAdoAdo Jun 18, 2020
734ca3a
Merge branch 'development' into make-sure-broadcast-happens
LucianMincu Jun 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
112 changes: 102 additions & 10 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,61 @@
package broadcast

import (
"strings"
"time"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/consensus"
"github.com/ElrondNetwork/elrond-go/consensus/spos"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/core/partitioning"
"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/hashing"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/sharding"
)

var log = logger.GetOrCreate("consensus/broadcast")

// delayedBroadcaster exposes functionality for handling the consensus members broadcasting of delay data
type delayedBroadcaster interface {
SetLeaderData(data *delayedBroadcastData) error
SetValidatorData(data *delayedBroadcastData) error
SetHeaderForValidator(vData *validatorHeaderBroadcastData) error
SetBroadcastHandlers(
mbBroadcast func(mbData map[uint32][]byte) error,
txBroadcast func(txData map[string][][]byte) error,
headerBroadcast func(header data.HeaderHandler) error,
) error
Close()
}

type commonMessenger struct {
marshalizer marshal.Marshalizer
messenger consensus.P2PMessenger
privateKey crypto.PrivateKey
shardCoordinator sharding.Coordinator
singleSigner crypto.SingleSigner
marshalizer marshal.Marshalizer
hasher hashing.Hasher
messenger consensus.P2PMessenger
privateKey crypto.PrivateKey
shardCoordinator sharding.Coordinator
singleSigner crypto.SingleSigner
delayedBlockBroadcaster delayedBroadcaster
}

// CommonMessengerArgs holds the arguments for creating commonMessenger instance
type CommonMessengerArgs struct {
Marshalizer marshal.Marshalizer
Messenger consensus.P2PMessenger
PrivateKey crypto.PrivateKey
ShardCoordinator sharding.Coordinator
SingleSigner crypto.SingleSigner
Marshalizer marshal.Marshalizer
Hasher hashing.Hasher
Messenger consensus.P2PMessenger
PrivateKey crypto.PrivateKey
ShardCoordinator sharding.Coordinator
SingleSigner crypto.SingleSigner
HeadersSubscriber consensus.HeadersPoolSubscriber
InterceptorsContainer process.InterceptorsContainer
MaxDelayCacheSize uint32
MaxValidatorDelayCacheSize uint32
}

func checkCommonMessengerNilParameters(
Expand All @@ -38,6 +64,9 @@ func checkCommonMessengerNilParameters(
if check.IfNil(args.Marshalizer) {
return spos.ErrNilMarshalizer
}
if check.IfNil(args.Hasher) {
return spos.ErrNilHasher
}
if check.IfNil(args.Messenger) {
return spos.ErrNilMessenger
}
Expand All @@ -50,6 +79,15 @@ func checkCommonMessengerNilParameters(
if check.IfNil(args.SingleSigner) {
return spos.ErrNilSingleSigner
}
if check.IfNil(args.InterceptorsContainer) {
return spos.ErrNilInterceptorsContainer
}
if check.IfNil(args.HeadersSubscriber) {
return spos.ErrNilHeadersSubscriber
}
if args.MaxDelayCacheSize == 0 || args.MaxValidatorDelayCacheSize == 0 {
return spos.ErrInvalidCacheSize
}

return nil
}
Expand Down Expand Up @@ -138,3 +176,57 @@ func (cm *commonMessenger) BroadcastTransactions(transactions map[string][][]byt

return nil
}

// BroadcastBlockData broadcasts the miniblocks and transactions
func (cm *commonMessenger) BroadcastBlockData(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
extraDelayForBroadcast time.Duration,
) {
time.Sleep(extraDelayForBroadcast)

if len(miniBlocks) > 0 {
err := cm.BroadcastMiniBlocks(miniBlocks)
if err != nil {
log.Warn("broadcast.BroadcastMiniBlocks", "error", err.Error())
}
}

if len(transactions) > 0 {
err := cm.BroadcastTransactions(transactions)
if err != nil {
log.Warn("broadcast.BroadcastTransactions", "error", err.Error())
}
}
}

func (cm *commonMessenger) extractMetaMiniBlocksAndTransactions(
miniBlocks map[uint32][]byte,
transactions map[string][][]byte,
) (map[uint32][]byte, map[string][][]byte) {

metaMiniBlocks := make(map[uint32][]byte, 0)
metaTransactions := make(map[string][][]byte, 0)

for shardID, mbsMarshalized := range miniBlocks {
if shardID != core.MetachainShardId {
continue
}

metaMiniBlocks[shardID] = mbsMarshalized
delete(miniBlocks, shardID)
}

identifier := cm.shardCoordinator.CommunicationIdentifier(core.MetachainShardId)

for broadcastTopic, txsMarshalized := range transactions {
if !strings.Contains(broadcastTopic, identifier) {
continue
}

metaTransactions[broadcastTopic] = txsMarshalized
delete(transactions, broadcastTopic)
}

return metaMiniBlocks, metaTransactions
}
107 changes: 107 additions & 0 deletions consensus/broadcast/commonMessenger_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package broadcast_test

import (
"sync"
"testing"
"time"

"github.com/ElrondNetwork/elrond-go/consensus"
"github.com/ElrondNetwork/elrond-go/consensus/broadcast"
"github.com/ElrondNetwork/elrond-go/consensus/mock"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data/block"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func newTestBlockBody() *block.Body {
Expand Down Expand Up @@ -126,3 +130,106 @@ func TestCommonMessenger_SignMessageShouldErrWhenSignFail(t *testing.T) {
_, err2 := cm.SignMessage(msg)
assert.Equal(t, err, err2)
}

func TestSubroundEndRound_ExtractMiniBlocksAndTransactionsShouldWork(t *testing.T) {
t.Parallel()

miniBlocks := make(map[uint32][]byte, 0)
transactions := make(map[string][][]byte, 0)

miniBlocks[1] = []byte("mbs_shard_1")
miniBlocks[core.MetachainShardId] = []byte("mbs_shard_meta")
miniBlocks[2] = []byte("mbs_shard_2")

transactions["transactions_0_1"] = [][]byte{
[]byte("tx1_shard_1"),
[]byte("tx2_shard_1"),
[]byte("tx3_shard_1"),
}

transactions["transactions_0_META"] = [][]byte{
[]byte("tx1_shard_meta"),
[]byte("tx2_shard_meta"),
[]byte("tx3_shard_meta"),
}

transactions["transactions_0_2"] = [][]byte{
[]byte("tx1_shard_2"),
[]byte("tx2_shard_2"),
[]byte("tx3_shard_2"),
}

marshalizerMock := &mock.MarshalizerMock{}
messengerMock := &mock.MessengerStub{
BroadcastCalled: func(topic string, buff []byte) {
},
}
privateKeyMock := &mock.PrivateKeyMock{}
shardCoordinatorMock := &mock.ShardCoordinatorMock{}
singleSignerMock := &mock.SingleSignerMock{
SignStub: func(private crypto.PrivateKey, msg []byte) ([]byte, error) {
return []byte(""), nil
},
}

cm, _ := broadcast.NewCommonMessenger(
marshalizerMock,
messengerMock,
privateKeyMock,
shardCoordinatorMock,
singleSignerMock,
)

metaMiniBlocks, metaTransactions := cm.ExtractMetaMiniBlocksAndTransactions(miniBlocks, transactions)

require.Equal(t, 2, len(miniBlocks))
require.Equal(t, 2, len(transactions))
require.Equal(t, 1, len(metaMiniBlocks))
require.Equal(t, 1, len(metaTransactions))

assert.Nil(t, miniBlocks[core.MetachainShardId])
assert.Nil(t, transactions["transactions_0_META"])
assert.NotNil(t, metaMiniBlocks[core.MetachainShardId])
assert.NotNil(t, metaTransactions["transactions_0_META"])
}

func TestCommonMessenger_BroadcastBlockData(t *testing.T) {
marshalizerMock := &mock.MarshalizerMock{}
countersBroadcast := make(map[string]int)
mutCounters := &sync.Mutex{}

messengerMock := &mock.MessengerStub{
BroadcastCalled: func(topic string, buff []byte) {
mutCounters.Lock()
countersBroadcast[topic]++
mutCounters.Unlock()
},
}
privateKeyMock := &mock.PrivateKeyMock{}
shardCoordinatorMock := &mock.ShardCoordinatorMock{}
singleSignerMock := &mock.SingleSignerMock{
SignStub: func(private crypto.PrivateKey, msg []byte) ([]byte, error) {
return []byte(""), nil
},
}

cm, _ := broadcast.NewCommonMessenger(
marshalizerMock,
messengerMock,
privateKeyMock,
shardCoordinatorMock,
singleSignerMock,
)

miniBlocks := map[uint32][]byte{0: []byte("mbs data1"), 1: []byte("mbs data2")}
transactions := map[string][][]byte{"topic1": {[]byte("txdata1"), []byte("txdata2")}, "topic2": {[]byte("txdata3")}}
delay := time.Millisecond * 10
cm.BroadcastBlockData(miniBlocks, transactions, delay)
time.Sleep(delay * 2)

mutCounters.Lock()
defer mutCounters.Unlock()

assert.Equal(t, len(miniBlocks), countersBroadcast["txBlockBodies_0"]+countersBroadcast["txBlockBodies_0_1"])
assert.Equal(t, len(transactions), countersBroadcast["topic1"]+countersBroadcast["topic2"])
}