Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve requesting of missing mini blocks dest me #5176

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
8386818
* Improved requesting for missing mini blocks with destination in sel…
SebastianMarian Apr 19, 2023
0fe5f12
* Fixed race condition in unit test
SebastianMarian Apr 19, 2023
e9c70a3
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
SebastianMarian Apr 19, 2023
6cee2bf
* Changed log level to Debug
SebastianMarian Apr 21, 2023
80506b2
Merge remote-tracking branch 'origin/improve-requesting-of-missing-mi…
SebastianMarian Apr 21, 2023
b6b7a79
* Added requested mini blocks hashes into requestedItemsHandler to be…
SebastianMarian Apr 21, 2023
58471d7
* Refactored and added log in transactionCoordinator.RequestMiniBlock…
SebastianMarian Apr 21, 2023
6a58519
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
SebastianMarian Apr 24, 2023
25b4247
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
SebastianMarian Apr 26, 2023
df85f69
* Fixed after review
SebastianMarian Apr 28, 2023
c8d11af
Merge remote-tracking branch 'origin/improve-requesting-of-missing-mi…
SebastianMarian Apr 28, 2023
3be0203
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
SebastianMarian Apr 28, 2023
fb2d97d
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
iulianpascalau Oct 3, 2023
c2221e4
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
iulianpascalau Oct 3, 2023
de19299
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
iulianpascalau Oct 11, 2023
fcb9359
Merge branch 'rc/v1.6.0' into improve-requesting-of-missing-miniblock…
iulianpascalau Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 11 additions & 9 deletions process/block/shardblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2760,15 +2760,17 @@ func TestShardProcessor_ReceivedMetaBlockShouldRequestMissingMiniBlocks(t *testi
miniBlockHash3Requested := int32(0)

requestHandler := &testscommon.RequestHandlerStub{
RequestMiniBlockHandlerCalled: func(destShardID uint32, miniblockHash []byte) {
if bytes.Equal(miniBlockHash1, miniblockHash) {
atomic.AddInt32(&miniBlockHash1Requested, 1)
}
if bytes.Equal(miniBlockHash2, miniblockHash) {
atomic.AddInt32(&miniBlockHash2Requested, 1)
}
if bytes.Equal(miniBlockHash3, miniblockHash) {
atomic.AddInt32(&miniBlockHash3Requested, 1)
RequestMiniBlocksHandlerCalled: func(destShardID uint32, miniblocksHashes [][]byte) {
for _, mbHash := range miniblocksHashes {
if bytes.Equal(miniBlockHash1, mbHash) {
atomic.AddInt32(&miniBlockHash1Requested, 1)
}
if bytes.Equal(miniBlockHash2, mbHash) {
atomic.AddInt32(&miniBlockHash2Requested, 1)
}
if bytes.Equal(miniBlockHash3, mbHash) {
atomic.AddInt32(&miniBlockHash3Requested, 1)
}
}
},
}
Expand Down
46 changes: 40 additions & 6 deletions process/coordinator/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type transactionCoordinator struct {
mutRequestedTxs sync.RWMutex
requestedTxs map[block.Type]int

onRequestMiniBlock func(shardId uint32, mbHash []byte)
onRequestMiniBlocks func(shardId uint32, mbHashes [][]byte)
gasHandler process.GasHandler
feeHandler process.TransactionFeeHandler
blockSizeComputation preprocess.BlockSizeComputationHandler
Expand Down Expand Up @@ -126,7 +126,7 @@ func NewTransactionCoordinator(args ArgTransactionCoordinator) (*transactionCoor
}

tc.miniBlockPool = args.MiniBlockPool
tc.onRequestMiniBlock = args.RequestHandler.RequestMiniBlock
tc.onRequestMiniBlocks = args.RequestHandler.RequestMiniBlocks
tc.requestedTxs = make(map[block.Type]int)
tc.txPreProcessors = make(map[block.Type]process.PreProcessor)
tc.interimProcessors = make(map[block.Type]process.IntermediateTransactionHandler)
Expand Down Expand Up @@ -604,6 +604,8 @@ func (tc *transactionCoordinator) CreateMbsAndProcessCrossShardTransactionsDstMe
"total gas penalized", tc.gasHandler.TotalGasPenalized())
}()

tc.requestMissingMiniBlocks(finalCrossMiniBlockInfos)

for _, miniBlockInfo := range finalCrossMiniBlockInfos {
if !haveTime() && !haveAdditionalTime() {
log.Debug("CreateMbsAndProcessCrossShardTransactionsDstMe",
Expand Down Expand Up @@ -643,7 +645,6 @@ func (tc *transactionCoordinator) CreateMbsAndProcessCrossShardTransactionsDstMe

miniVal, _ := tc.miniBlockPool.Peek(miniBlockInfo.Hash)
if miniVal == nil {
go tc.onRequestMiniBlock(miniBlockInfo.SenderShardID, miniBlockInfo.Hash)
shouldSkipShard[miniBlockInfo.SenderShardID] = true
log.Trace("transactionCoordinator.CreateMbsAndProcessCrossShardTransactionsDstMe: mini block not found and was requested",
"scheduled mode", scheduledMode,
Expand Down Expand Up @@ -748,6 +749,29 @@ func (tc *transactionCoordinator) CreateMbsAndProcessCrossShardTransactionsDstMe
return createMBDestMeExecutionInfo.miniBlocks, createMBDestMeExecutionInfo.numTxAdded, allMBsProcessed, nil
}

func (tc *transactionCoordinator) requestMissingMiniBlocks(mbsInfo []*data.MiniBlockInfo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one other problem is to stop requesting transactions after the first miniblock where there are requestedTxs (line 689):
if requestedTxs > 0 { shouldSkipShard[miniBlockInfo.SenderShardID] = true

The requests could be done for all the miniblocks that could be processed in a round.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is solved with this new approach by calling requestMissingMiniBlocks above. For all these mbs requested in this method, once they will be received in L1138 -> method func (tc *transactionCoordinator) receivedMiniBlock(key []byte, value interface{}) we will finally know all the tx hashes and request them if missing in L1162 -> numTxsRequested := preproc.RequestTransactionsForMiniBlock(miniBlock). Actually you cannot request txs for the missing mbs as you don't know their hashes. But I can add the txs request for all the miniblocks which are not missing in the same method -> requestMissingMiniBlocks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

functionality of requestMissingMiniBlocks and RequestMiniBlocks is very similar, could the refactored with a method that gets a list of miniblockHashes and does the request for those and be called from both other methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, will do this refactor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

mapMissingMiniBlocksPerShard := make(map[uint32][][]byte)

tc.requestedItemsHandler.Sweep()

for _, mbInfo := range mbsInfo {
_, isMiniBlockFound := tc.miniBlockPool.Peek(mbInfo.Hash)
if !isMiniBlockFound {
log.Debug("transactionCoordinator.requestMissingMiniBlocks: mini block not found and was requested",
"sender shard", mbInfo.SenderShardID,
"hash", mbInfo.Hash,
"round", mbInfo.Round,
)
mapMissingMiniBlocksPerShard[mbInfo.SenderShardID] = append(mapMissingMiniBlocksPerShard[mbInfo.SenderShardID], mbInfo.Hash)
_ = tc.requestedItemsHandler.Add(string(mbInfo.Hash))
}
}

for senderShardID, mbsHashes := range mapMissingMiniBlocksPerShard {
go tc.onRequestMiniBlocks(senderShardID, mbsHashes)
}
}

func initMiniBlockDestMeExecutionInfo() *createMiniBlockDestMeExecutionInfo {
return &createMiniBlockDestMeExecutionInfo{
processedTxHashes: make([][]byte, 0),
Expand Down Expand Up @@ -1084,14 +1108,24 @@ func (tc *transactionCoordinator) RequestMiniBlocks(header data.HeaderHandler) {

tc.requestedItemsHandler.Sweep()

mapMissingMiniBlocksPerShard := make(map[uint32][][]byte)
finalCrossMiniBlockHashes := tc.getFinalCrossMiniBlockHashes(header)
for key, senderShardId := range finalCrossMiniBlockHashes {
obj, _ := tc.miniBlockPool.Peek([]byte(key))
if obj == nil {
go tc.onRequestMiniBlock(senderShardId, []byte(key))
_, isMiniBlockFound := tc.miniBlockPool.Peek([]byte(key))
if !isMiniBlockFound {
log.Debug("transactionCoordinator.RequestMiniBlocks: mini block not found and was requested",
"sender shard", senderShardId,
"hash", []byte(key),
"round", header.GetRound(),
)
mapMissingMiniBlocksPerShard[senderShardId] = append(mapMissingMiniBlocksPerShard[senderShardId], []byte(key))
_ = tc.requestedItemsHandler.Add(key)
}
}

for senderShardID, mbsHashes := range mapMissingMiniBlocksPerShard {
go tc.onRequestMiniBlocks(senderShardID, mbsHashes)
}
}

func (tc *transactionCoordinator) getFinalCrossMiniBlockHashes(headerHandler data.HeaderHandler) map[string]uint32 {
Expand Down
52 changes: 50 additions & 2 deletions process/coordinator/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1824,9 +1824,9 @@ func TestTransactionCoordinator_RequestMiniblocks(t *testing.T) {
mutex := sync.Mutex{}

requestHandler := &testscommon.RequestHandlerStub{
RequestMiniBlockHandlerCalled: func(destShardID uint32, miniblockHash []byte) {
RequestMiniBlocksHandlerCalled: func(destShardID uint32, miniblocksHashes [][]byte) {
mutex.Lock()
nrCalled++
nrCalled += len(miniblocksHashes)
mutex.Unlock()
},
}
Expand Down Expand Up @@ -4383,3 +4383,51 @@ func TestTransactionCoordinator_getIndexesOfLastTxProcessed(t *testing.T) {
assert.Equal(t, mbh.GetIndexOfLastTxProcessed(), pi.indexOfLastTxProcessedByProposer)
})
}

func TestTransactionCoordinator_requestMissingMiniBlocksShouldWork(t *testing.T) {
t.Parallel()

args := createMockTransactionCoordinatorArguments()
args.MiniBlockPool = &testscommon.CacherStub{
PeekCalled: func(key []byte) (value interface{}, ok bool) {
if bytes.Equal(key, []byte("hash0")) || bytes.Equal(key, []byte("hash1")) || bytes.Equal(key, []byte("hash2")) {
return &block.MiniBlock{}, true
}
return nil, false
},
}
tc, _ := NewTransactionCoordinator(args)

wg := sync.WaitGroup{}
wg.Add(3)
mapRequestedMiniBlocksPerShard := make(map[uint32]int)
mutMap := sync.RWMutex{}
tc.onRequestMiniBlocks = func(shardId uint32, mbHashes [][]byte) {
mutMap.Lock()
mapRequestedMiniBlocksPerShard[shardId] += len(mbHashes)
mutMap.Unlock()
wg.Done()
}

mbsInfo := []*data.MiniBlockInfo{
{SenderShardID: 0},
{SenderShardID: 1},
{SenderShardID: 2},
{SenderShardID: 0, Hash: []byte("hash0")},
{SenderShardID: 1, Hash: []byte("hash1")},
{SenderShardID: 2, Hash: []byte("hash2")},
{SenderShardID: 0},
{SenderShardID: 1},
{SenderShardID: 0},
}

tc.requestMissingMiniBlocks(mbsInfo)

wg.Wait()

mutMap.RLock()
assert.Equal(t, 3, mapRequestedMiniBlocksPerShard[0])
assert.Equal(t, 2, mapRequestedMiniBlocksPerShard[1])
assert.Equal(t, 1, mapRequestedMiniBlocksPerShard[2])
mutMap.RUnlock()
}