/
broadcast_bls_changes.go
88 lines (81 loc) · 2.38 KB
/
broadcast_bls_changes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package sync
import (
"context"
"time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v4/config/params"
types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/rand"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
)
// broadcastBLSChangesRateLimit caps how many BLS-to-execution changes
// broadcastBLSBatch processes in a single batch.
const broadcastBLSChangesRateLimit = 128
// broadcastBLSChanges broadcasts the pending BLS-to-execution changes held by
// the pool when currSlot is the first slot of the Capella fork epoch. For any
// other slot it is a no-op.
//
// The pending changes are copied, put into a random order, and handed to
// rateBLSChanges on its own goroutine, so this method returns without waiting
// for the broadcast to finish.
func (s *Service) broadcastBLSChanges(currSlot types.Slot) {
	capellaSlotStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
	if err != nil {
		// only possible error is an overflow, so we exit early from the method
		return
	}
	if currSlot != capellaSlotStart {
		return
	}
	changes, err := s.cfg.blsToExecPool.PendingBLSToExecChanges()
	if err != nil {
		log.WithError(err).Error("could not get BLS to execution changes")
		// Do not use the result of a failed call.
		return
	}
	if len(changes) == 0 {
		return
	}

	// Work on a copy so the slice returned by the pool is left untouched, then
	// shuffle it in place (Fisher-Yates) instead of repeatedly deleting random
	// elements, which is quadratic and rewrites the source slice.
	broadcastChanges := make([]*ethpb.SignedBLSToExecutionChange, len(changes))
	copy(broadcastChanges, changes)
	source := rand.NewGenerator()
	for i := len(broadcastChanges) - 1; i > 0; i-- {
		j := source.Intn(i + 1)
		broadcastChanges[i], broadcastChanges[j] = broadcastChanges[j], broadcastChanges[i]
	}

	go s.rateBLSChanges(s.ctx, broadcastChanges)
}
// broadcastBLSBatch processes the leading batch of the slice behind ptr, at
// most broadcastBLSChangesRateLimit entries, and then advances *ptr past it.
//
// Each non-nil change in the batch is validated against the read-only head
// state; changes that fail validation are logged and skipped, the rest are
// broadcast over p2p (broadcast failures are logged as well). If the head
// state cannot be obtained, the error is logged and *ptr is left unchanged.
func (s *Service) broadcastBLSBatch(ctx context.Context, ptr *[]*ethpb.SignedBLSToExecutionChange) {
	pending := *ptr
	batchSize := len(pending)
	if batchSize > broadcastBLSChangesRateLimit {
		batchSize = broadcastBLSChangesRateLimit
	}

	headState, err := s.cfg.chain.HeadStateReadOnly(ctx)
	if err != nil {
		log.WithError(err).Error("could not get head state")
		return
	}

	for i := 0; i < batchSize; i++ {
		change := pending[i]
		if change == nil {
			continue
		}
		if _, err := blocks.ValidateBLSToExecutionChange(headState, change); err != nil {
			log.WithError(err).Error("could not validate BLS to execution change")
			continue
		}
		if err := s.cfg.p2p.Broadcast(ctx, change); err != nil {
			log.WithError(err).Error("could not broadcast BLS to execution changes.")
		}
	}

	// Drop the entries handled in this call; the caller sees the remainder.
	*ptr = pending[batchSize:]
}
// rateBLSChanges broadcasts changes in batches: one batch immediately, then one
// further batch every 500ms until no changes remain. It returns early if
// either ctx or the service context is cancelled.
//
// It blocks until done, so callers that must not wait should run it on its own
// goroutine.
func (s *Service) rateBLSChanges(ctx context.Context, changes []*ethpb.SignedBLSToExecutionChange) {
	s.broadcastBLSBatch(ctx, &changes)
	if len(changes) == 0 {
		return
	}

	ticker := time.NewTicker(500 * time.Millisecond)
	// Release the ticker on every exit path.
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// The batches run under ctx, so there is no point in continuing once
			// it has been cancelled.
			return
		case <-s.ctx.Done():
			return
		case <-ticker.C:
			s.broadcastBLSBatch(ctx, &changes)
			if len(changes) == 0 {
				return
			}
		}
	}
}