Skip to content

Commit

Permalink
PeerDAS: Implement IncrementalDAS (#14109)
Browse files Browse the repository at this point in the history
* `ConvertPeerIDToNodeID`: Add tests.

* Remove `extractNodeID` and use `ConvertPeerIDToNodeID` instead.

* Implement IncrementalDAS.

* `DataColumnSamplingLoop` ==> `DataColumnSamplingRoutine`.

* HypergeomCDF: Add test.

* `GetValidCustodyPeers`: Optimize and add tests.

* Remove blank identifiers.

* Implement `CustodyCountFromRecord`.

* Implement `TestP2P.CustodyCountFromRemotePeer`.

* `NewTestP2P`: Add `swarmt.Option` parameters.

* `incrementalDAS`: Rework and add tests.

* Remove useless warning.
  • Loading branch information
nalepae committed Jun 21, 2024
1 parent 11f56ca commit 913e84d
Show file tree
Hide file tree
Showing 19 changed files with 942 additions and 266 deletions.
1 change: 1 addition & 0 deletions beacon-chain/core/peerdas/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_ethereum_c_kzg_4844//bindings/go:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_holiman_uint256//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
Expand Down
85 changes: 81 additions & 4 deletions beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package peerdas
import (
"encoding/binary"
"math"
"math/big"

cKzg4844 "github.com/ethereum/c-kzg-4844/bindings/go"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/holiman/uint256"
errors "github.com/pkg/errors"

Expand All @@ -19,13 +21,24 @@ import (
)

// Bytes per cell
const bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement
const (
CustodySubnetCountEnrKey = "csc"

bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldElement
)

// Csc is the `custody_subnet_count` entry that can be loaded from an ENR record.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-discovery-domain-discv5
type Csc uint64

// ENRKey returns the key identifying this entry in an ENR record.
func (Csc) ENRKey() string {
	return CustodySubnetCountEnrKey
}

var (
// Custom errors
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errIndexTooLarge = errors.New("column index is larger than the specified number of columns")
errMismatchLength = errors.New("mismatch in the length of the commitments and proofs")
errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
errIndexTooLarge = errors.New("column index is larger than the specified columns count")
errMismatchLength = errors.New("mismatch in the length of the commitments and proofs")
errRecordNil = errors.New("record is nil")
errCannotLoadCustodySubnetCount = errors.New("cannot load the custody subnet count from peer")

// maxUint256 is the maximum value of a uint256.
maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
Expand Down Expand Up @@ -311,3 +324,67 @@ func CustodySubnetCount() uint64 {
}
return count
}

// HypergeomCDF computes the hypergeometric cumulative distribution function:
// the probability of observing at most `k` marked items when drawing `N` items,
// without replacement, from a population of `M` items of which `n` are marked.
// https://en.wikipedia.org/wiki/Hypergeometric_distribution
//
// NaN is returned when the parameters do not describe a valid distribution
// (`n > M` or `N > M`). All parameters are converted to int64 internally, so
// they are expected to fit in an int64.
func HypergeomCDF(k, M, n, N uint64) float64 {
	// More marked items or more draws than the population holds is meaningless,
	// and would otherwise lead to a 0/0 division below.
	if n > M || N > M {
		return math.NaN()
	}

	// At most min(n, N) marked items can be drawn. Clamping `k` keeps `N-i` from
	// underflowing in the loop below (a negative second argument makes
	// `big.Int.Binomial` return 1 instead of 0) and skips terms known to be zero.
	if k > N {
		k = N
	}
	if k > n {
		k = n
	}

	denominator := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast

	// Sum the numerators as exact integers and divide only once at the end,
	// so no rounding error accumulates across terms.
	numerator := new(big.Int)
	for i := uint64(0); i <= k; i++ {
		a := new(big.Int).Binomial(int64(n), int64(i))     // lint:ignore uintcast
		b := new(big.Int).Binomial(int64(M-n), int64(N-i)) // lint:ignore uintcast
		numerator.Add(numerator, a.Mul(a, b))
	}

	ratio := new(big.Float).Quo(
		new(big.Float).SetInt(numerator),
		new(big.Float).SetInt(denominator),
	)

	// The accuracy flag is deliberately ignored: rounding to float64 is expected.
	r, _ := ratio.Float64()

	return r
}

// ExtendedSampleCount computes, for a given number of samples per slot and allowed failures, the
// number of samples we should actually query from peers.
// The returned value never exceeds the total number of columns.
// TODO: Add link to the specification once it is available.
func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
	// Retrieve the columns count.
	columnsCount := params.BeaconConfig().NumberOfColumns

	// It is not possible to sample more columns than there are:
	// in this case, every column has to be sampled.
	if samplesPerSlot >= columnsCount {
		return columnsCount
	}

	// If half of the columns are missing, we are able to reconstruct the data.
	// If half of the columns + 1 are missing, we are not able to reconstruct the data.
	// This is the smallest worst case.
	worstCaseMissing := columnsCount/2 + 1

	// Compute the false positive threshold: the probability of seeing no missing
	// column at all when sampling `samplesPerSlot` columns in the worst case.
	falsePositiveThreshold := HypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)

	// Find the smallest sample count for which tolerating `allowedFailures` missing
	// columns is not more likely to yield a false positive than the threshold.
	for sampleCount := samplesPerSlot; sampleCount <= columnsCount; sampleCount++ {
		if HypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
			return sampleCount
		}
	}

	// No sample count satisfies the threshold (too many allowed failures):
	// fall back to sampling every column rather than returning `columnsCount+1`.
	return columnsCount
}

// CustodyCountFromRecord extracts the custody subnet count stored in an ENR record.
// It returns `errRecordNil` if no record is provided, and `errCannotLoadCustodySubnetCount`
// if the `Csc` entry cannot be loaded from the record.
func CustodyCountFromRecord(record *enr.Record) (uint64, error) {
	// Without a record, there is nothing to read from.
	if record == nil {
		return 0, errRecordNil
	}

	// Read the `custody_subnet_count` entry from the record.
	custodySubnetCount := new(Csc)
	if loadErr := record.Load(custodySubnetCount); loadErr != nil {
		return 0, errCannotLoadCustodySubnetCount
	}

	return uint64(*custodySubnetCount), nil
}
52 changes: 52 additions & 0 deletions beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,55 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
}
}

// TestHypergeomCDF checks HypergeomCDF against a known value.
func TestHypergeomCDF(t *testing.T) {
	// Parameters: population size 128, number of successes in population 65,
	// sample size 16, at most 5 successes in sample.
	const (
		expected  = 0.0796665913283742
		tolerance = 0.000001
	)

	got := peerdas.HypergeomCDF(5, 128, 65, 16)

	// The result must lie within `tolerance` of the expected value.
	withinTolerance := expected-tolerance <= got && got <= expected+tolerance
	require.Equal(t, true, withinTolerance)
}

// TestExtendedSampleCount checks the extended sample count obtained for 16 samples
// per slot and for every number of allowed missing samples from 0 to 20.
func TestExtendedSampleCount(t *testing.T) {
	const samplesPerSlot = 16

	// expectedCounts[i] is the expected extended sample count when `i` missing samples are allowed.
	expectedCounts := []uint64{
		16, 20, 24, 27, 29, 32, 35,
		37, 40, 42, 44, 47, 49, 51,
		53, 55, 57, 59, 61, 63, 65,
	}

	for allowedMissings, expected := range expectedCounts {
		name := fmt.Sprintf("allowedMissings=%d", allowedMissings)

		t.Run(name, func(t *testing.T) {
			actual := peerdas.ExtendedSampleCount(samplesPerSlot, uint64(allowedMissings))
			require.Equal(t, expected, actual)
		})
	}
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ go_test(
"addr_factory_test.go",
"broadcaster_test.go",
"connection_gater_test.go",
"custody_test.go",
"dial_relay_node_test.go",
"discovery_test.go",
"fork_test.go",
Expand Down
93 changes: 60 additions & 33 deletions beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,68 +9,95 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
)

// GetValidCustodyPeers returns a list of peers that custody a super set of the local node's custody columns.
func (s *Service) GetValidCustodyPeers(peers []peer.ID) ([]peer.ID, error) {
custodiedColumns, err := peerdas.CustodyColumns(s.NodeID(), peerdas.CustodySubnetCount())
// Get the total number of columns.
numberOfColumns := params.BeaconConfig().NumberOfColumns

localCustodySubnetCount := peerdas.CustodySubnetCount()
localCustodyColumns, err := peerdas.CustodyColumns(s.NodeID(), localCustodySubnetCount)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "custody columns for local node")
}
var validPeers []peer.ID

localCustotyColumnsCount := uint64(len(localCustodyColumns))

// Find the valid peers.
validPeers := make([]peer.ID, 0, len(peers))

loop:
for _, pid := range peers {
remoteCount := s.CustodyCountFromRemotePeer(pid)
// Get the custody subnets count of the remote peer.
remoteCustodySubnetCount := s.CustodyCountFromRemotePeer(pid)

nodeId, err := ConvertPeerIDToNodeID(pid)
// Get the remote node ID from the peer ID.
remoteNodeID, err := ConvertPeerIDToNodeID(pid)
if err != nil {
return nil, errors.Wrap(err, "convert peer ID to node ID")
}
remoteCustodiedColumns, err := peerdas.CustodyColumns(nodeId, remoteCount)

// Get the custody columns of the remote peer.
remoteCustodyColumns, err := peerdas.CustodyColumns(remoteNodeID, remoteCustodySubnetCount)
if err != nil {
return nil, errors.Wrap(err, "custody columns")
}
invalidPeer := false
for c := range custodiedColumns {
if !remoteCustodiedColumns[c] {
invalidPeer = true
break
}

remoteCustodyColumnsCount := uint64(len(remoteCustodyColumns))

// If the remote peer custodies fewer columns than the local node, skip it.
if remoteCustodyColumnsCount < localCustotyColumnsCount {
continue
}
if invalidPeer {

// If the remote peer custodies all the possible columns, add it to the list.
if remoteCustodyColumnsCount == numberOfColumns {
copiedId := pid
validPeers = append(validPeers, copiedId)
continue
}

// Filter out invalid peers.
for c := range localCustodyColumns {
if !remoteCustodyColumns[c] {
continue loop
}
}

copiedId := pid

// Add valid peer to list
validPeers = append(validPeers, copiedId)
}

return validPeers, nil
}

// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
peerCustodyCountCount := params.BeaconConfig().CustodyRequirement
custodyRequirement := params.BeaconConfig().CustodyRequirement

// Retrieve the ENR of the peer.
peerRecord, err := s.peers.ENR(pid)
record, err := s.peers.ENR(pid)
if err != nil {
log.WithError(err).WithField("peerID", pid).Error("Failed to retrieve ENR for peer")
return peerCustodyCountCount
}
log.WithError(err).WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
}).Error("Failed to retrieve ENR for peer, defaulting to the default value")

if peerRecord == nil {
// This is the case for inbound peers. So we don't log an error for this.
log.WithField("peerID", pid).Debug("No ENR found for peer")
return peerCustodyCountCount
return custodyRequirement
}

// Load the `custody_subnet_count`
var csc CustodySubnetCount
if err := peerRecord.Load(&csc); err != nil {
log.WithField("peerID", pid).Error("Cannot load the custody_subnet_count from peer")
return peerCustodyCountCount
}
// Retrieve the custody subnets count from the ENR.
custodyCount, err := peerdas.CustodyCountFromRecord(record)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
}).Error("Failed to retrieve custody count from ENR for peer, defaulting to the default value")

log.WithFields(logrus.Fields{
"peerID": pid,
"custodyCount": csc,
}).Debug("Custody count read from peer's ENR")
return custodyRequirement
}

return uint64(csc)
return custodyCount
}
Loading

0 comments on commit 913e84d

Please sign in to comment.