Skip to content

Commit

Permalink
Merge branch 'peerDAS' into peerdas-check-kzg-sampling
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Jul 8, 2024
2 parents b4ad914 + d57db9b commit e6ca62d
Show file tree
Hide file tree
Showing 32 changed files with 90 additions and 61 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func missingDataColumns(bs *filesystem.BlobStorage, root [32]byte, expected map[
// sidecars are missing, it will then read from the blobNotifier channel for the given root until the channel is
// closed, the context hits cancellation/timeout, or notifications have been received for all the missing sidecars.
func (s *Service) isDataAvailable(ctx context.Context, root [32]byte, signed interfaces.ReadOnlySignedBeaconBlock) error {
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(signed.Block().Slot()) {
return s.isDataAvailableDataColumns(ctx, root, signed)
}
if signed.Version() < version.Deneb {
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/core/time/slot_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func HigherEqualThanAltairVersionAndEpoch(s state.BeaconState, e primitives.Epoc
return s.Version() >= version.Altair && e >= params.BeaconConfig().AltairForkEpoch
}

// PeerDASIsActive checks whether peerDAS is active at the provided slot.
func PeerDASIsActive(slot primitives.Slot) bool {
return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
}

// CanUpgradeToAltair returns true if the input `slot` can upgrade to Altair.
// Spec code:
// If state.slot % SLOTS_PER_EPOCH == 0 and compute_epoch_at_slot(state.slot) == ALTAIR_FORK_EPOCH
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ go_test(
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (s *Service) createLocalNode(
localNode.Set(quicEntry)
}

if features.Get().EnablePeerDAS {
if params.PeerDASEnabled() {
localNode.Set(peerdas.Csc(peerdas.CustodySubnetCount()))
}

Expand Down
10 changes: 4 additions & 6 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
testp2p "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
Expand Down Expand Up @@ -134,11 +133,10 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
}

func TestCreateLocalNode(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnablePeerDAS: true,
})
defer resetFn()

params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.Eip7594ForkEpoch = 1
params.OverrideBeaconConfig(cfg)
testCases := []struct {
name string
cfg *Config
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/io/file"
Expand Down Expand Up @@ -78,7 +78,7 @@ func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
}

// If the StaticPeerID flag is not set and if peerDAS is not enabled, return the private key.
if !(cfg.StaticPeerID || features.Get().EnablePeerDAS) {
if !(cfg.StaticPeerID || params.PeerDASEnabled()) {
return ecdsaprysm.ConvertFromInterfacePrivKey(priv)
}

Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/rpc/eth/config/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestGetSpec(t *testing.T) {
config.DenebForkEpoch = 105
config.ElectraForkVersion = []byte("ElectraForkVersion")
config.ElectraForkEpoch = 107
config.Eip7594ForkEpoch = 109
config.BLSWithdrawalPrefixByte = byte('b')
config.ETH1AddressWithdrawalPrefixByte = byte('c')
config.GenesisDelay = 24
Expand Down Expand Up @@ -193,7 +194,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)

assert.Equal(t, 155, len(data))
assert.Equal(t, 156, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
Expand Down Expand Up @@ -271,6 +272,8 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "0x"+hex.EncodeToString([]byte("ElectraForkVersion")), v)
case "ELECTRA_FORK_EPOCH":
assert.Equal(t, "107", v)
case "EIP7594_FORK_EPOCH":
assert.Equal(t, "109", v)
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
assert.Equal(t, "1000", v)
case "BLS_WITHDRAWAL_PREFIX":
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
Expand Down Expand Up @@ -272,8 +273,6 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
dataColumnSideCars []*ethpb.DataColumnSidecar
)

isPeerDASEnabled := features.Get().EnablePeerDAS

ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
defer span.End()

Expand All @@ -285,6 +284,7 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", "decode block failed", err)
}
isPeerDASEnabled := coreTime.PeerDASIsActive(block.Block().Slot())

if block.IsBlinded() {
block, blobSidecars, dataColumnSideCars, err = vs.handleBlindedBlock(ctx, block, isPeerDASEnabled)
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/core/transition/interop:go_default_library",
"//beacon-chain/db:go_default_library",
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/sirupsen/logrus"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
Expand Down Expand Up @@ -412,6 +413,11 @@ func (s *Service) processEvent(e *feed.Event, nonCustodyColums map[uint64]bool,
return
}

if coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
return
}

// Get the commitments for this block.
commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments()
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions beacon-chain/sync/fork_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func (s *Service) registerForUpcomingFork(currEpoch primitives.Epoch) error {
s.registerRPCHandlersDeneb()
}
}
// Specially handle peerDAS
if params.PeerDASEnabled() && currEpoch+1 == params.BeaconConfig().Eip7594ForkEpoch {
s.registerRPCHandlersPeerDAS()
}

return nil
}

Expand Down Expand Up @@ -121,5 +126,9 @@ func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
}
}
}
// Handle PeerDAS as its a special case.
if params.PeerDASEnabled() && currEpoch > 0 && (currEpoch-1) == params.BeaconConfig().Eip7594ForkEpoch {
s.unregisterBlobHandlers()
}
return nil
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
Expand All @@ -33,7 +34,6 @@ go_library(
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opencensus.io/trace"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
Expand All @@ -22,7 +23,6 @@ import (
prysmsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
blocks2 "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand Down Expand Up @@ -319,7 +319,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot

response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
if response.err == nil {
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(start) {
bwb, err := f.fetchColumnsFromPeer(ctx, response.bwb, response.pid, peers)
if err != nil {
response.err = err
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/initial-sync/blocks_fetcher_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
Expand Down Expand Up @@ -280,7 +280,7 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer")
}
var bwb []blocks.BlockWithROBlobs
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(block.Block().Slot()) {
bwb, err = f.fetchColumnsFromPeer(ctx, altBlocks, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
Expand Down Expand Up @@ -312,7 +312,7 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
if err != nil {
return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
}
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(b.Block().Slot()) {
bwb, err = f.fetchColumnsFromPeer(ctx, bwb, pid, []peer.ID{pid})
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve columns for blocks found in findAncestor")
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -173,7 +173,7 @@ func (s *Service) processFetchedDataRegSync(
if len(bwb) == 0 {
return
}
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(startSlot) {
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
Expand Down Expand Up @@ -357,7 +357,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
}
var aStore das.AvailabilityStore
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(first.Block().Slot()) {
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
Expand Down Expand Up @@ -187,7 +187,7 @@ func (s *Service) Start() {
log.WithError(err).Error("Error waiting for minimum number of peers")
return
}
if features.Get().EnablePeerDAS {
if coreTime.PeerDASIsActive(s.cfg.Chain.HeadSlot()) {
if err := s.fetchOriginColumns(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing columns for checkpoint origin")
return
Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
Expand Down Expand Up @@ -204,7 +204,8 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
return err
}
}
if features.Get().EnablePeerDAS {

if coreTime.PeerDASIsActive(b.Block().Slot()) {
request, err := s.pendingDataColumnRequestForBlock(blkRoot, b)
if err != nil {
return err
Expand Down
31 changes: 19 additions & 12 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/time/slots"
Expand Down Expand Up @@ -99,17 +98,6 @@ func (s *Service) registerRPCHandlersAltair() {
}

func (s *Service) registerRPCHandlersDeneb() {
if features.Get().EnablePeerDAS {
s.registerRPC(
p2p.RPCDataColumnSidecarsByRootTopicV1,
s.dataColumnSidecarByRootRPCHandler,
)
s.registerRPC(
p2p.RPCDataColumnSidecarsByRangeTopicV1,
s.dataColumnSidecarsByRangeRPCHandler,
)
return
}
s.registerRPC(
p2p.RPCBlobSidecarsByRangeTopicV1,
s.blobSidecarsByRangeRPCHandler,
Expand All @@ -120,6 +108,17 @@ func (s *Service) registerRPCHandlersDeneb() {
)
}

func (s *Service) registerRPCHandlersPeerDAS() {
s.registerRPC(
p2p.RPCDataColumnSidecarsByRootTopicV1,
s.dataColumnSidecarByRootRPCHandler,
)
s.registerRPC(
p2p.RPCDataColumnSidecarsByRangeTopicV1,
s.dataColumnSidecarsByRangeRPCHandler,
)
}

// Remove all v1 Stream handlers that are no longer supported
// from altair onwards.
func (s *Service) unregisterPhase0Handlers() {
Expand All @@ -132,6 +131,14 @@ func (s *Service) unregisterPhase0Handlers() {
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullMetadataTopic))
}

func (s *Service) unregisterBlobHandlers() {
fullBlobRangeTopic := p2p.RPCBlobSidecarsByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
fullBlobRootTopic := p2p.RPCBlobSidecarsByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()

s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlobRangeTopic))
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlobRootTopic))
}

// registerRPC for a given topic with an expected protobuf message type.
func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
topic := baseTopic + s.cfg.p2p.Encoding().ProtocolSuffix()
Expand Down
Loading

0 comments on commit e6ca62d

Please sign in to comment.