diff --git a/beacon-chain/core/peerdas/BUILD.bazel b/beacon-chain/core/peerdas/BUILD.bazel
index 6cb8765fa5a..62b82f5fa83 100644
--- a/beacon-chain/core/peerdas/BUILD.bazel
+++ b/beacon-chain/core/peerdas/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
         "//proto/prysm/v1alpha1:go_default_library",
         "@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
         "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
+        "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
         "@com_github_holiman_uint256//:go_default_library",
         "@com_github_pkg_errors//:go_default_library",
     ],
diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/helpers.go
index a551aad6195..743ff8bd926 100644
--- a/beacon-chain/core/peerdas/helpers.go
+++ b/beacon-chain/core/peerdas/helpers.go
@@ -3,9 +3,11 @@ package peerdas
 import (
 	"encoding/binary"
 	"math"
+	"math/big"
 
 	cKzg4844 "github.com/ethereum/c-kzg-4844/bindings/go"
 	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/enr"
 	"github.com/holiman/uint256"
 	errors "github.com/pkg/errors"
 
@@ -19,13 +21,24 @@ import (
 )
 
 // Bytes per cell
-const bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement
+const (
+	CustodySubnetCountEnrKey = "csc"
+
+	bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement
+)
+
+// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-discovery-domain-discv5
+type Csc uint64
+
+func (Csc) ENRKey() string { return CustodySubnetCountEnrKey }
 
 var (
 	// Custom errors
-	errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
-	errIndexTooLarge              = errors.New("column index is larger than the specified number of columns")
-	errMismatchLength             = errors.New("mismatch in the length of the commitments and proofs")
+	errCustodySubnetCountTooLarge   = errors.New("custody subnet count larger than data column sidecar subnet count")
+	errIndexTooLarge                = errors.New("column index is larger than the specified columns count")
+	errMismatchLength               = errors.New("mismatch in the length of the commitments and proofs")
+	errRecordNil                    = errors.New("record is nil")
+	errCannotLoadCustodySubnetCount = errors.New("cannot load the custody subnet count from peer")
 
 	// maxUint256 is the maximum value of a uint256.
 	maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
@@ -311,3 +324,67 @@ func CustodySubnetCount() uint64 {
 	}
 	return count
 }
+
+// HypergeomCDF computes the hypergeometric cumulative distribution function.
+// https://en.wikipedia.org/wiki/Hypergeometric_distribution
+func HypergeomCDF(k, M, n, N uint64) float64 {
+	denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast
+	denominator := new(big.Float).SetInt(denominatorInt)
+
+	rBig := big.NewFloat(0)
+
+	for i := uint64(0); i < k+1; i++ {
+		a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast
+		b := new(big.Int).Binomial(int64(M-n), int64(N-i))
+		numeratorInt := new(big.Int).Mul(a, b)
+		numerator := new(big.Float).SetInt(numeratorInt)
+		item := new(big.Float).Quo(numerator, denominator)
+		rBig.Add(rBig, item)
+	}
+
+	r, _ := rBig.Float64()
+
+	return r
+}
+
+// ExtendedSampleCount computes, for a given number of samples per slot and allowed failures, the
+// number of samples we should actually query from peers.
+// TODO: Add link to the specification once it is available.
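+// In other words: the result is the smallest sample size k >= samplesPerSlot such that, when
+// more than half of the columns are missing (i.e. the data is not reconstructable), the
+// probability of observing at most `allowedFailures` failures among k samples does not exceed
+// the false-positive probability of the baseline (samplesPerSlot samples, zero failures).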
+func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
+	// Retrieve the number of columns.
+	columnsCount := params.BeaconConfig().NumberOfColumns
+
+	// If half of the columns are missing, we are able to reconstruct the data.
+	// If half of the columns + 1 are missing, we are not able to reconstruct the data.
+	// `worstCaseMissing` is therefore the smallest number of missing columns that makes reconstruction impossible.
+	worstCaseMissing := columnsCount/2 + 1
+
+	// Compute the false positive threshold.
+	falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)
+
+	var sampleCount uint64
+
+	// Finally, compute the extended sample count.
+	for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ {
+		if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
+			break
+		}
+	}
+
+	return sampleCount
+}
+
+// CustodyCountFromRecord extracts the custody subnet count from an ENR record.
+func CustodyCountFromRecord(record *enr.Record) (uint64, error) {
+	if record == nil {
+		return 0, errRecordNil
+	}
+
+	// Load the `custody_subnet_count`.
+	var csc Csc
+	if err := record.Load(&csc); err != nil {
+		return 0, errCannotLoadCustodySubnetCount
+	}
+
+	return uint64(csc), nil
+}
diff --git a/beacon-chain/core/peerdas/helpers_test.go b/beacon-chain/core/peerdas/helpers_test.go
index 4a798590c9e..401fb9c0033 100644
--- a/beacon-chain/core/peerdas/helpers_test.go
+++ b/beacon-chain/core/peerdas/helpers_test.go
@@ -89,3 +89,55 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
 		require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
 	}
 }
+
+func TestHypergeomCDF(t *testing.T) {
+	// Hypergeometric distribution: https://en.wikipedia.org/wiki/Hypergeometric_distribution
+	// Population size: 128, successes in population: 65, sample size: 16, successes in sample: at most 5.
+	// Expected result: ~0.0797
+	const (
+		expected = 0.0796665913283742
+		margin   = 0.000001
+	)
+
+	actual := peerdas.HypergeomCDF(5, 128, 65, 16)
+	require.Equal(t, true, expected-margin <= actual && actual <= expected+margin)
+}
+
+func TestExtendedSampleCount(t *testing.T) {
+	const samplesPerSlot = 16
+
+	testCases := []struct {
+		name                string
+		allowedMissings     uint64
+		extendedSampleCount uint64
+	}{
+		{name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16},
+		{name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20},
+		{name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24},
+		{name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27},
+		{name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29},
+		{name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32},
+		{name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35},
+		{name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37},
+		{name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40},
+		{name: "allowedMissings=9", allowedMissings: 9, extendedSampleCount: 42},
+		{name: "allowedMissings=10", allowedMissings: 10, extendedSampleCount: 44},
+		{name: "allowedMissings=11", allowedMissings: 11, extendedSampleCount: 47},
+		{name: "allowedMissings=12", allowedMissings: 12, extendedSampleCount: 49},
+		{name: "allowedMissings=13", allowedMissings: 13, extendedSampleCount: 51},
+		{name: "allowedMissings=14", allowedMissings: 14, extendedSampleCount: 53},
+		{name: "allowedMissings=15", allowedMissings: 15, extendedSampleCount: 55},
+		{name: "allowedMissings=16", allowedMissings: 16, extendedSampleCount: 57},
+		{name: "allowedMissings=17", allowedMissings: 17, extendedSampleCount: 59},
+		{name: "allowedMissings=18", allowedMissings: 18, extendedSampleCount: 61},
+		{name: "allowedMissings=19", allowedMissings: 19, extendedSampleCount: 63},
+		{name: "allowedMissings=20", allowedMissings: 20, extendedSampleCount: 65},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings)
+			require.Equal(t, tc.extendedSampleCount, result)
+		})
+	}
+}
diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel
index 0299c276c1a..864ffe1f067 100644
--- a/beacon-chain/p2p/BUILD.bazel
+++ b/beacon-chain/p2p/BUILD.bazel
@@ -120,6 +120,7 @@ go_test(
         "addr_factory_test.go",
         "broadcaster_test.go",
         "connection_gater_test.go",
+        "custody_test.go",
         "dial_relay_node_test.go",
         "discovery_test.go",
         "fork_test.go",
diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go
index 04cd0e26742..bf7227217b0 100644
--- a/beacon-chain/p2p/custody.go
+++ b/beacon-chain/p2p/custody.go
@@ -9,68 +9,95 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/config/params"
 )
 
+// GetValidCustodyPeers returns a list of peers that custody a superset of the local node's custody columns.
 func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
-	custodiedColumns, err := peerdas.CustodyColumns(s.NodeID(), peerdas.CustodySubnetCount())
+	// Get the total number of columns.
+	numberOfColumns := params.BeaconConfig().NumberOfColumns
+
+	localCustodySubnetCount := peerdas.CustodySubnetCount()
+	localCustodyColumns, err := peerdas.CustodyColumns(s.NodeID(), localCustodySubnetCount)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrap(err, "custody columns for local node")
 	}
-	var validPeers []peer.ID
+
+	localCustodyColumnsCount := uint64(len(localCustodyColumns))
+
+	// Find the valid peers.
+	validPeers := make([]peer.ID, 0, len(peers))
+
+loop:
 	for _, pid := range peers {
-		remoteCount := s.CustodyCountFromRemotePeer(pid)
+		// Get the custody subnets count of the remote peer.
+		remoteCustodySubnetCount := s.CustodyCountFromRemotePeer(pid)
 
-		nodeId, err := ConvertPeerIDToNodeID(pid)
+		// Get the remote node ID from the peer ID.
+		remoteNodeID, err := ConvertPeerIDToNodeID(pid)
 		if err != nil {
 			return nil, errors.Wrap(err, "convert peer ID to node ID")
 		}
-		remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)
+
+		// Get the custody columns of the remote peer.
+		remoteCustodyColumns, err := peerdas.CustodyColumns(remoteNodeID, remoteCustodySubnetCount)
 		if err != nil {
 			return nil, errors.Wrap(err, "custody columns")
 		}
-		invalidPeer := false
-		for c := range custodiedColumns {
-			if !remoteCustodiedColumns[c] {
-				invalidPeer = true
-				break
-			}
+
+		remoteCustodyColumnsCount := uint64(len(remoteCustodyColumns))
+
+		// If the remote peer custodies fewer columns than the local node, skip it.
+		if remoteCustodyColumnsCount < localCustodyColumnsCount {
+			continue
 		}
-		if invalidPeer {
+
+		// If the remote peer custodies all the possible columns, add it to the list.
+		if remoteCustodyColumnsCount == numberOfColumns {
+			copiedId := pid
+			validPeers = append(validPeers, copiedId)
 			continue
 		}
+
+		// Filter out invalid peers.
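+		// A peer is valid only if its custody columns are a superset of our own:
+		// every locally custodied column must also be custodied by the remote peer.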
+		for c := range localCustodyColumns {
+			if !remoteCustodyColumns[c] {
+				continue loop
+			}
+		}
+
 		copiedId := pid
 
 		// Add valid peer to the list.
 		validPeers = append(validPeers, copiedId)
 	}
+
 	return validPeers, nil
 }
 
+// CustodyCountFromRemotePeer retrieves the custody subnet count of a remote peer from its ENR.
 func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
 	// By default, we assume the peer custodies the minimum number of subnets.
-	peerCustodyCountCount := params.BeaconConfig().CustodyRequirement
+	custodyRequirement := params.BeaconConfig().CustodyRequirement
 
 	// Retrieve the ENR of the peer.
-	peerRecord, err := s.peers.ENR(pid)
+	record, err := s.peers.ENR(pid)
 	if err != nil {
-		log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
-		return peerCustodyCountCount
-	}
+		log.WithError(err).WithFields(logrus.Fields{
+			"peerID":       pid,
+			"defaultValue": custodyRequirement,
+		}).Error("Failed to retrieve ENR for peer, defaulting to the custody requirement")
 
-	if peerRecord == nil {
-		// This is the case for inbound peers. So we don't log an error for this.
-		log.WithField("peerID", pid).Debug("No ENR found for peer")
-		return peerCustodyCountCount
+		return custodyRequirement
 	}
 
-	// Load the `custody_subnet_count`
-	var csc CustodySubnetCount
-	if err := peerRecord.Load(&csc); err != nil {
-		log.WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer")
-		return peerCustodyCountCount
-	}
+	// Retrieve the custody subnets count from the ENR.
+	custodyCount, err := peerdas.CustodyCountFromRecord(record)
+	if err != nil {
+		log.WithError(err).WithFields(logrus.Fields{
+			"peerID":       pid,
+			"defaultValue": custodyRequirement,
+		}).Error("Failed to retrieve custody count from ENR for peer, defaulting to the custody requirement")
 
-	log.WithFields(logrus.Fields{
-		"peerID":       pid,
-		"custodyCount": csc,
-	}).Debug("Custody count read from peer's ENR")
+		return custodyRequirement
+	}
 
-	return uint64(csc)
+	return custodyCount
 }
diff --git a/beacon-chain/p2p/custody_test.go b/beacon-chain/p2p/custody_test.go
new file mode 100644
index 00000000000..450f2538918
--- /dev/null
+++ b/beacon-chain/p2p/custody_test.go
@@ -0,0 +1,163 @@
+package p2p
+
+import (
+	"context"
+	"crypto/ecdsa"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/p2p/enode"
+	"github.com/ethereum/go-ethereum/p2p/enr"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
+	"github.com/prysmaticlabs/prysm/v5/config/params"
+	ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
+	prysmNetwork "github.com/prysmaticlabs/prysm/v5/network"
+	"github.com/prysmaticlabs/prysm/v5/testing/require"
+)
+
+func createPeer(t *testing.T, privateKeyOffset int, custodyCount uint64) (*enr.Record, peer.ID, *ecdsa.PrivateKey) {
+	privateKeyBytes := make([]byte, 32)
+	for i := 0; i < 32; i++ {
+		privateKeyBytes[i] = byte(privateKeyOffset + i)
+	}
+
+	unmarshalledPrivateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes)
+	require.NoError(t, err)
+
+	privateKey, err := ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledPrivateKey)
+	require.NoError(t, err)
+
+	peerID, err := peer.IDFromPrivateKey(unmarshalledPrivateKey)
+	require.NoError(t, err)
+
+	record := &enr.Record{}
+	record.Set(peerdas.Csc(custodyCount))
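+	// Store the peer's secp256k1 public key in the record (the "secp256k1" ENR entry).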
+	record.Set(enode.Secp256k1(privateKey.PublicKey))
+
+	return record, peerID, privateKey
+}
+
+func TestGetValidCustodyPeers(t *testing.T) {
+	genesisValidatorRoot := make([]byte, 32)
+
+	for i := 0; i < 32; i++ {
+		genesisValidatorRoot[i] = byte(i)
+	}
+
+	service := &Service{
+		cfg:                   &Config{},
+		genesisTime:           time.Now(),
+		genesisValidatorsRoot: genesisValidatorRoot,
+		peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
+			ScorerParams: &scorers.Config{},
+		}),
+	}
+
+	ipAddrString, err := prysmNetwork.ExternalIPv4()
+	require.NoError(t, err)
+	ipAddr := net.ParseIP(ipAddrString)
+
+	custodyRequirement := params.BeaconConfig().CustodyRequirement
+	dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount
+
+	// Peer 1 custodies exactly the same columns as we do.
+	// (We use the same key pair as ours for simplicity.)
+	peer1Record, peer1ID, localPrivateKey := createPeer(t, 1, custodyRequirement)
+
+	// Peer 2 custodies all the columns.
+	peer2Record, peer2ID, _ := createPeer(t, 2, dataColumnSidecarSubnetCount)
+
+	// Peer 3 custodies different columns from ours (but the same count).
+	// (We use the same public key as peer 2 for simplicity.)
+	peer3Record, peer3ID, _ := createPeer(t, 3, custodyRequirement)
+
+	// Peer 4 custodies fewer columns than we do.
+	peer4Record, peer4ID, _ := createPeer(t, 4, custodyRequirement-1)
+
+	listener, err := service.createListener(ipAddr, localPrivateKey)
+	require.NoError(t, err)
+
+	service.dv5Listener = listener
+
+	service.peers.Add(peer1Record, peer1ID, nil, network.DirOutbound)
+	service.peers.Add(peer2Record, peer2ID, nil, network.DirOutbound)
+	service.peers.Add(peer3Record, peer3ID, nil, network.DirOutbound)
+	service.peers.Add(peer4Record, peer4ID, nil, network.DirOutbound)
+
+	actual, err := service.GetValidCustodyPeers([]peer.ID{peer1ID, peer2ID, peer3ID, peer4ID})
+	require.NoError(t, err)
+
+	expected := []peer.ID{peer1ID, peer2ID}
+	require.DeepSSZEqual(t, expected, actual)
+}
+
+func TestCustodyCountFromRemotePeer(t *testing.T) {
+	const (
+		expected uint64 = 7
+		pid             = "test-id"
+	)
+
+	csc := peerdas.Csc(expected)
+
+	// Define a nil record.
+	var nilRecord *enr.Record = nil
+
+	// Define an empty record (record with no `csc` entry).
+	emptyRecord := &enr.Record{}
+
+	// Define a nominal record.
+	nominalRecord := &enr.Record{}
+	nominalRecord.Set(csc)
+
+	testCases := []struct {
+		name     string
+		record   *enr.Record
+		expected uint64
+	}{
+		{
+			name:     "nominal",
+			record:   nominalRecord,
+			expected: expected,
+		},
+		{
+			name:     "nil",
+			record:   nilRecord,
+			expected: params.BeaconConfig().CustodyRequirement,
+		},
+		{
+			name:     "empty",
+			record:   emptyRecord,
+			expected: params.BeaconConfig().CustodyRequirement,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Create peers status.
+			peers := peers.NewStatus(context.Background(), &peers.StatusConfig{
+				ScorerParams: &scorers.Config{},
+			})
+
+			// Add a new peer with the record.
+			peers.Add(tc.record, pid, nil, network.DirOutbound)
+
+			// Create a new service.
+			service := &Service{
+				peers: peers,
+			}
+
+			// Retrieve the custody count from the remote peer.
+			actual := service.CustodyCountFromRemotePeer(pid)
+
+			// Verify the result.
+ require.Equal(t, tc.expected, actual) + }) + } + +} diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 5828e61176e..4cb1416870b 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -44,22 +44,13 @@ const ( udp6 ) -const ( - quickProtocolEnrKey = "quic" - custodySubnetCountEnrKey = "csc" -) +const quickProtocolEnrKey = "quic" -type ( - quicProtocol uint16 - CustodySubnetCount uint64 -) +type quicProtocol uint16 // quicProtocol is the "quic" key, which holds the QUIC port of the node. func (quicProtocol) ENRKey() string { return quickProtocolEnrKey } -// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-discovery-domain-discv5 -func (CustodySubnetCount) ENRKey() string { return custodySubnetCountEnrKey } - // RefreshPersistentSubnets checks that we are tracking our local persistent subnets for a variety of gossip topics. // This routine checks for our attestation, sync committee and data column subnets and updates them if they have // been rotated. @@ -276,7 +267,7 @@ func (s *Service) createLocalNode( } if features.Get().EnablePeerDAS { - localNode.Set(CustodySubnetCount(peerdas.CustodySubnetCount())) + localNode.Set(peerdas.Csc(peerdas.CustodySubnetCount())) } localNode.SetFallbackIP(ipAddr) diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 47737996524..f187d53f99b 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -24,6 +24,7 @@ import ( mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/peerdata" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers" @@ -237,7 +238,7 @@ func TestCreateLocalNode(t *testing.T) { // Check custody_subnet_count config. custodySubnetCount := new(uint64) - require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(custodySubnetCountEnrKey, custodySubnetCount))) + require.NoError(t, localNode.Node().Record().Load(enr.WithEntry(peerdas.CustodySubnetCountEnrKey, custodySubnetCount))) require.Equal(t, uint64(1), *custodySubnetCount) }) } diff --git a/beacon-chain/p2p/testing/BUILD.bazel b/beacon-chain/p2p/testing/BUILD.bazel index e6c2e8fe903..47a5055545b 100644 --- a/beacon-chain/p2p/testing/BUILD.bazel +++ b/beacon-chain/p2p/testing/BUILD.bazel @@ -17,9 +17,11 @@ go_library( "//beacon-chain:__subpackages__", ], deps = [ + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/peers/scorers:go_default_library", + "//config/params:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", "@com_github_ethereum_go_ethereum//crypto:go_default_library", diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index 0f358d5b8a1..0bc3f9646a3 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -28,166 +28,166 @@ func NewFuzzTestP2P() *FakeP2P { } // Encoding -- fake. -func (_ *FakeP2P) Encoding() encoder.NetworkEncoding { +func (*FakeP2P) Encoding() encoder.NetworkEncoding { return &encoder.SszNetworkEncoder{} } // AddConnectionHandler -- fake. 
-func (_ *FakeP2P) AddConnectionHandler(_, _ func(ctx context.Context, id peer.ID) error) { +func (*FakeP2P) AddConnectionHandler(_, _ func(ctx context.Context, id peer.ID) error) { } // AddDisconnectionHandler -- fake. -func (_ *FakeP2P) AddDisconnectionHandler(_ func(ctx context.Context, id peer.ID) error) { +func (*FakeP2P) AddDisconnectionHandler(_ func(ctx context.Context, id peer.ID) error) { } // AddPingMethod -- fake. -func (_ *FakeP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) { +func (*FakeP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) { } // PeerID -- fake. -func (_ *FakeP2P) PeerID() peer.ID { +func (*FakeP2P) PeerID() peer.ID { return "fake" } // ENR returns the enr of the local peer. -func (_ *FakeP2P) ENR() *enr.Record { +func (*FakeP2P) ENR() *enr.Record { return new(enr.Record) } // NodeID returns the node id of the local peer. -func (_ *FakeP2P) NodeID() enode.ID { +func (*FakeP2P) NodeID() enode.ID { return [32]byte{} } // DiscoveryAddresses -- fake -func (_ *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { +func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil } // FindPeersWithSubnet mocks the p2p func. -func (_ *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { +func (*FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return false, nil } // RefreshENR mocks the p2p func. -func (_ *FakeP2P) RefreshPersistentSubnets() {} +func (*FakeP2P) RefreshPersistentSubnets() {} // LeaveTopic -- fake. -func (_ *FakeP2P) LeaveTopic(_ string) error { +func (*FakeP2P) LeaveTopic(_ string) error { return nil } // Metadata -- fake. -func (_ *FakeP2P) Metadata() metadata.Metadata { +func (*FakeP2P) Metadata() metadata.Metadata { return nil } // Peers -- fake. -func (_ *FakeP2P) Peers() *peers.Status { +func (*FakeP2P) Peers() *peers.Status { return nil } // PublishToTopic -- fake. -func (_ *FakeP2P) PublishToTopic(_ context.Context, _ string, _ []byte, _ ...pubsub.PubOpt) error { +func (*FakeP2P) PublishToTopic(_ context.Context, _ string, _ []byte, _ ...pubsub.PubOpt) error { return nil } // Send -- fake. -func (_ *FakeP2P) Send(_ context.Context, _ interface{}, _ string, _ peer.ID) (network.Stream, error) { +func (*FakeP2P) Send(_ context.Context, _ interface{}, _ string, _ peer.ID) (network.Stream, error) { return nil, nil } // PubSub -- fake. -func (_ *FakeP2P) PubSub() *pubsub.PubSub { +func (*FakeP2P) PubSub() *pubsub.PubSub { return nil } // MetadataSeq -- fake. -func (_ *FakeP2P) MetadataSeq() uint64 { +func (*FakeP2P) MetadataSeq() uint64 { return 0 } // SetStreamHandler -- fake. -func (_ *FakeP2P) SetStreamHandler(_ string, _ network.StreamHandler) { +func (*FakeP2P) SetStreamHandler(_ string, _ network.StreamHandler) { } // SubscribeToTopic -- fake. -func (_ *FakeP2P) SubscribeToTopic(_ string, _ ...pubsub.SubOpt) (*pubsub.Subscription, error) { +func (*FakeP2P) SubscribeToTopic(_ string, _ ...pubsub.SubOpt) (*pubsub.Subscription, error) { return nil, nil } // JoinTopic -- fake. -func (_ *FakeP2P) JoinTopic(_ string, _ ...pubsub.TopicOpt) (*pubsub.Topic, error) { +func (*FakeP2P) JoinTopic(_ string, _ ...pubsub.TopicOpt) (*pubsub.Topic, error) { return nil, nil } // Host -- fake. -func (_ *FakeP2P) Host() host.Host { +func (*FakeP2P) Host() host.Host { return nil } // Disconnect -- fake. 
-func (_ *FakeP2P) Disconnect(_ peer.ID) error { +func (*FakeP2P) Disconnect(_ peer.ID) error { return nil } // Broadcast -- fake. -func (_ *FakeP2P) Broadcast(_ context.Context, _ proto.Message) error { +func (*FakeP2P) Broadcast(_ context.Context, _ proto.Message) error { return nil } // BroadcastAttestation -- fake. -func (_ *FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ ethpb.Att) error { +func (*FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ ethpb.Att) error { return nil } // BroadcastSyncCommitteeMessage -- fake. -func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error { +func (*FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error { return nil } // BroadcastBlob -- fake. -func (_ *FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSidecar) error { +func (*FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSidecar) error { return nil } // BroadcastDataColumn -- fake. -func (_ *FakeP2P) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error { +func (*FakeP2P) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error { return nil } // InterceptPeerDial -- fake. -func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) { +func (*FakeP2P) InterceptPeerDial(peer.ID) (allow bool) { return true } // InterceptAddrDial -- fake. -func (_ *FakeP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) { +func (*FakeP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) { return true } // InterceptAccept -- fake. -func (_ *FakeP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) { +func (*FakeP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) { return true } // InterceptSecured -- fake. -func (_ *FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) { +func (*FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) { return true } // InterceptUpgraded -- fake. 
-func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { +func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { return true, 0 } -func (_ *FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 { +func (*FakeP2P) CustodyCountFromRemotePeer(peer.ID) uint64 { return 0 } -func (_ *FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) { +func (*FakeP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) { return peers, nil } diff --git a/beacon-chain/p2p/testing/mock_host.go b/beacon-chain/p2p/testing/mock_host.go index 38d66533f3c..88c75930a65 100644 --- a/beacon-chain/p2p/testing/mock_host.go +++ b/beacon-chain/p2p/testing/mock_host.go @@ -18,12 +18,12 @@ type MockHost struct { } // ID -- -func (_ *MockHost) ID() peer.ID { +func (*MockHost) ID() peer.ID { return "" } // Peerstore -- -func (_ *MockHost) Peerstore() peerstore.Peerstore { +func (*MockHost) Peerstore() peerstore.Peerstore { return nil } @@ -33,46 +33,46 @@ func (m *MockHost) Addrs() []ma.Multiaddr { } // Network -- -func (_ *MockHost) Network() network.Network { +func (*MockHost) Network() network.Network { return nil } // Mux -- -func (_ *MockHost) Mux() protocol.Switch { +func (*MockHost) Mux() protocol.Switch { return nil } // Connect -- -func (_ *MockHost) Connect(_ context.Context, _ peer.AddrInfo) error { +func (*MockHost) Connect(_ context.Context, _ peer.AddrInfo) error { return nil } // SetStreamHandler -- -func (_ *MockHost) SetStreamHandler(_ protocol.ID, _ network.StreamHandler) {} +func (*MockHost) SetStreamHandler(_ protocol.ID, _ network.StreamHandler) {} // SetStreamHandlerMatch -- -func (_ *MockHost) SetStreamHandlerMatch(protocol.ID, func(id protocol.ID) bool, network.StreamHandler) { +func (*MockHost) SetStreamHandlerMatch(protocol.ID, func(id protocol.ID) bool, network.StreamHandler) { } // RemoveStreamHandler -- -func (_ *MockHost) RemoveStreamHandler(_ protocol.ID) {} +func (*MockHost) RemoveStreamHandler(_ protocol.ID) {} // NewStream -- -func (_ *MockHost) NewStream(_ context.Context, _ peer.ID, _ ...protocol.ID) (network.Stream, error) { +func (*MockHost) NewStream(_ context.Context, _ peer.ID, _ ...protocol.ID) (network.Stream, error) { return nil, nil } // Close -- -func (_ *MockHost) Close() error { +func (*MockHost) Close() error { return nil } // ConnManager -- -func (_ *MockHost) ConnManager() connmgr.ConnManager { +func (*MockHost) ConnManager() connmgr.ConnManager { return nil } // EventBus -- -func (_ *MockHost) EventBus() event.Bus { +func (*MockHost) EventBus() event.Bus { return nil } diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index 15a6fa266dd..84431d593db 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -21,7 +21,7 @@ type MockPeerManager struct { } // Disconnect . -func (_ *MockPeerManager) Disconnect(peer.ID) error { +func (*MockPeerManager) Disconnect(peer.ID) error { return nil } @@ -54,12 +54,12 @@ func (m MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { } // RefreshENR . -func (_ MockPeerManager) RefreshPersistentSubnets() {} +func (MockPeerManager) RefreshPersistentSubnets() {} // FindPeersWithSubnet . 
-func (_ MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { +func (MockPeerManager) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return true, nil } // AddPingMethod . -func (_ MockPeerManager) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {} +func (MockPeerManager) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {} diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 98db07c4ea7..a15d15bdfc5 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -23,9 +23,11 @@ import ( swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" "github.com/multiformats/go-multiaddr" ssz "github.com/prysmaticlabs/fastssz" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers" + "github.com/prysmaticlabs/prysm/v5/config/params" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata" "github.com/sirupsen/logrus" @@ -51,9 +53,10 @@ type TestP2P struct { } // NewTestP2P initializes a new p2p test service. -func NewTestP2P(t *testing.T) *TestP2P { +func NewTestP2P(t *testing.T, opts ...swarmt.Option) *TestP2P { + opts = append(opts, swarmt.OptDisableQUIC) ctx := context.Background() - h := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableQUIC)) + h := bhost.NewBlankHost(swarmt.GenSwarm(t, opts...)) ps, err := pubsub.NewFloodSub(ctx, h, pubsub.WithMessageSigning(false), pubsub.WithStrictSignatureVerification(false), @@ -239,7 +242,7 @@ func (p *TestP2P) LeaveTopic(topic string) error { } // Encoding returns ssz encoding. -func (_ *TestP2P) Encoding() encoder.NetworkEncoding { +func (*TestP2P) Encoding() encoder.NetworkEncoding { return &encoder.SszNetworkEncoder{} } @@ -266,17 +269,17 @@ func (p *TestP2P) Host() host.Host { } // ENR returns the enr of the local peer. -func (_ *TestP2P) ENR() *enr.Record { +func (*TestP2P) ENR() *enr.Record { return new(enr.Record) } // NodeID returns the node id of the local peer. -func (_ *TestP2P) NodeID() enode.ID { +func (*TestP2P) NodeID() enode.ID { return [32]byte{} } // DiscoveryAddresses -- -func (_ *TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { +func (*TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil } @@ -358,7 +361,7 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p } // Started always returns true. -func (_ *TestP2P) Started() bool { +func (*TestP2P) Started() bool { return true } @@ -368,12 +371,12 @@ func (p *TestP2P) Peers() *peers.Status { } // FindPeersWithSubnet mocks the p2p func. -func (_ *TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { +func (*TestP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) { return false, nil } // RefreshENR mocks the p2p func. -func (_ *TestP2P) RefreshPersistentSubnets() {} +func (*TestP2P) RefreshPersistentSubnets() {} // ForkDigest mocks the p2p func. func (p *TestP2P) ForkDigest() ([4]byte, error) { @@ -391,39 +394,54 @@ func (p *TestP2P) MetadataSeq() uint64 { } // AddPingMethod mocks the p2p func. 
-func (_ *TestP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) { +func (*TestP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) { // no-op } // InterceptPeerDial . -func (_ *TestP2P) InterceptPeerDial(peer.ID) (allow bool) { +func (*TestP2P) InterceptPeerDial(peer.ID) (allow bool) { return true } // InterceptAddrDial . -func (_ *TestP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) { +func (*TestP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) { return true } // InterceptAccept . -func (_ *TestP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) { +func (*TestP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) { return true } // InterceptSecured . -func (_ *TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) { +func (*TestP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) { return true } // InterceptUpgraded . -func (_ *TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { +func (*TestP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) { return true, 0 } -func (_ *TestP2P) CustodyCountFromRemotePeer(peer.ID) uint64 { - return 0 +func (s *TestP2P) CustodyCountFromRemotePeer(pid peer.ID) uint64 { + // By default, we assume the peer custodies the minimum number of subnets. + custodyRequirement := params.BeaconConfig().CustodyRequirement + + // Retrieve the ENR of the peer. + record, err := s.peers.ENR(pid) + if err != nil { + return custodyRequirement + } + + // Retrieve the custody subnets count from the ENR. + custodyCount, err := peerdas.CustodyCountFromRecord(record) + if err != nil { + return custodyRequirement + } + + return custodyCount } -func (_ *TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) { +func (*TestP2P) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) { return peers, nil } diff --git a/beacon-chain/p2p/utils_test.go b/beacon-chain/p2p/utils_test.go index 7cbb4d40abe..fe9b2246afc 100644 --- a/beacon-chain/p2p/utils_test.go +++ b/beacon-chain/p2p/utils_test.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p/core/peer" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" @@ -64,3 +65,19 @@ func TestSerializeENR(t *testing.T) { assert.ErrorContains(t, "could not serialize nil record", err) }) } + +func TestConvertPeerIDToNodeID(t *testing.T) { + const ( + peerIDStr = "16Uiu2HAmRrhnqEfybLYimCiAYer2AtZKDGamQrL1VwRCyeh2YiFc" + expectedNodeIDStr = "eed26c5d2425ab95f57246a5dca87317c41cacee4bcafe8bbe57e5965527c290" + ) + + peerID, err := peer.Decode(peerIDStr) + require.NoError(t, err) + + actualNodeID, err := ConvertPeerIDToNodeID(peerID) + require.NoError(t, err) + + actualNodeIDStr := actualNodeID.String() + require.Equal(t, expectedNodeIDStr, actualNodeIDStr) +} diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index a7fffb46241..611a1c13005 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -126,11 +126,8 @@ go_library( "//runtime/version:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", - "@com_github_btcsuite_btcd_btcec_v2//:go_default_library", "@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library", 
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", - "@com_github_ethereum_go_ethereum//common/math:go_default_library", - "@com_github_ethereum_go_ethereum//crypto:go_default_library", "@com_github_hashicorp_golang_lru//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", "@com_github_libp2p_go_libp2p//core/host:go_default_library", @@ -161,6 +158,7 @@ go_test( "block_batcher_test.go", "broadcast_bls_changes_test.go", "context_test.go", + "data_columns_sampling_test.go", "decode_pubsub_test.go", "error_test.go", "fork_watcher_test.go", @@ -208,6 +206,7 @@ go_test( "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", @@ -252,6 +251,7 @@ go_test( "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/attestation:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", + "//runtime/version:go_default_library", "//testing/assert:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", @@ -263,9 +263,11 @@ go_test( "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_golang_snappy//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", + "@com_github_libp2p_go_libp2p//core/crypto:go_default_library", "@com_github_libp2p_go_libp2p//core/network:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_libp2p_go_libp2p//core/protocol:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library", "@com_github_patrickmn_go_cache//:go_default_library", diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index 5c601928b52..156d608289d 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -5,9 +5,6 @@ import ( "fmt" "sort" - "github.com/btcsuite/btcd/btcec/v2" - "github.com/ethereum/go-ethereum/common/math" - "github.com/ethereum/go-ethereum/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -15,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" @@ -23,21 +21,29 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/version" ) -// reandomIntegers returns a map of `count` random integers in the range [0, max[. -func randomIntegers(count uint64, max uint64) map[uint64]bool { - result := make(map[uint64]bool, count) - randGenerator := rand.NewGenerator() +type roundSummary struct { + RequestedColumns []uint64 + MissingColumns map[uint64]bool +} - for uint64(len(result)) < count { - n := randGenerator.Uint64() % max - result[n] = true +// randomizeColumns returns a slice containing all columns in a random order. 
+func randomizeColumns(columns map[uint64]bool) []uint64 {
+	// Create a slice from columns.
+	randomized := make([]uint64, 0, len(columns))
+	for column := range columns {
+		randomized = append(randomized, column)
 	}
 
-	return result
+	// Shuffle the slice.
+	rand.NewGenerator().Shuffle(len(randomized), func(i, j int) {
+		randomized[i], randomized[j] = randomized[j], randomized[i]
+	})
+
+	return randomized
 }
 
-// sortedListFromMap returns a sorted list of keys from a map.
-func sortedListFromMap(m map[uint64]bool) []uint64 {
+// sortedSliceFromMap returns a sorted slice of keys from a map.
+func sortedSliceFromMap(m map[uint64]bool) []uint64 {
 	result := make([]uint64, 0, len(m))
 	for k := range m {
 		result = append(result, k)
@@ -50,198 +56,245 @@ func sortedListFromMap(m map[uint64]bool) []uint64 {
 	return result
 }
 
-// extractNodeID extracts the node ID from a peer ID.
-func extractNodeID(pid peer.ID) ([32]byte, error) {
-	var nodeID [32]byte
-
-	// Retrieve the public key object of the peer under "crypto" form.
-	pubkeyObjCrypto, err := pid.ExtractPublicKey()
-	if err != nil {
-		return nodeID, errors.Wrap(err, "extract public key")
-	}
-
-	// Extract the bytes representation of the public key.
-	compressedPubKeyBytes, err := pubkeyObjCrypto.Raw()
-	if err != nil {
-		return nodeID, errors.Wrap(err, "public key raw")
-	}
-
-	// Retrieve the public key object of the peer under "SECP256K1" form.
-	pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes)
-	if err != nil {
-		return nodeID, errors.Wrap(err, "parse public key")
-	}
-
-	// Concatenate the X and Y coordinates represented in bytes.
-	buf := make([]byte, 64)
-	math.ReadBits(pubKeyObjSecp256k1.X(), buf[:32])
-	math.ReadBits(pubKeyObjSecp256k1.Y(), buf[32:])
-
-	// Get the node ID by hashing the concatenated X and Y coordinates.
-	nodeIDBytes := crypto.Keccak256(buf)
-	copy(nodeID[:], nodeIDBytes)
-
-	return nodeID, nil
-}
-
-// sampleDataColumnFromPeer samples data columns from a peer.
-// It returns the missing columns after sampling.
-func (s *Service) sampleDataColumnFromPeer(
-	pid peer.ID,
-	columnsToSample map[uint64]bool,
-	requestedRoot [fieldparams.RootLength]byte,
-) (map[uint64]bool, error) {
-	// Define missing columns.
-	missingColumns := make(map[uint64]bool, len(columnsToSample))
-	for index := range columnsToSample {
-		missingColumns[index] = true
-	}
-
+// custodyColumnsFromPeer returns the columns the peer should custody.
+func (s *Service) custodyColumnsFromPeer(pid peer.ID) (map[uint64]bool, error) {
 	// Retrieve the custody count of the peer.
-	peerCustodiedSubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
+	custodySubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
 
 	// Extract the node ID from the peer ID.
-	nodeID, err := extractNodeID(pid)
+	nodeID, err := p2p.ConvertPeerIDToNodeID(pid)
 	if err != nil {
 		return nil, errors.Wrap(err, "extract node ID")
 	}
 
 	// Determine which columns the peer should custody.
-	peerCustodiedColumns, err := peerdas.CustodyColumns(nodeID, peerCustodiedSubnetCount)
+	custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
 	if err != nil {
 		return nil, errors.Wrap(err, "custody columns")
 	}
-	peerCustodiedColumnsList := sortedListFromMap(peerCustodiedColumns)
-
-	// Compute the intersection of the columns to sample and the columns the peer should custody.
- peerRequestedColumns := make(map[uint64]bool, len(columnsToSample)) - for column := range columnsToSample { - if peerCustodiedColumns[column] { - peerRequestedColumns[column] = true - } - } - - peerRequestedColumnsList := sortedListFromMap(peerRequestedColumns) + return custodyColumns, nil +} - // Get the data column identifiers to sample from this peer. - dataColumnIdentifiers := make(types.DataColumnSidecarsByRootReq, 0, len(peerRequestedColumns)) - for index := range peerRequestedColumns { +// sampleDataColumnsFromPeer samples data columns from a peer. +// It filters out columns that were not requested and columns with incorrect root. +// It returns the retrieved columns. +func (s *Service) sampleDataColumnsFromPeer( + pid peer.ID, + requestedColumns map[uint64]bool, + root [fieldparams.RootLength]byte, +) (map[uint64]bool, error) { + // Build the data column identifiers. + dataColumnIdentifiers := make(types.DataColumnSidecarsByRootReq, 0, len(requestedColumns)) + for index := range requestedColumns { dataColumnIdentifiers = append(dataColumnIdentifiers, ð.DataColumnIdentifier{ - BlockRoot: requestedRoot[:], + BlockRoot: root[:], ColumnIndex: index, }) } - // Return early if there are no data columns to sample. - if len(dataColumnIdentifiers) == 0 { - log.WithFields(logrus.Fields{ - "peerID": pid, - "custodiedColumns": peerCustodiedColumnsList, - "requestedColumns": peerRequestedColumnsList, - }).Debug("Peer does not custody any of the requested columns") - return columnsToSample, nil - } - - // Sample data columns. + // Send the request. roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers) if err != nil { return nil, errors.Wrap(err, "send data column sidecar by root") } - peerRetrievedColumns := make(map[uint64]bool, len(roDataColumns)) + retrievedColumns := make(map[uint64]bool, len(roDataColumns)) // Remove retrieved items from rootsByDataColumnIndex. for _, roDataColumn := range roDataColumns { retrievedColumn := roDataColumn.ColumnIndex actualRoot := roDataColumn.BlockRoot() - if actualRoot != requestedRoot { + + // Filter out columns with incorrect root. + if actualRoot != root { // TODO: Should we decrease the peer score here? log.WithFields(logrus.Fields{ "peerID": pid, - "requestedRoot": fmt.Sprintf("%#x", requestedRoot), + "requestedRoot": fmt.Sprintf("%#x", root), "actualRoot": fmt.Sprintf("%#x", actualRoot), }).Warning("Actual root does not match requested root") continue } - peerRetrievedColumns[retrievedColumn] = true - - if !columnsToSample[retrievedColumn] { + // Filter out columns that were not requested. + if !requestedColumns[retrievedColumn] { // TODO: Should we decrease the peer score here? + columnsToSampleList := sortedSliceFromMap(requestedColumns) + log.WithFields(logrus.Fields{ "peerID": pid, + "requestedColumns": columnsToSampleList, "retrievedColumn": retrievedColumn, - "requestedColumns": peerRequestedColumnsList, - }).Warning("Retrieved column is was not requested") + }).Warning("Retrieved column was not requested") + + continue } - delete(missingColumns, retrievedColumn) + retrievedColumns[retrievedColumn] = true } - peerRetrievedColumnsList := sortedListFromMap(peerRetrievedColumns) - remainingMissingColumnsList := sortedListFromMap(missingColumns) + if len(retrievedColumns) == len(requestedColumns) { + // This is the happy path. 
+		log.WithFields(logrus.Fields{
+			"peerID":           pid,
+			"root":             fmt.Sprintf("%#x", root),
+			"requestedColumns": sortedSliceFromMap(requestedColumns),
+		}).Debug("All requested columns were successfully sampled from peer")
+		return retrievedColumns, nil
+	}
+
+	// Some columns are missing.
 	log.WithFields(logrus.Fields{
-		"peerID":                  pid,
-		"custodiedColumns":        peerCustodiedColumnsList,
-		"requestedColumns":        peerRequestedColumnsList,
-		"retrievedColumns":        peerRetrievedColumnsList,
-		"remainingMissingColumns": remainingMissingColumnsList,
-	}).Debug("Peer data column sampling summary")
-
-	return missingColumns, nil
-}
+		"peerID":           pid,
+		"root":             fmt.Sprintf("%#x", root),
+		"requestedColumns": sortedSliceFromMap(requestedColumns),
+		"retrievedColumns": sortedSliceFromMap(retrievedColumns),
+	}).Warning("Some requested columns were not sampled from peer")
 
-// sampleDataColumns samples data columns from active peers.
-func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte, samplesCount uint64) error {
-	// Determine `samplesCount` random column indexes.
-	requestedColumns := randomIntegers(samplesCount, params.BeaconConfig().NumberOfColumns)
+	return retrievedColumns, nil
+}
 
-	missingColumns := make(map[uint64]bool, len(requestedColumns))
-	for index := range requestedColumns {
-		missingColumns[index] = true
+// sampleDataColumnsFromPeers samples data columns from active peers.
+// It returns the retrieved columns.
+// If one peer fails to return a column it should custody, the column is considered missing.
+func (s *Service) sampleDataColumnsFromPeers(
+	columnsToSample []uint64,
+	root [fieldparams.RootLength]byte,
+) (map[uint64]bool, error) {
+	// Build all remaining columns to sample.
+	remainingColumnsToSample := make(map[uint64]bool, len(columnsToSample))
+	for _, column := range columnsToSample {
+		remainingColumnsToSample[column] = true
 	}
 
 	// Get the active peers from the p2p service.
-	activePeers := s.cfg.p2p.Peers().Active()
+	activePids := s.cfg.p2p.Peers().Active()
 
-	var err error
+	retrievedColumns := make(map[uint64]bool, len(columnsToSample))
 
-	// Sampling is done sequentially peer by peer.
-	// TODO: Add parallelism if (probably) needed.
-	for _, pid := range activePeers {
-		// Early exit if all needed columns are already sampled. (This is the happy path.)
-		if len(missingColumns) == 0 {
-			break
+	// Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first).
+	for i := 0; len(remainingColumnsToSample) > 0 && i < len(activePids); i++ {
+		// Get the peer ID.
+		pid := activePids[i]
+
+		// Get the custody columns of the peer.
+		peerCustodyColumns, err := s.custodyColumnsFromPeer(pid)
+		if err != nil {
+			return nil, errors.Wrap(err, "custody columns from peer")
+		}
+
+		// Compute the intersection of the peer custody columns and the remaining columns to request.
+		peerRequestedColumns := make(map[uint64]bool, len(peerCustodyColumns))
+		for column := range remainingColumnsToSample {
+			if peerCustodyColumns[column] {
+				peerRequestedColumns[column] = true
+			}
+		}
+
+		// Remove the newly requested columns from the remaining columns to request.
+		for column := range peerRequestedColumns {
+			delete(remainingColumnsToSample, column)
 		}
 
 		// Sample data columns from the peer.
-		missingColumns, err = s.sampleDataColumnFromPeer(pid, missingColumns, requestedRoot)
+		peerRetrievedColumns, err := s.sampleDataColumnsFromPeer(pid, peerRequestedColumns, root)
 		if err != nil {
-			return errors.Wrap(err, "sample data column from peer")
+			return nil, errors.Wrap(err, "sample data columns from peer")
+		}
+
+		// Update the retrieved columns.
+		for column := range peerRetrievedColumns {
+			retrievedColumns[column] = true
 		}
 	}
 
-	requestedColumnsList := sortedListFromMap(requestedColumns)
+	return retrievedColumns, nil
+}
 
-	if len(missingColumns) == 0 {
-		log.WithField("requestedColumns", requestedColumnsList).Debug("Successfully sampled all requested columns")
-		return nil
-	}
+// incrementalDAS samples data columns from active peers using incremental DAS.
+// https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10
+func (s *Service) incrementalDAS(
+	root [fieldparams.RootLength]byte,
+	columns []uint64,
+	sampleCount uint64,
+) (bool, []roundSummary, error) {
+	columnsCount, missingColumnsCount := uint64(len(columns)), uint64(0)
+	firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, 0)
+
+	roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
+
+	for round := 1; ; /* No exit condition. */ round++ {
+		if extendedSampleCount > columnsCount {
+			// We already tried to sample all possible columns; this is the unhappy path.
+			log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
+			return false, roundSummaries, nil
+		}
 
-	missingColumnsList := sortedListFromMap(missingColumns)
-	log.WithFields(logrus.Fields{
-		"requestedColumns": requestedColumnsList,
-		"missingColumns":   missingColumnsList,
-	}).Warning("Failed to sample some requested columns")
+		// Get the columns to sample for this round.
+		columnsToSample := columns[firstColumnToSample:extendedSampleCount]
+		columnsToSampleCount := extendedSampleCount - firstColumnToSample
+
+		// Sample the data columns from the peers.
+		retrievedSamples, err := s.sampleDataColumnsFromPeers(columnsToSample, root)
+		if err != nil {
+			return false, nil, errors.Wrap(err, "sample data columns from peers")
+		}
+
+		// Compute the missing samples.
+		missingSamples := make(map[uint64]bool, max(0, len(columnsToSample)-len(retrievedSamples)))
+		for _, column := range columnsToSample {
+			if !retrievedSamples[column] {
+				missingSamples[column] = true
+			}
+		}
+
+		roundSummaries = append(roundSummaries, roundSummary{
+			RequestedColumns: columnsToSample,
+			MissingColumns:   missingSamples,
+		})
 
-	return nil
+		retrievedSampleCount := uint64(len(retrievedSamples))
+
+		if retrievedSampleCount == columnsToSampleCount {
+			// All columns were correctly sampled; this is the happy path.
+			log.WithFields(logrus.Fields{
+				"root":         fmt.Sprintf("%#x", root),
+				"roundsNeeded": round,
+			}).Debug("All columns were successfully sampled")
+			return true, roundSummaries, nil
+		}
+
+		if retrievedSampleCount > columnsToSampleCount {
+			// This should never happen.
+			return false, nil, errors.New("retrieved more columns than requested")
+		}
+
+		// Some columns are missing; we need to extend the sample size.
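+		// Columns already queried are never re-requested: the next window starts where the
+		// previous one ended, and its size is derived from the cumulative number of failures.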
+		missingColumnsCount += columnsToSampleCount - retrievedSampleCount
+
+		firstColumnToSample = extendedSampleCount
+		oldExtendedSampleCount := extendedSampleCount
+		extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, missingColumnsCount)
+
+		log.WithFields(logrus.Fields{
+			"root":                fmt.Sprintf("%#x", root),
+			"round":               round,
+			"missingColumnsCount": missingColumnsCount,
+			"currentSampleCount":  oldExtendedSampleCount,
+			"nextSampleCount":     extendedSampleCount,
+		}).Debug("Some columns are still missing after sampling this round.")
+	}
 }
 
-func (s *Service) dataColumnSampling(ctx context.Context) {
+// DataColumnSamplingRoutine runs incremental DAS on each block as it is received.
+func (s *Service) DataColumnSamplingRoutine(ctx context.Context) {
+	// Get the custody subnets count.
+	custodySubnetsCount := peerdas.CustodySubnetCount()
+
 	// Create a subscription to the state feed.
 	stateChannel := make(chan *feed.Event, 1)
 	stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
@@ -249,6 +302,37 @@ func (s *Service) dataColumnSampling(ctx context.Context) {
 	// Unsubscribe from the state feed when the function returns.
 	defer stateSub.Unsubscribe()
 
+	// Retrieve the number of columns.
+	columnsCount := params.BeaconConfig().NumberOfColumns
+
+	// Retrieve all columns we custody.
+	custodyColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodySubnetsCount)
+	if err != nil {
+		log.WithError(err).Error("Failed to get custody columns")
+		return
+	}
+
+	custodyColumnsCount := uint64(len(custodyColumns))
+
+	// Compute the number of columns to sample.
+	if custodyColumnsCount >= columnsCount/2 {
+		log.WithFields(logrus.Fields{
+			"custodyColumnsCount": custodyColumnsCount,
+			"columnsCount":        columnsCount,
+		}).Debug("The node custodies at least half of the data columns, no need to sample")
+		return
+	}
+
+	samplesCount := min(params.BeaconConfig().SamplesPerSlot, columnsCount/2-custodyColumnsCount)
+
+	// Compute all the columns we do NOT custody.
+	nonCustodyColumns := make(map[uint64]bool, columnsCount-custodyColumnsCount)
+	for i := uint64(0); i < columnsCount; i++ {
+		if !custodyColumns[i] {
+			nonCustodyColumns[i] = true
+		}
+	}
+
 	for {
 		select {
 		case e := <-stateChannel:
@@ -286,11 +370,27 @@
 				continue
 			}
 
-			dataColumnSamplingCount := params.BeaconConfig().SamplesPerSlot
+			// Randomize all columns.
+			randomizedColumns := randomizeColumns(nonCustodyColumns)
+
+			// Sample data columns with incremental DAS.
+			ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
+			if err != nil {
+				log.WithError(err).Error("Error during incremental DAS")
+			}
 
-			// Sample data columns.
-			if err := s.sampleDataColumns(data.BlockRoot, dataColumnSamplingCount); err != nil {
-				log.WithError(err).Error("Failed to sample data columns")
+			if ok {
+				log.WithFields(logrus.Fields{
+					"root":        fmt.Sprintf("%#x", data.BlockRoot),
+					"columns":     randomizedColumns,
+					"sampleCount": samplesCount,
+				}).Debug("Data column sampling successful")
+			} else {
+				log.WithFields(logrus.Fields{
+					"root":        fmt.Sprintf("%#x", data.BlockRoot),
+					"columns":     randomizedColumns,
+					"sampleCount": samplesCount,
+				}).Warning("Data column sampling failed")
 			}
 
 		case <-s.ctx.Done():
diff --git a/beacon-chain/sync/data_columns_sampling_test.go b/beacon-chain/sync/data_columns_sampling_test.go
new file mode 100644
index 00000000000..436cb3068d1
--- /dev/null
+++ b/beacon-chain/sync/data_columns_sampling_test.go
@@ -0,0 +1,224 @@
+package sync
+
+import (
+	"context"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/p2p/enr"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/network"
+	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
+	mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
+	p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
+	p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
+	fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
+	ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
+	"github.com/prysmaticlabs/prysm/v5/runtime/version"
+	"github.com/prysmaticlabs/prysm/v5/testing/require"
+)
+
+func TestRandomizeColumns(t *testing.T) {
+	const count uint64 = 128
+
+	// Generate columns.
+	columns := make(map[uint64]bool, count)
+	for i := uint64(0); i < count; i++ {
+		columns[i] = true
+	}
+
+	// Randomize columns.
+	randomizedColumns := randomizeColumns(columns)
+
+	// Convert back to a map.
+	randomizedColumnsMap := make(map[uint64]bool, count)
+	for _, column := range randomizedColumns {
+		randomizedColumnsMap[column] = true
+	}
+
+	// Check duplicates and missing columns.
+	require.Equal(t, len(columns), len(randomizedColumnsMap))
+
+	// Check the values.
+	for column := range randomizedColumnsMap {
+		require.Equal(t, true, column < count)
+	}
+}
+
+// createAndConnectPeer creates a peer whose private key is derived from `offset`.
+// The peer is added and connected to `p2pService`.
+func createAndConnectPeer(
+	t *testing.T,
+	p2pService *p2ptest.TestP2P,
+	chainService *mock.ChainService,
+	header *ethpb.BeaconBlockHeader,
+	custodyCount uint64,
+	columnsNotToRespond map[uint64]bool,
+	offset int,
+) {
+	emptyRoot := [fieldparams.RootLength]byte{}
+	emptySignature := [fieldparams.BLSSignatureLength]byte{}
+	emptyKzgCommitmentInclusionProof := [4][]byte{
+		emptyRoot[:], emptyRoot[:], emptyRoot[:], emptyRoot[:],
+	}
+
+	// Create the private key, depending on the offset.
+	privateKeyBytes := make([]byte, 32)
+	for i := 0; i < 32; i++ {
+		privateKeyBytes[i] = byte(offset + i)
+	}
+
+	privateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes)
+	require.NoError(t, err)
+
+	// Create the peer.
+	peer := p2ptest.NewTestP2P(t, swarmt.OptPeerPrivateKey(privateKey))
+
+	// TODO: Do not hardcode the topic.
+	peer.SetStreamHandler("/eth2/beacon_chain/req/data_column_sidecars_by_root/1/ssz_snappy", func(stream network.Stream) {
+		// Decode the request.
+		req := new(p2pTypes.DataColumnSidecarsByRootReq)
+		err := peer.Encoding().DecodeWithMaxLength(stream, req)
+		require.NoError(t, err)
+
+		for _, identifier := range *req {
+			// Filter out the columns not to respond.
+			if columnsNotToRespond[identifier.ColumnIndex] {
+				continue
+			}
+
+			// Create the response.
+			resp := ethpb.DataColumnSidecar{
+				ColumnIndex: identifier.ColumnIndex,
+				SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
+					Header:    header,
+					Signature: emptySignature[:],
+				},
+				KzgCommitmentsInclusionProof: emptyKzgCommitmentInclusionProof[:],
+			}
+
+			// Send the response.
+			err := WriteDataColumnSidecarChunk(stream, chainService, p2pService.Encoding(), &resp)
+			require.NoError(t, err)
+		}
+
+		// Close the stream.
+		closeStream(stream, log)
+	})
+
+	// Create the record and set the custody count.
+	record := &enr.Record{}
+	record.Set(peerdas.Csc(custodyCount))
+
+	// Add the peer and connect it.
+	p2pService.Peers().Add(record, peer.PeerID(), nil, network.DirOutbound)
+	p2pService.Peers().SetConnectionState(peer.PeerID(), peers.PeerConnected)
+	p2pService.Connect(peer)
+}
+
+func TestIncrementalDAS(t *testing.T) {
+	const custodyRequirement uint64 = 1
+
+	emptyRoot := [fieldparams.RootLength]byte{}
+	emptyHeader := &ethpb.BeaconBlockHeader{
+		ParentRoot: emptyRoot[:],
+		StateRoot:  emptyRoot[:],
+		BodyRoot:   emptyRoot[:],
+	}
+
+	emptyHeaderRoot, err := emptyHeader.HashTreeRoot()
+	require.NoError(t, err)
+
+	testCases := []struct {
+		name                     string
+		samplesCount             uint64
+		possibleColumnsToRequest []uint64
+		columnsNotToRespond      map[uint64]bool
+		expectedSuccess          bool
+		expectedRoundSummaries   []roundSummary
+	}{
+		{
+			name:                     "All columns are correctly sampled in a single round",
+			samplesCount:             5,
+			possibleColumnsToRequest: []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108},
+			columnsNotToRespond:      map[uint64]bool{},
+			expectedSuccess:          true,
+			expectedRoundSummaries: []roundSummary{
+				{
+					RequestedColumns: []uint64{70, 35, 99, 6, 38},
+					MissingColumns:   map[uint64]bool{},
+				},
+			},
+		},
+		{
+			name:                     "Two missing columns in the first round, ok in the second round",
+			samplesCount:             5,
+			possibleColumnsToRequest: []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108},
+			columnsNotToRespond:      map[uint64]bool{6: true, 70: true},
+			expectedSuccess:          true,
+			expectedRoundSummaries: []roundSummary{
+				{
+					RequestedColumns: []uint64{70, 35, 99, 6, 38},
+					MissingColumns:   map[uint64]bool{70: true, 6: true},
+				},
+				{
+					RequestedColumns: []uint64{3, 67, 102, 12, 44, 76},
+					MissingColumns:   map[uint64]bool{},
+				},
+			},
+		},
+		{
+			name:                     "Two missing columns in the first round, one missing in the second round. Fail to sample.",
+			samplesCount:             5,
+			possibleColumnsToRequest: []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108},
+			columnsNotToRespond:      map[uint64]bool{6: true, 70: true, 3: true},
+			expectedSuccess:          false,
+			expectedRoundSummaries: []roundSummary{
+				{
+					RequestedColumns: []uint64{70, 35, 99, 6, 38},
+					MissingColumns:   map[uint64]bool{70: true, 6: true},
+				},
+				{
+					RequestedColumns: []uint64{3, 67, 102, 12, 44, 76},
+					MissingColumns:   map[uint64]bool{3: true},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		// Create a context.
+		ctx := context.Background()
+
+		// Create the p2p service.
+		p2pService := p2ptest.NewTestP2P(t)
+
+		// Create a mock chain service, then three peers, each custodying `custodyRequirement` subnets.
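+		// Each peer derives a deterministic private key from its `offset` argument, so its
+		// node ID, and therefore its custody columns, are stable across test runs.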
+		chainService, clock := defaultMockChain(t)
+
+		// Custody columns: [6, 38, 70, 102]
+		createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 1)
+
+		// Custody columns: [3, 35, 67, 99]
+		createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 2)
+
+		// Custody columns: [12, 44, 76, 108]
+		createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 3)
+
+		service := &Service{
+			cfg: &config{
+				p2p:   p2pService,
+				clock: clock,
+			},
+			ctx:    ctx,
+			ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
+		}
+
+		actualSuccess, actualRoundSummaries, err := service.incrementalDAS(emptyHeaderRoot, tc.possibleColumnsToRequest, tc.samplesCount)
+
+		require.NoError(t, err)
+		require.Equal(t, tc.expectedSuccess, actualSuccess)
+		require.DeepEqual(t, tc.expectedRoundSummaries, actualRoundSummaries)
+	}
+}
diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
index 8ede2106fe3..4dbd344f9cd 100644
--- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -44,7 +44,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 	}
 	requestedColumnIdents := *ref
-	if err := validateDataColummnsByRootRequest(requestedColumnIdents); err != nil {
+	if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
 		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
 		return errors.Wrap(err, "validate data columns by root request")
@@ -201,7 +201,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 	return nil
 }
 
-func validateDataColummnsByRootRequest(colIdents types.DataColumnSidecarsByRootReq) error {
+func validateDataColumnsByRootRequest(colIdents types.DataColumnSidecarsByRootReq) error {
 	if uint64(len(colIdents)) > params.BeaconConfig().MaxRequestDataColumnSidecars {
 		return types.ErrMaxDataColumnReqExceeded
 	}
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index 7e195fc2fea..82cc534b7f0 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -254,7 +254,7 @@ func (s *Service) Start() {
 
 	// Run data column sampling
 	if features.Get().EnablePeerDAS {
-		go s.dataColumnSampling(s.ctx)
+		go s.DataColumnSamplingRoutine(s.ctx)
 	}
 }