Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject not whitelisted cross txs dest me #2031

Merged
merged 6 commits into from
Jun 27, 2020
2 changes: 1 addition & 1 deletion cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func ProcessComponentsFactory(args *processComponentsFactoryArgs) (*Process, err

txsPoolsCleaner.StartCleaning()

_, err = track.NewMiniBlockTrack(args.data.Datapool, args.shardCoordinator)
_, err = track.NewMiniBlockTrack(args.data.Datapool, args.shardCoordinator, args.whiteListHandler)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/broadcast/commonMessenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func (cm *commonMessenger) BroadcastBlockData(
}
}

time.Sleep(core.ExtraDelayBetweenBroadcastMbsAndTxs)

if len(transactions) > 0 {
err := cm.BroadcastTransactions(transactions)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,15 @@ func (dbb *delayedBlockBroadcaster) headerAlarmExpired(alarmID string) {
log.Debug("delayedBroadcast.headerAlarmExpired - validator broadcasting meta miniblocks and transactions",
"headerHash", headerHash,
)
dbb.broadcastBlockData(vHeader.metaMiniBlocksData, vHeader.metaTransactionsData, core.ExtraDelayForBroadcastBlockInfo)
go dbb.broadcastBlockData(vHeader.metaMiniBlocksData, vHeader.metaTransactionsData, core.ExtraDelayForBroadcastBlockInfo)
}
}

func (dbb *delayedBlockBroadcaster) broadcastDelayedData(broadcastData []*delayedBroadcastData) {
for _, bData := range broadcastData {
dbb.broadcastBlockData(bData.miniBlocksData, bData.transactions, 0)
go func(miniBlocks map[uint32][]byte, transactions map[string][][]byte) {
dbb.broadcastBlockData(miniBlocks, transactions, 0)
}(bData.miniBlocksData, bData.transactions)
}
}

Expand All @@ -463,6 +465,8 @@ func (dbb *delayedBlockBroadcaster) broadcastBlockData(
log.Error("broadcastBlockData miniblocks", "error", err.Error())
}

time.Sleep(core.ExtraDelayBetweenBroadcastMbsAndTxs)

err = dbb.broadcastTxsData(transactions)
if err != nil {
log.Error("broadcastBlockData transactions", "error", err.Error())
Expand Down
63 changes: 44 additions & 19 deletions consensus/broadcast/delayedBroadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,10 @@ func TestDelayedBlockBroadcaster_HeaderReceivedForRegisteredDelayedDataShouldBro
assert.False(t, txBroadcastCalled.IsSet())

dbb.HeaderReceived(metaBlock, []byte("meta hash"))
time.Sleep(core.ExtraDelayForBroadcastBlockInfo + 10*time.Millisecond)
sleepTime := core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
10*time.Millisecond
time.Sleep(sleepTime)
assert.True(t, mbBroadcastCalled.IsSet())
assert.True(t, txBroadcastCalled.IsSet())
}
Expand Down Expand Up @@ -313,7 +316,10 @@ func TestDelayedBlockBroadcaster_HeaderReceivedForNextRegisteredDelayedDataShoul
metaBlock.ShardInfo[0].HeaderHash = headerHash2

dbb.HeaderReceived(metaBlock, []byte("meta hash"))
time.Sleep(core.ExtraDelayForBroadcastBlockInfo + 10*time.Millisecond)
sleepTime := core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
10*time.Millisecond
time.Sleep(sleepTime)
assert.Equal(t, int64(2), mbBroadcastCalled.Get())
assert.Equal(t, int64(2), txBroadcastCalled.Get())

Expand Down Expand Up @@ -430,7 +436,8 @@ func TestDelayedBlockBroadcaster_SetHeaderForValidatorShouldSetAlarmAndBroadcast
require.Equal(t, int64(0), mbBroadcastCalled.Get())
require.Equal(t, int64(0), txBroadcastCalled.Get())

sleepTime := broadcast.ValidatorDelayPerOrder()*time.Duration(vArgs.order) + time.Millisecond*100
sleepTime := broadcast.ValidatorDelayPerOrder()*time.Duration(vArgs.order) +
time.Millisecond*100
time.Sleep(sleepTime)

// alarm expired and sent header
Expand Down Expand Up @@ -493,6 +500,7 @@ func TestDelayedBlockBroadcaster_SetValidatorDataFinalizedMetaHeaderShouldSetAla

sleepTime := broadcast.ValidatorDelayPerOrder()*time.Duration(vArgs.order) +
core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
time.Millisecond*100
time.Sleep(sleepTime)

Expand Down Expand Up @@ -555,7 +563,8 @@ func TestDelayedBlockBroadcaster_InterceptedHeaderShouldCancelAlarm(t *testing.T
require.Equal(t, int64(0), mbBroadcastCalled.Get())
require.Equal(t, int64(0), txBroadcastCalled.Get())

sleepTime := broadcast.ValidatorDelayPerOrder()*time.Duration(vArgs.order) + time.Second
sleepTime := broadcast.ValidatorDelayPerOrder()*time.Duration(vArgs.order) +
time.Second
// should cancel alarm
dbb.InterceptedHeaderData("headerTopic", vArgs.headerHash, vArgs.header)
time.Sleep(sleepTime)
Expand Down Expand Up @@ -632,6 +641,7 @@ func TestDelayedBlockBroadcaster_InterceptedHeaderInvalidOrDifferentShouldIgnore
dbb.InterceptedMiniBlockData("headerTopic", headerHash, invalidHeader)
sleepTime := broadcast.ValidatorDelayPerOrder()*time.Duration(vArgs.order) +
core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
time.Millisecond*100
time.Sleep(sleepTime)

Expand Down Expand Up @@ -724,8 +734,9 @@ func TestDelayedBlockBroadcaster_ScheduleValidatorBroadcastDifferentHeaderRoundS
}

dbb.ScheduleValidatorBroadcast([]*broadcast.HeaderDataForValidator{hdfv})
timeToWait := time.Duration(vArgs.order) * broadcast.ValidatorDelayPerOrder()
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was no broadcast and validator delay data still present
require.Equal(t, int64(0), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -783,8 +794,9 @@ func TestDelayedBlockBroadcaster_ScheduleValidatorBroadcastDifferentPrevRandShou
}

dbb.ScheduleValidatorBroadcast([]*broadcast.HeaderDataForValidator{hdfv})
timeToWait := time.Duration(vArgs.order) * broadcast.ValidatorDelayPerOrder()
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was no broadcast and validator delay data still present
require.Equal(t, int64(0), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -839,8 +851,11 @@ func TestDelayedBlockBroadcaster_ScheduleValidatorBroadcastSameRoundAndPrevRandS
}

dbb.ScheduleValidatorBroadcast([]*broadcast.HeaderDataForValidator{hdfv})
timeToWait := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() + core.ExtraDelayForBroadcastBlockInfo
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was a broadcast and validator delay data empty
require.Equal(t, int64(1), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -891,7 +906,9 @@ func TestDelayedBlockBroadcaster_AlarmExpiredShouldBroadcastTheDataForRegistered
require.Equal(t, 1, len(vbd))

dbb.AlarmExpired(hex.EncodeToString(vArgs.headerHash))
time.Sleep(time.Millisecond * 100)
sleepTime := core.ExtraDelayBetweenBroadcastMbsAndTxs +
time.Millisecond*100
time.Sleep(sleepTime)

// check there was a broadcast and validator delay data empty
require.Equal(t, int64(1), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -1097,8 +1114,11 @@ func TestDelayedBlockBroadcaster_InterceptedMiniBlockForNotSetValDataShouldBroad
require.Equal(t, 1, len(vbd))

dbb.ScheduleValidatorBroadcast([]*broadcast.HeaderDataForValidator{hdfv})
timeToWait := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() + core.ExtraDelayForBroadcastBlockInfo
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was a broadcast and validator delay data empty
require.Equal(t, int64(1), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -1158,8 +1178,11 @@ func TestDelayedBlockBroadcaster_InterceptedMiniBlockOutOfManyForSetValDataShoul

dbb.ScheduleValidatorBroadcast([]*broadcast.HeaderDataForValidator{hdfv})
dbb.InterceptedMiniBlockData("txBlockBodies_0_"+strconv.Itoa(destShardID), miniBlockHashToNotify, &block.MiniBlock{})
timeToWait := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() + core.ExtraDelayForBroadcastBlockInfo
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
core.ExtraDelayForBroadcastBlockInfo +
core.ExtraDelayBetweenBroadcastMbsAndTxs +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was a broadcast and validator delay data empty
require.Equal(t, int64(1), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -1221,8 +1244,9 @@ func TestDelayedBlockBroadcaster_InterceptedMiniBlockFinalForSetValDataShouldNot
dbb.InterceptedMiniBlockData(destShardID, hash, &block.MiniBlock{})
}
}
timeToWait := time.Duration(vArgs.order) * broadcast.ValidatorDelayPerOrder()
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was no broadcast and validator delay data empty
require.Equal(t, int64(0), mbBroadcastCalled.Get())
Expand Down Expand Up @@ -1280,8 +1304,9 @@ func TestDelayedBlockBroadcaster_Close(t *testing.T) {
dbb.ScheduleValidatorBroadcast([]*broadcast.HeaderDataForValidator{hdfv})
dbb.Close()

timeToWait := time.Duration(vArgs.order) * broadcast.ValidatorDelayPerOrder()
time.Sleep(timeToWait + 100*time.Millisecond)
sleepTime := time.Duration(vArgs.order)*broadcast.ValidatorDelayPerOrder() +
100*time.Millisecond
time.Sleep(sleepTime)

// check there was no broadcast
require.Equal(t, int64(0), mbBroadcastCalled.Get())
Expand Down
5 changes: 4 additions & 1 deletion consensus/broadcast/metaChainMessenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ func TestMetaChainMessenger_BroadcastBlockDataLeader(t *testing.T) {

err := mcm.BroadcastBlockDataLeader(nil, miniBlocks, transactions)
require.Nil(t, err)
time.Sleep(core.ExtraDelayForBroadcastBlockInfo + time.Millisecond*100)
sleepTime := core.ExtraDelayBetweenBroadcastMbsAndTxs +
core.ExtraDelayForBroadcastBlockInfo +
time.Millisecond*100
time.Sleep(sleepTime)

mutCounters.Lock()
defer mutCounters.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion consensus/spos/bls/subroundEndRound.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ func (sr *subroundEndRound) broadcastBlockDataLeader() error {
func (sr *subroundEndRound) setHeaderForValidator(header data.HeaderHandler) error {
idx, err := sr.SelfConsensusGroupIndex()
if err != nil {
return err
log.Trace("setHeaderForValidator", "error", err.Error())
return nil
}

// todo: avoid calling MarshalizeDataToBroadcast twice for validators
Expand Down
6 changes: 5 additions & 1 deletion core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,13 @@ const MaxSoftwareVersionLengthInBytes = 10
// moment when its components, like mini blocks and transactions, would be broadcast too
const ExtraDelayForBroadcastBlockInfo = 1 * time.Second

// ExtraDelayBetweenBroadcastMbsAndTxs represents the number of seconds to wait since miniblocks have been broadcast
// and the moment when theirs transactions would be broadcast too
const ExtraDelayBetweenBroadcastMbsAndTxs = 1 * time.Second

// ExtraDelayForRequestBlockInfo represents the number of seconds to wait since a block has been received and the
// moment when its components, like mini blocks and transactions, would be requested too if they are still missing
const ExtraDelayForRequestBlockInfo = 2 * time.Second
const ExtraDelayForRequestBlockInfo = ExtraDelayForBroadcastBlockInfo + ExtraDelayBetweenBroadcastMbsAndTxs + time.Second

// CommitMaxTime represents max time accepted for a commit action, after which a warn message is displayed
const CommitMaxTime = 3 * time.Second
Expand Down
6 changes: 1 addition & 5 deletions epochStart/bootstrap/epochStartMetaBlockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ func (e *epochStartMetaBlockProcessor) Save(data process.InterceptedData, fromCo
return nil
}

mbHash, err := core.CalculateHash(e.marshalizer, e.hasher, metaBlock)
if err != nil {
log.Warn("saving epoch start meta block error", "error", err)
return nil
}
mbHash := interceptedHdr.Hash()

log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedMetaBlocks.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data/transaction"
"github.com/ElrondNetwork/elrond-go/dataRetriever/resolvers"
"github.com/ElrondNetwork/elrond-go/integrationTests"
"github.com/ElrondNetwork/elrond-go/process/factory"
Expand Down Expand Up @@ -64,7 +65,13 @@ func TestNode_InterceptorBulkTxsSentFromSameShardShouldRemainInSenderShard(t *te
senderPrivateKeys := []crypto.PrivateKey{nodes[idxSender].OwnAccount.SkTxSign}
integrationTests.CreateMintingForSenders(nodes, shardId, senderPrivateKeys, balanceValue)

_ = nodes[idxSender].Node.GenerateAndSendBulkTransactions(addrInShardFive, transactionValue, uint64(txToSend), nodes[idxSender].OwnAccount.SkTxSign)
_ = nodes[idxSender].Node.GenerateAndSendBulkTransactions(
addrInShardFive,
transactionValue,
uint64(txToSend),
nodes[idxSender].OwnAccount.SkTxSign,
nil,
)

time.Sleep(time.Second * 10)

Expand Down Expand Up @@ -142,7 +149,13 @@ func TestNode_InterceptorBulkTxsSentFromOtherShardShouldBeRoutedInSenderShard(t
senderPrivateKeys := []crypto.PrivateKey{nodes[idxSender].OwnAccount.SkTxSign}
integrationTests.CreateMintingForSenders(nodes, shardId, senderPrivateKeys, mintingValue)

_ = nodes[idxSender].Node.GenerateAndSendBulkTransactions(addrInShardFive, txValue, uint64(txToSend), nodes[idxSender].OwnAccount.SkTxSign)
_ = nodes[idxSender].Node.GenerateAndSendBulkTransactions(
addrInShardFive,
txValue,
uint64(txToSend),
nodes[idxSender].OwnAccount.SkTxSign,
nil,
)

//display, can be removed
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -244,7 +257,17 @@ func TestNode_InterceptorBulkTxsSentFromOtherShardShouldBeRoutedInSenderShardAnd
senderPrivateKeys := []crypto.PrivateKey{nodes[idxSender].OwnAccount.SkTxSign}
integrationTests.CreateMintingForSenders(nodes, shardId, senderPrivateKeys, mintingValue)

_ = nodes[0].Node.GenerateAndSendBulkTransactions(addrInShardFive, txValue, uint64(txToSend), nodes[0].OwnAccount.SkTxSign)
whiteListTxs := func(txs []*transaction.Transaction) {
integrationTests.WhiteListTxs(nodes, txs)
}

_ = nodes[0].Node.GenerateAndSendBulkTransactions(
addrInShardFive,
txValue,
uint64(txToSend),
nodes[0].OwnAccount.SkTxSign,
whiteListTxs,
)

fmt.Println("Waiting for senders to fetch generated transactions...")
time.Sleep(time.Second * 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func TestRoutingOfTransactionsInShards(t *testing.T) {
txs := generateTransactionsInAllConfigurations(nodes, uint32(numOfShards))
require.Equal(t, numOfShards*numOfShards, len(txs))

integrationTests.WhiteListTxs(nodes, txs)

dispatchNode := getNodeOnShard(uint32(i), nodes)

_, err := dispatchNode.Node.SendBulkTransactions(txs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestNode_GenerateSendInterceptBulkTransactionsWithMessenger(t *testing.T) {
big.NewInt(1),
uint64(noOfTx),
n.OwnAccount.SkTxSign,
nil,
)

assert.Nil(t, err)
Expand Down
24 changes: 24 additions & 0 deletions integrationTests/testInitializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2115,3 +2115,27 @@ func AddSelfNotarizedHeaderByMetachain(nodes []*TestProcessorNode) {
n.BlockTracker.AddSelfNotarizedHeader(core.MetachainShardId, header, nil)
}
}

// WhiteListTxs -
func WhiteListTxs(nodes []*TestProcessorNode, txs []*transaction.Transaction) {
txHashes := make([][]byte, 0)
for _, tx := range txs {
txHash, err := core.CalculateHash(TestMarshalizer, TestHasher, tx)
if err != nil {
return
}

txHashes = append(txHashes, txHash)
}

for _, n := range nodes {
for index, txHash := range txHashes {
senderShardID := n.ShardCoordinator.ComputeId(txs[index].SndAddr)
receiverShardID := n.ShardCoordinator.ComputeId(txs[index].RcvAddr)
if senderShardID == n.ShardCoordinator.SelfId() ||
receiverShardID == n.ShardCoordinator.SelfId() {
n.WhiteListHandler.Add([][]byte{txHash})
}
}
}
}
6 changes: 4 additions & 2 deletions integrationTests/testProcessorNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1586,19 +1586,21 @@ func (tpn *TestProcessorNode) WhiteListBody(nodes []*TestProcessorNode, bodyHand
}

mbHashes := make([][]byte, 0)
txHashes := make([][]byte, 0)
for _, miniBlock := range body.MiniBlocks {
mbMarshalized, err := TestMarshalizer.Marshal(miniBlock)
mbHash, err := core.CalculateHash(TestMarshalizer, TestHasher, miniBlock)
if err != nil {
continue
}

mbHash := TestHasher.Compute(string(mbMarshalized))
mbHashes = append(mbHashes, mbHash)
txHashes = append(txHashes, miniBlock.TxHashes...)
}

if len(mbHashes) > 0 {
for _, n := range nodes {
n.WhiteListHandler.Add(mbHashes)
n.WhiteListHandler.Add(txHashes)
}
}
}
Expand Down