Skip to content

Commit

Permalink
Use forkchoice to validate sync messages faster
Browse files Browse the repository at this point in the history
  • Loading branch information
potuz committed May 19, 2023
1 parent aeaa72f commit 235f733
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 20 deletions.
18 changes: 16 additions & 2 deletions beacon-chain/operations/synccommittee/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,22 @@ func (s *Store) SaveSyncCommitteeMessage(msg *ethpb.SyncCommitteeMessage) error
return errors.New("not typed []ethpb.SyncCommitteeMessage")
}

messages = append(messages, copied)
savedSyncCommitteeMessageTotal.Inc()
idx := -1
for i, msg := range messages {
if msg.ValidatorIndex == copied.ValidatorIndex {
idx = i
break
}
}
if idx >= 0 {
// Override the existing messages with a new one
messages[idx] = copied
} else {
// Append the new message
messages = append(messages, copied)
savedSyncCommitteeMessageTotal.Inc()
}

return s.messageCache.Push(&queue.Item{
Key: syncCommitteeKey(msg.Slot),
Value: messages,
Expand Down
39 changes: 29 additions & 10 deletions beacon-chain/sync/validate_sync_committee_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *Service) validateSyncCommitteeMessage(
ctx,
ignoreEmptyCommittee(committeeIndices),
s.rejectIncorrectSyncCommittee(committeeIndices, *msg.Topic),
s.ignoreHasSeenSyncMsg(m, committeeIndices),
s.ignoreHasSeenSyncMsg(ctx, m, committeeIndices),
s.rejectInvalidSyncCommitteeSignature(m),
); result != pubsub.ValidationAccept {
return result, err
Expand Down Expand Up @@ -123,24 +123,43 @@ func (s *Service) markSyncCommitteeMessagesSeen(committeeIndices []primitives.Co
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
for _, idx := range committeeIndices {
subnet := uint64(idx) / subCommitteeSize
s.setSeenSyncMessageIndexSlot(m.Slot, m.ValidatorIndex, subnet)
s.setSeenSyncMessageIndexSlot(m, subnet)
}
}

// Returns true if the node has received sync committee for the validator with index and slot.
func (s *Service) hasSeenSyncMessageIndexSlot(slot primitives.Slot, valIndex primitives.ValidatorIndex, subCommitteeIndex uint64) bool {
func (s *Service) hasSeenSyncMessageIndexSlot(ctx context.Context, m *ethpb.SyncCommitteeMessage, subCommitteeIndex uint64) bool {
s.seenSyncMessageLock.RLock()
defer s.seenSyncMessageLock.RUnlock()
_, seen := s.seenSyncMessageCache.Get(seenSyncCommitteeKey(slot, valIndex, subCommitteeIndex))
return seen
rt, seen := s.seenSyncMessageCache.Get(seenSyncCommitteeKey(m.Slot, m.ValidatorIndex, subCommitteeIndex))
if !seen {
// return early if this is the first message
return false
}
root, ok := rt.([32]byte)
if !ok {
return true // Impossible. Return true to be safe
}
if !s.cfg.chain.InForkchoice(root) {
return true
}
msgRoot := [32]byte(m.BlockRoot)
if !s.cfg.chain.InForkchoice(msgRoot) && !s.cfg.beaconDB.HasBlock(ctx, msgRoot) {
return false
}
headRoot := s.cfg.chain.CachedHeadRoot()
if root == headRoot {
return true
}
return msgRoot != headRoot
}

// Set sync committee message validator index and slot as seen.
func (s *Service) setSeenSyncMessageIndexSlot(slot primitives.Slot, valIndex primitives.ValidatorIndex, subCommitteeIndex uint64) {
func (s *Service) setSeenSyncMessageIndexSlot(m *ethpb.SyncCommitteeMessage, subCommitteeIndex uint64) {
s.seenSyncMessageLock.Lock()
defer s.seenSyncMessageLock.Unlock()
key := seenSyncCommitteeKey(slot, valIndex, subCommitteeIndex)
s.seenSyncMessageCache.Add(key, true)
key := seenSyncCommitteeKey(m.Slot, m.ValidatorIndex, subCommitteeIndex)
s.seenSyncMessageCache.Add(key, [32]byte(m.BlockRoot))
}

// The `subnet_id` is valid for the given validator. This implies the validator is part of the broader
Expand Down Expand Up @@ -184,15 +203,15 @@ func (s *Service) rejectIncorrectSyncCommittee(
// There has been no other valid sync committee signature for the declared `slot`, `validator_index`,
// and `subcommittee_index`. In the event of `validator_index` belongs to multiple subnets, as long
// as one subnet has not been seen, we should let it in.
func (s *Service) ignoreHasSeenSyncMsg(
func (s *Service) ignoreHasSeenSyncMsg(ctx context.Context,
m *ethpb.SyncCommitteeMessage, committeeIndices []primitives.CommitteeIndex,
) validationFn {
return func(ctx context.Context) (pubsub.ValidationResult, error) {
var isValid bool
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
for _, idx := range committeeIndices {
subnet := uint64(idx) / subCommitteeSize
if !s.hasSeenSyncMessageIndexSlot(m.Slot, m.ValidatorIndex, subnet) {
if !s.hasSeenSyncMessageIndexSlot(ctx, m, subnet) {
isValid = true
break
}
Expand Down
32 changes: 24 additions & 8 deletions beacon-chain/sync/validate_sync_committee_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,12 @@ func TestService_ValidateSyncCommitteeMessage(t *testing.T) {
s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New())
s.cfg.beaconDB = beaconDB
s.initCaches()

s.setSeenSyncMessageIndexSlot(1, 1, 0)
m := &ethpb.SyncCommitteeMessage{
Slot: 1,
ValidatorIndex: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
}
s.setSeenSyncMessageIndexSlot(m, 0)
return s, topic, startup.NewClock(time.Now(), [32]byte{})
},
args: args{
Expand Down Expand Up @@ -441,30 +445,42 @@ func TestService_ignoreHasSeenSyncMsg(t *testing.T) {
name: "has seen",
setupSvc: func(s *Service, msg *ethpb.SyncCommitteeMessage, topic string) (*Service, string) {
s.initCaches()
s.setSeenSyncMessageIndexSlot(1, 0, 0)
m := &ethpb.SyncCommitteeMessage{
Slot: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
}
s.setSeenSyncMessageIndexSlot(m, 0)
return s, ""
},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 0, Slot: 1},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 0, Slot: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:]},
committee: []primitives.CommitteeIndex{1, 2, 3},
want: pubsub.ValidationIgnore,
},
{
name: "has not seen",
setupSvc: func(s *Service, msg *ethpb.SyncCommitteeMessage, topic string) (*Service, string) {
s.initCaches()
s.setSeenSyncMessageIndexSlot(1, 0, 0)
m := &ethpb.SyncCommitteeMessage{
Slot: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
}
s.setSeenSyncMessageIndexSlot(m, 0)
return s, ""
},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 1, Slot: 1},
msg: &ethpb.SyncCommitteeMessage{ValidatorIndex: 1, Slot: 1,
BlockRoot: bytesutil.PadTo([]byte{'A'}, 32)},
committee: []primitives.CommitteeIndex{1, 2, 3},
want: pubsub.ValidationAccept,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Service{}
s := &Service{
cfg: &config{chain: &mockChain.ChainService{}},
}
s, _ = tt.setupSvc(s, tt.msg, "")
f := s.ignoreHasSeenSyncMsg(tt.msg, tt.committee)
f := s.ignoreHasSeenSyncMsg(context.Background(), tt.msg, tt.committee)
result, err := f(context.Background())
_ = err
require.Equal(t, tt.want, result)
Expand Down

0 comments on commit 235f733

Please sign in to comment.