Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new slot ticker and use it on attestation aggregation #12377

Merged
merged 13 commits into from
May 10, 2023
3 changes: 2 additions & 1 deletion beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ func (b *BeaconNode) fetchBuilderService() *builder.Service {

func (b *BeaconNode) registerAttestationPool() error {
s, err := attestations.NewService(b.ctx, &attestations.Config{
Pool: b.attestationPool,
Pool: b.attestationPool,
InitialSyncComplete: b.initialSyncComplete,
})
if err != nil {
return errors.Wrap(err, "could not register atts pool service")
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
deps = [
"//beacon-chain/operations/attestations/kv:go_default_library",
"//cache/lru:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
Expand Down
24 changes: 18 additions & 6 deletions beacon-chain/operations/attestations/prepare_forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,39 @@ import (
"time"

"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/config/features"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
attaggregation "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"go.opencensus.io/trace"
)

// Prepare attestations for fork choice three times per slot.
var prepareForkChoiceAttsPeriod = slots.DivideSlotBy(3 /* times-per-slot */)

// This prepares fork choice attestations by running batchForkChoiceAtts
// every prepareForkChoiceAttsPeriod.
func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
intervals := features.Get().AggregateIntervals
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
// Adjust intervals for networks with a lower slot duration (Hive, e2e, etc)
for {
if intervals[len(intervals)-1] >= slotDuration {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: This will panic if len(intervals) == 0. That might be OK, but consider a meaningful panic like

if len(intervals) == 0 {
  panic("unable to create timers for attestation aggregation with no intervals provided")
}

In my opinion, that is better than index out of range panic. Ideally, we never panic and simply return an error here though

If this code is temporary for the feature flag, then it might be OK as is. I'm thinking about future proofing so if there is not much future for this then you can dismiss this suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

panic is good here IMO since the intervals are hardcoded configuration constants with defaults. Someone would have to remove the code with the configuration constants for this to panic.

for i, offset := range intervals {
intervals[i] = offset / 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this halved?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the intervals are set by default to be something that works for 12 seconds per slot, but special networks use other parameters. We are halving them to deal with the most common scenario of 6 seconds per slot in Hive. If someone wants to try 4 seconds per slot (which doesn't work even on interop) at least this prevents the panic

}
} else {
break
}
}
ticker := slots.NewSlotTickerWithIntervals(time.Unix(int64(s.genesisTime), 0), intervals)
for {
select {
case <-ticker.C:
case <-ticker.C():
t := time.Now()
if err := s.batchForkChoiceAtts(s.ctx); err != nil {
log.WithError(err).Error("Could not prepare attestations for fork choice")
}
log.WithField("latency", time.Since(t).Milliseconds()).Debug("batched forkchoice attestations")
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
return
Expand Down
20 changes: 18 additions & 2 deletions beacon-chain/operations/attestations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package attestations

import (
"context"
"errors"
"time"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -26,8 +27,9 @@ type Service struct {

// Config options for the service.
type Config struct {
Pool Pool
pruneInterval time.Duration
Pool Pool
pruneInterval time.Duration
InitialSyncComplete chan struct{}
}

// NewService instantiates a new attestation pool service instance that will
Expand All @@ -51,10 +53,24 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {

// Start an attestation pool service's main event loop.
func (s *Service) Start() {
if err := s.waitForSync(s.cfg.InitialSyncComplete); err != nil {
log.WithError(err).Error("failed to wait for initial sync")
return
}
go s.prepareForkChoiceAtts()
go s.pruneAttsPool()
}

// waitForSync waits until the beacon node is synced to the latest head.
func (s *Service) waitForSync(syncChan chan struct{}) error {
select {
case <-syncChan:
return nil
case <-s.ctx.Done():
return errors.New("context closed, exiting goroutine")
}
}

// Stop the beacon block attestation pool service's main event loop
// and associated goroutines.
func (s *Service) Stop() error {
Expand Down
4 changes: 4 additions & 0 deletions config/features/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Flags struct {
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
// changed on disk. This feature is for advanced use cases only.
KeystoreImportDebounceInterval time.Duration

// AggregateIntervals specifies the time durations at which we aggregate attestations preparing for forkchoice.
AggregateIntervals []time.Duration
}

var featureConfig *Flags
Expand Down Expand Up @@ -218,6 +221,7 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
logEnabled(buildBlockParallel)
cfg.BuildBlockParallel = true
}
cfg.AggregateIntervals = []time.Duration{aggregateFirstInterval.Value, aggregateSecondInterval.Value, aggregateThirdInterval.Value}
Init(cfg)
return nil
}
Expand Down
21 changes: 21 additions & 0 deletions config/features/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ var (
Usage: "(Danger): Writes the wallet password to the wallet directory on completing Prysm web onboarding. " +
"We recommend against this flag unless you are an advanced user.",
}
aggregateFirstInterval = &cli.DurationFlag{
Name: "aggregate-first-interval",
Usage: "(Advanced): Specifies the first interval in which attestations are aggregated in the slot (typically unnaggregated attestations are aggregated in this interval)",
Value: 7 * time.Second,
Hidden: true,
}
aggregateSecondInterval = &cli.DurationFlag{
Name: "aggregate-second-interval",
Usage: "(Advanced): Specifies the second interval in which attestations are aggregated in the slot",
Value: 9 * time.Second,
Hidden: true,
}
aggregateThirdInterval = &cli.DurationFlag{
Name: "aggregate-third-interval",
Usage: "(Advanced): Specifies the third interval in which attestations are aggregated in the slot",
Value: 11 * time.Second,
Hidden: true,
}
dynamicKeyReloadDebounceInterval = &cli.DurationFlag{
Name: "dynamic-key-reload-debounce-interval",
Usage: "(Advanced): Specifies the time duration the validator waits to reload new keys if they have " +
Expand Down Expand Up @@ -168,6 +186,9 @@ var BeaconChainFlags = append(deprecatedBeaconFlags, append(deprecatedFlags, []c
enableVerboseSigVerification,
enableOptionalEngineMethods,
prepareAllPayloads,
aggregateFirstInterval,
aggregateSecondInterval,
aggregateThirdInterval,
buildBlockParallel,
}...)...)

Expand Down
1 change: 1 addition & 0 deletions time/slots/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ go_test(
"//time:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)
63 changes: 63 additions & 0 deletions time/slots/slotticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package slots
import (
"time"

"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
)
Expand Down Expand Up @@ -104,3 +105,65 @@ func (s *SlotTicker) start(
}
}()
}

// startWithIntervals starts a ticker that emits a tick every slot at the
// prescribed intervals. The caller is responsible to make these intervals increasing and
// less than secondsPerSlot
func (s *SlotTicker) startWithIntervals(
genesisTime time.Time,
until func(time.Time) time.Duration,
after func(time.Duration) <-chan time.Time,
intervals []time.Duration) {
go func() {
slot := Since(genesisTime)
slot++
interval := 0
nextTickTime := startFromTime(genesisTime, slot).Add(intervals[0])

for {
waitTime := until(nextTickTime)
select {
case <-after(waitTime):
s.c <- slot
interval++
if interval == len(intervals) {
interval = 0
slot++
}
nextTickTime = startFromTime(genesisTime, slot).Add(intervals[interval])
case <-s.done:
return
}
}
}()
}

// NewSlotTickerWithIntervals starts and returns a SlotTicker instance that allows
// several offsets of time from genesis,
// Caller is responsible to input the intervals in increasing order and none bigger or equal than
// SecondsPerSlot
func NewSlotTickerWithIntervals(genesisTime time.Time, intervals []time.Duration) *SlotTicker {
if genesisTime.Unix() == 0 {
panic("zero genesis time")
}
if len(intervals) == 0 {
panic("at least one interval has to be entered")
}
slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
lastOffset := time.Duration(0)
for _, offset := range intervals {
if offset < lastOffset {
panic("invalid decreasing offsets")
}
if offset >= slotDuration {
panic("invalid ticker offset")
}
lastOffset = offset
}
ticker := &SlotTicker{
c: make(chan primitives.Slot),
done: make(chan struct{}),
}
ticker.startWithIntervals(genesisTime, prysmTime.Until, time.After, intervals)
return ticker
}
48 changes: 48 additions & 0 deletions time/slots/slotticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/stretchr/testify/require"
)

var _ Ticker = (*SlotTicker)(nil)
Expand Down Expand Up @@ -136,3 +138,49 @@ func TestGetSlotTickerWithOffset_OK(t *testing.T) {
}
}
}

func TestGetSlotTickerWitIntervals(t *testing.T) {
genesisTime := time.Now()
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
intervals := []time.Duration{offset, 2 * offset}

intervalTicker := NewSlotTickerWithIntervals(genesisTime, intervals)
normalTicker := NewSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot)

firstTicked := 0
for {
select {
case <-intervalTicker.C():
// interval ticks starts in second slot
if firstTicked < 2 {
t.Fatal("Expected other ticker to tick first")
}
return
case <-normalTicker.C():
if firstTicked > 1 {
t.Fatal("Expected normal ticker to tick first")
}
firstTicked++
}
}
}

func TestSlotTickerWithIntervalsInputValidation(t *testing.T) {
var genesisTime time.Time
offset := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second / 3
intervals := make([]time.Duration, 0)
panicCall := func() {
NewSlotTickerWithIntervals(genesisTime, intervals)
}
require.Panics(t, panicCall, "zero genesis time")
genesisTime = time.Now()
require.Panics(t, panicCall, "at least one interval has to be entered")
intervals = []time.Duration{2 * offset, offset}
require.Panics(t, panicCall, "invalid decreasing offsets")
intervals = []time.Duration{offset, 4 * offset}
require.Panics(t, panicCall, "invalid ticker offset")
intervals = []time.Duration{4 * offset, offset}
require.Panics(t, panicCall, "invalid ticker offset")
intervals = []time.Duration{offset, 2 * offset}
require.NotPanics(t, panicCall)
}
11 changes: 8 additions & 3 deletions time/slots/slottime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import (
// incoming objects. (24 mins with mainnet spec)
const MaxSlotBuffer = uint64(1 << 7)

// startFromTime returns the slot start in terms of genesis time.Time
func startFromTime(genesis time.Time, slot primitives.Slot) time.Time {
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
return genesis.Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
}

// StartTime returns the start time in terms of its unix epoch
// value.
func StartTime(genesis uint64, slot primitives.Slot) time.Time {
duration := time.Second * time.Duration(slot.Mul(params.BeaconConfig().SecondsPerSlot))
startTime := time.Unix(int64(genesis), 0).Add(duration) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
return startTime
genesisTime := time.Unix(int64(genesis), 0) // lint:ignore uintcast -- Genesis timestamp will not exceed int64 in your lifetime.
return startFromTime(genesisTime, slot)
}

// SinceGenesis returns the number of slots since
Expand Down