Skip to content

Commit

Permalink
Pending block queue caching with TTL (#7816)
Browse files Browse the repository at this point in the history
* Update pending blks queue to ttl one

* Update tests

* Comment

* Gazelle

* Fix fuzz

* More comments

* Fix fuxx import

* Nishant's feedback

* Happy lint

* Return error for len(blks) >= maxBlocksPerSlot

* Ensure proposer time conv

* don't use gcache's default exp time it's 0

* fix TestService_AddPeningBlockToQueueOverMax

* Update beacon-chain/sync/pending_blocks_queue.go

Co-authored-by: Nishant Das <nishdas93@gmail.com>

* Fix time conversion

Co-authored-by: Nishant Das <nishdas93@gmail.com>
  • Loading branch information
terencechain and nisdas committed Nov 19, 2020
1 parent eb7ab16 commit 4b6441f
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 81 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ go_library(
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
Expand Down Expand Up @@ -176,6 +177,7 @@ go_test(
"@com_github_libp2p_go_libp2p_core//protocol:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_protolambda_zssz//:go_default_library",
"@com_github_protolambda_zssz//types:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/sync/fuzz_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package sync

import (
"context"
"time"

"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
gcache "github.com/patrickmn/go-cache"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)

Expand All @@ -26,7 +28,7 @@ func NewRegularSyncFuzz(cfg *Config) *Service {
chain: cfg.Chain,
initialSync: cfg.InitialSync,
attestationNotifier: cfg.AttestationNotifier,
slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock),
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
Expand Down
116 changes: 92 additions & 24 deletions beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"sort"
"sync"
"time"

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand All @@ -26,6 +27,7 @@ var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */)

const maxPeerRequest = 50
const numOfTries = 5
const maxBlocksPerSlot = 3

// processes pending blocks queue on every processPendingBlocksPeriod
func (s *Service) processPendingBlocksQueue() {
Expand Down Expand Up @@ -63,7 +65,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))

s.pendingQueueLock.RLock()
bs := s.slotToPendingBlocks[slot]
bs := s.pendingBlocksInCache(slot)
// Skip if there's no block in the queue.
if len(bs) == 0 {
s.pendingQueueLock.RUnlock()
Expand Down Expand Up @@ -99,7 +101,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
// Remove block from queue.
s.pendingQueueLock.Lock()
s.deleteBlockFromPendingQueue(slot, b, blkRoot)
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
return err
}
s.pendingQueueLock.Unlock()
span.End()
continue
Expand Down Expand Up @@ -146,7 +150,9 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}

s.pendingQueueLock.Lock()
s.deleteBlockFromPendingQueue(slot, b, blkRoot)
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
return err
}
s.pendingQueueLock.Unlock()

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -209,8 +215,11 @@ func (s *Service) sortedPendingSlots() []uint64 {
s.pendingQueueLock.RLock()
defer s.pendingQueueLock.RUnlock()

slots := make([]uint64, 0, len(s.slotToPendingBlocks))
for slot := range s.slotToPendingBlocks {
items := s.slotToPendingBlocks.Items()

slots := make([]uint64, 0, len(items))
for k := range items {
slot := cacheKeyToSlot(k)
slots = append(slots, slot)
}
sort.Slice(slots, func(i, j int) bool {
Expand All @@ -228,7 +237,13 @@ func (s *Service) validatePendingSlots() error {
oldBlockRoots := make(map[[32]byte]bool)

finalizedEpoch := s.chain.FinalizedCheckpt().Epoch
for slot, blks := range s.slotToPendingBlocks {
if s.slotToPendingBlocks == nil {
return errors.New("slotToPendingBlocks cache can't be nil")
}
items := s.slotToPendingBlocks.Items()
for k := range items {
slot := cacheKeyToSlot(k)
blks := s.pendingBlocksInCache(slot)
for _, b := range blks {
epoch := helpers.SlotToEpoch(slot)
// remove all descendant blocks of old blocks
Expand All @@ -238,7 +253,9 @@ func (s *Service) validatePendingSlots() error {
return err
}
oldBlockRoots[root] = true
s.deleteBlockFromPendingQueue(slot, b, root)
if err := s.deleteBlockFromPendingQueue(slot, b, root); err != nil {
return err
}
continue
}
// don't process old blocks
Expand All @@ -248,7 +265,9 @@ func (s *Service) validatePendingSlots() error {
return err
}
oldBlockRoots[blkRoot] = true
s.deleteBlockFromPendingQueue(slot, b, blkRoot)
if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil {
return err
}
}
}
}
Expand All @@ -258,19 +277,20 @@ func (s *Service) validatePendingSlots() error {
func (s *Service) clearPendingSlots() {
s.pendingQueueLock.Lock()
defer s.pendingQueueLock.Unlock()
s.slotToPendingBlocks = make(map[uint64][]*ethpb.SignedBeaconBlock)
s.slotToPendingBlocks.Flush()
s.seenPendingBlocks = make(map[[32]byte]bool)
}

// Delete block from the list from the pending queue using the slot as key.
// Note: this helper is not thread safe.
func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) {
func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) error {
mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock)

blks, ok := s.slotToPendingBlocks[slot]
if !ok {
return
blks := s.pendingBlocksInCache(slot)
if len(blks) == 0 {
return nil
}

newBlks := make([]*ethpb.SignedBeaconBlock, 0, len(blks))
for _, blk := range blks {
if ssz.DeepEqual(blk, b) {
Expand All @@ -279,28 +299,76 @@ func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeacon
newBlks = append(newBlks, blk)
}
if len(newBlks) == 0 {
delete(s.slotToPendingBlocks, slot)
return
s.slotToPendingBlocks.Delete(slotToCacheKey(slot))
return nil
}

// Decrease exp itme in proportion to how many blocks are still in the cache for slot key.
d := pendingBlockExpTime / time.Duration(len(newBlks))
if err := s.slotToPendingBlocks.Replace(slotToCacheKey(slot), newBlks, d); err != nil {
return err
}
s.slotToPendingBlocks[slot] = newBlks
delete(s.seenPendingBlocks, r)
return nil
}

// Insert block to the list in the pending queue using the slot as key.
// Note: this helper is not thread safe.
func (s *Service) insertBlockToPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) {
func (s *Service) insertBlockToPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) error {
mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock)

if s.seenPendingBlocks[r] {
return
return nil
}

_, ok := s.slotToPendingBlocks[slot]
if ok {
blks := s.slotToPendingBlocks[slot]
s.slotToPendingBlocks[slot] = append(blks, b)
} else {
s.slotToPendingBlocks[slot] = []*ethpb.SignedBeaconBlock{b}
if err := s.addPendingBlockToCache(b); err != nil {
return err
}

s.seenPendingBlocks[r] = true
return nil
}

// This returns signed beacon blocks given input key from slotToPendingBlocks.
func (s *Service) pendingBlocksInCache(slot uint64) []*ethpb.SignedBeaconBlock {
k := slotToCacheKey(slot)
value, ok := s.slotToPendingBlocks.Get(k)
if !ok {
return []*ethpb.SignedBeaconBlock{}
}
blks, ok := value.([]*ethpb.SignedBeaconBlock)
if !ok {
return []*ethpb.SignedBeaconBlock{}
}
return blks
}

// This adds input signed beacon block to slotToPendingBlocks cache.
func (s *Service) addPendingBlockToCache(b *ethpb.SignedBeaconBlock) error {
if b == nil || b.Block == nil {
return errors.New("nil block")
}

blks := s.pendingBlocksInCache(b.Block.Slot)

if len(blks) >= maxBlocksPerSlot {
return nil
}

blks = append(blks, b)
k := slotToCacheKey(b.Block.Slot)
s.slotToPendingBlocks.Set(k, blks, pendingBlockExpTime)
return nil
}

// This converts input string to slot number in uint64.
func cacheKeyToSlot(s string) uint64 {
b := []byte(s)
return bytesutil.BytesToUint64BigEndian(b)
}

// This converts input slot number to a key to be used for slotToPendingBlocks cache.
func slotToCacheKey(s uint64) string {
b := bytesutil.Uint64ToBytesBigEndian(s)
return string(b)
}

0 comments on commit 4b6441f

Please sign in to comment.