Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDAS: Implement IncrementalDAS #14109

Merged
merged 12 commits into from
Jun 21, 2024
1 change: 1 addition & 0 deletions beacon-chain/core/peerdas/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_holiman_uint256//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
Expand Down
85 changes: 81 additions & 4 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package peerdas
import (
"encoding/binary"
"math"
"math/big"

cKzg4844 "github.com/ethereum/c-kzg-4844/bindings/go"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/holiman/uint256"
errors "github.com/pkg/errors"

Expand All @@ -19,13 +21,24 @@ import (
)

// Bytes per cell
const bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement
const (
CustodySubnetCountEnrKey = "csc"

bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement
)

// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-discovery-domain-discv5
type Csc uint64

func (Csc) ENRKey() string { return CustodySubnetCountEnrKey }

var (
// Custom errors
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errIndexTooLarge = errors.New("column index is larger than the specified number of columns")
errMismatchLength = errors.New("mismatch in the length of the commitments and proofs")
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errIndexTooLarge = errors.New("column index is larger than the specified columns count")
errMismatchLength = errors.New("mismatch in the length of the commitments and proofs")
errRecordNil = errors.New("record is nil")
errCannotLoadCustodySubnetCount = errors.New("cannot load the custody subnet count from peer")

// maxUint256 is the maximum value of a uint256.
maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
Expand Down Expand Up @@ -311,3 +324,67 @@ func CustodySubnetCount() uint64 {
}
return count
}

// HypergeomCDF computes the hypergeometric cumulative distribution function.
// https://en.wikipedia.org/wiki/Hypergeometric_distribution
func HypergeomCDF(k, M, n, N uint64) float64 {
denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast
denominator := new(big.Float).SetInt(denominatorInt)

rBig := big.NewFloat(0)

for i := uint64(0); i < k+1; i++ {
a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast
b := new(big.Int).Binomial(int64(M-n), int64(N-i))
numeratorInt := new(big.Int).Mul(a, b)
numerator := new(big.Float).SetInt(numeratorInt)
item := new(big.Float).Quo(numerator, denominator)
rBig.Add(rBig, item)
}

r, _ := rBig.Float64()

return r
}

// ExtendedSampleCount computes, for a given number of samples per slot and allowed failures the
// number of samples we should actually query from peers.
// TODO: Add link to the specification once it is available.
func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
// Retrieve the columns count
columnsCount := params.BeaconConfig().NumberOfColumns

// If half of the columns are missing, we are able to reconstruct the data.
// If half of the columns + 1 are missing, we are not able to reconstruct the data.
// This is the smallest worst case.
worstCaseMissing := columnsCount/2 + 1

// Compute the false positive threshold.
falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)

var sampleCount uint64

// Finally, compute the extended sample count.
for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ {
if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
break
}
}

return sampleCount
}

func CustodyCountFromRecord(record *enr.Record) (uint64, error) {
// By default, we assume the peer custodies the minimum number of subnets.
if record == nil {
return 0, errRecordNil
}

// Load the `custody_subnet_count`
var csc Csc
if err := record.Load(&csc); err != nil {
return 0, errCannotLoadCustodySubnetCount
}

return uint64(csc), nil
}
52 changes: 52 additions & 0 deletions beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,55 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
}
}

func TestHypergeomCDF(t *testing.T) {
// Test case from https://en.wikipedia.org/wiki/Hypergeometric_distribution
// Population size: 1000, number of successes in population: 500, sample size: 10, number of successes in sample: 5
// Expected result: 0.072
const (
expected = 0.0796665913283742
margin = 0.000001
)

actual := peerdas.HypergeomCDF(5, 128, 65, 16)
require.Equal(t, true, expected-margin <= actual && actual <= expected+margin)
}

func TestExtendedSampleCount(t *testing.T) {
const samplesPerSlot = 16

testCases := []struct {
name string
allowedMissings uint64
extendedSampleCount uint64
}{
{name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16},
{name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20},
{name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24},
{name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27},
{name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29},
{name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32},
{name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35},
{name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37},
{name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40},
{name: "allowedMissings=9", allowedMissings: 9, extendedSampleCount: 42},
{name: "allowedMissings=10", allowedMissings: 10, extendedSampleCount: 44},
{name: "allowedMissings=11", allowedMissings: 11, extendedSampleCount: 47},
{name: "allowedMissings=12", allowedMissings: 12, extendedSampleCount: 49},
{name: "allowedMissings=13", allowedMissings: 13, extendedSampleCount: 51},
{name: "allowedMissings=14", allowedMissings: 14, extendedSampleCount: 53},
{name: "allowedMissings=15", allowedMissings: 15, extendedSampleCount: 55},
{name: "allowedMissings=16", allowedMissings: 16, extendedSampleCount: 57},
{name: "allowedMissings=17", allowedMissings: 17, extendedSampleCount: 59},
{name: "allowedMissings=18", allowedMissings: 18, extendedSampleCount: 61},
{name: "allowedMissings=19", allowedMissings: 19, extendedSampleCount: 63},
{name: "allowedMissings=20", allowedMissings: 20, extendedSampleCount: 65},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings)
require.Equal(t, tc.extendedSampleCount, result)
})
}
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ go_test(
"addr_factory_test.go",
"broadcaster_test.go",
"connection_gater_test.go",
"custody_test.go",
"dial_relay_node_test.go",
"discovery_test.go",
"fork_test.go",
Expand Down
93 changes: 60 additions & 33 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,68 +9,95 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
)

// GetValidCustodyPeers returns a list of peers that custody a super set of the local node's custody columns.
func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
custodiedColumns, err := peerdas.CustodyColumns(s.NodeID(), peerdas.CustodySubnetCount())
// Get the total number of columns.
numberOfColumns := params.BeaconConfig().NumberOfColumns

localCustodySubnetCount := peerdas.CustodySubnetCount()
localCustodyColumns, err := peerdas.CustodyColumns(s.NodeID(), localCustodySubnetCount)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "custody columns for local node")
}
var validPeers []peer.ID

localCustotyColumnsCount := uint64(len(localCustodyColumns))

// Find the valid peers.
validPeers := make([]peer.ID, 0, len(peers))

loop:
for _, pid := range peers {
remoteCount := s.CustodyCountFromRemotePeer(pid)
// Get the custody subnets count of the remote peer.
remoteCustodySubnetCount := s.CustodyCountFromRemotePeer(pid)

nodeId, err := ConvertPeerIDToNodeID(pid)
// Get the remote node ID from the peer ID.
remoteNodeID, err := ConvertPeerIDToNodeID(pid)
if err != nil {
return nil, errors.Wrap(err, "convert peer ID to node ID")
}
remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)

// Get the custody columns of the remote peer.
remoteCustodyColumns, err := peerdas.CustodyColumns(remoteNodeID, remoteCustodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
invalidPeer := false
for c := range custodiedColumns {
if !remoteCustodiedColumns[c] {
invalidPeer = true
break
}

remoteCustodyColumnsCount := uint64(len(remoteCustodyColumns))

// If the remote peer custodies less columns than the local node, skip it.
if remoteCustodyColumnsCount < localCustotyColumnsCount {
continue
}
if invalidPeer {

// If the remote peers custodies all the possible columns, add it to the list.
if remoteCustodyColumnsCount == numberOfColumns {
copiedId := pid
validPeers = append(validPeers, copiedId)
continue
}

// Filter out invalid peers.
for c := range localCustodyColumns {
if !remoteCustodyColumns[c] {
continue loop
}
}

copiedId := pid

// Add valid peer to list
validPeers = append(validPeers, copiedId)
}

return validPeers, nil
}

// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
peerCustodyCountCount := params.BeaconConfig().CustodyRequirement
custodyRequirement := params.BeaconConfig().CustodyRequirement

// Retrieve the ENR of the peer.
peerRecord, err := s.peers.ENR(pid)
record, err := s.peers.ENR(pid)
if err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
return peerCustodyCountCount
}
log.WithError(err).WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
}).Error("Failed to retrieve ENR for peer, defaulting to the default value")

if peerRecord == nil {
// This is the case for inbound peers. So we don't log an error for this.
log.WithField("peerID", pid).Debug("No ENR found for peer")
return peerCustodyCountCount
return custodyRequirement
}

// Load the `custody_subnet_count`
var csc CustodySubnetCount
if err := peerRecord.Load(&csc); err != nil {
log.WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer")
return peerCustodyCountCount
}
// Retrieve the custody subnets count from the ENR.
custodyCount, err := peerdas.CustodyCountFromRecord(record)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
}).Error("Failed to retrieve custody count from ENR for peer, defaulting to the default value")

log.WithFields(logrus.Fields{
"peerID": pid,
"custodyCount": csc,
}).Debug("Custody count read from peer's ENR")
return custodyRequirement
}

return uint64(csc)
return custodyCount
}
Loading
Loading