incrementalDAS: Rework and add tests.
nalepae committed Jun 20, 2024
1 parent 20b9072 commit 01394d6
Showing 3 changed files with 339 additions and 37 deletions.
5 changes: 5 additions & 0 deletions beacon-chain/sync/BUILD.bazel
@@ -158,6 +158,7 @@ go_test(
"block_batcher_test.go",
"broadcast_bls_changes_test.go",
"context_test.go",
"data_columns_sampling_test.go",
"decode_pubsub_test.go",
"error_test.go",
"fork_watcher_test.go",
@@ -205,6 +206,7 @@ go_test(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
@@ -249,6 +251,7 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
@@ … @@
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_golang_snappy//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
147 changes: 110 additions & 37 deletions beacon-chain/sync/data_columns_sampling.go
@@ -21,21 +21,25 @@ import (
"github.com/prysmaticlabs/prysm/v5/runtime/version"
)

// randomSlice returns a slice of `count` random integers in the range [0, count[.
// Each item is unique.
func randomSlice(count uint64) []uint64 {
slice := make([]uint64, count)
type roundSummary struct {
RequestedColumns []uint64
MissingColumns map[uint64]bool
}

for i := uint64(0); i < count; i++ {
slice[i] = i
// randomizeColumns returns a slice containing all columns in a random order.
func randomizeColumns(columns map[uint64]bool) []uint64 {
// Create a slice from columns.
randomized := make([]uint64, 0, len(columns))
for column := range columns {
randomized = append(randomized, column)
}

// Shuffle the slice.
rand.NewGenerator().Shuffle(len(slice), func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
rand.NewGenerator().Shuffle(len(randomized), func(i, j int) {
randomized[i], randomized[j] = randomized[j], randomized[i]
})

return slice
return randomized
}

// sortedSliceFromMap returns a sorted slices of keys from a map.
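
For context on the change above: the new randomizeColumns helper replaces randomSlice. Rather than shuffling the full [0, count) range, it shuffles only the keys of a given column set. Below is a minimal standalone sketch of the same idea; it uses the standard library's math/rand in place of Prysm's crypto/rand wrapper (an assumption made purely for illustration).

    package main

    import (
        "fmt"
        "math/rand"
    )

    // randomizeColumns returns the keys of the column set in a random order.
    func randomizeColumns(columns map[uint64]bool) []uint64 {
        randomized := make([]uint64, 0, len(columns))
        for column := range columns {
            randomized = append(randomized, column)
        }
        // Fisher-Yates shuffle via the standard library.
        rand.Shuffle(len(randomized), func(i, j int) {
            randomized[i], randomized[j] = randomized[j], randomized[i]
        })
        return randomized
    }

    func main() {
        columns := map[uint64]bool{3: true, 17: true, 42: true, 99: true}
        fmt.Println(randomizeColumns(columns)) // e.g. [42 3 99 17]
    }

The explicit shuffle matters: Go's map iteration order is unspecified but not uniformly random, so relying on range order alone could bias which columns get sampled.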
@@ -160,7 +164,7 @@ func (s *Service) sampleDataColumnsFromPeer(
 func (s *Service) sampleDataColumnsFromPeers(
     columnsToSample []uint64,
     root [fieldparams.RootLength]byte,
-) (uint64, error) {
+) (map[uint64]bool, error) {
     // Build all remaining columns to sample.
     remainingColumnsToSample := make(map[uint64]bool, len(columnsToSample))
     for _, column := range columnsToSample {
@@ -170,17 +174,17 @@ func (s *Service) sampleDataColumnsFromPeers(
     // Get the active peers from the p2p service.
     activePids := s.cfg.p2p.Peers().Active()

-    // Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first).
-    retrievedColumnsCount := 0
+    retrievedColumns := make(map[uint64]bool, len(columnsToSample))

+    // Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first).
     for i := 0; len(remainingColumnsToSample) > 0 && i < len(activePids); i++ {
         // Get the peer ID.
         pid := activePids[i]

         // Get the custody columns of the peer.
         peerCustodyColumns, err := s.custodyColumnsFromPeer(pid)
         if err != nil {
-            return 0, errors.Wrap(err, "custody columns from peer")
+            return nil, errors.Wrap(err, "custody columns from peer")
         }

         // Compute the intersection of the peer custody columns and the remaining columns to request.
@@ -199,57 +203,74 @@
         // Sample data columns from the peer.
         peerRetrievedColumns, err := s.sampleDataColumnsFromPeer(pid, peerRequestedColumns, root)
         if err != nil {
-            return 0, errors.Wrap(err, "sample data columns from peer")
+            return nil, errors.Wrap(err, "sample data columns from peer")
         }

         // Update the retrieved columns.
-        retrievedColumnsCount += len(peerRetrievedColumns)
+        for column := range peerRetrievedColumns {
+            retrievedColumns[column] = true
+        }
     }

-    return uint64(retrievedColumnsCount), nil
+    return retrievedColumns, nil
 }
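
sampleDataColumnsFromPeers above walks the active peers and, for each one, requests only the intersection of that peer's custody columns with the columns still missing. A standalone sketch of the intersection step (the names are illustrative, not Prysm's API):

    package main

    import "fmt"

    // intersect returns the subset of remaining columns that the peer custodies,
    // i.e. the columns worth requesting from this peer.
    func intersect(remaining, peerCustody map[uint64]bool) []uint64 {
        requested := make([]uint64, 0, len(remaining))
        for column := range remaining {
            if peerCustody[column] {
                requested = append(requested, column)
            }
        }
        return requested
    }

    func main() {
        remaining := map[uint64]bool{1: true, 5: true, 9: true}
        peerCustody := map[uint64]bool{0: true, 1: true, 9: true}
        fmt.Println(intersect(remaining, peerCustody)) // [1 9], in map order
    }

Returning the retrieved set as a map[uint64]bool, instead of the old bare count, is what lets the caller work out exactly which columns are missing for the new roundSummary bookkeeping.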

 // incrementalDAS samples data columns from active peers using incremental DAS.
 // https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10
-func (s *Service) incrementalDAS(root [fieldparams.RootLength]byte, sampleCount uint64) error {
-    // Retrieve the number of columns.
-    columnsCount := params.BeaconConfig().NumberOfColumns
-
-    // Ramdomize all columns.
-    columns := randomSlice(columnsCount)
-
-    // Define the first column to sample.
-    missingColumnsCount := uint64(0)
-
+func (s *Service) incrementalDAS(
+    root [fieldparams.RootLength]byte,
+    columns []uint64,
+    sampleCount uint64,
+) (bool, []roundSummary, error) {
+    columnsCount, missingColumnsCount := uint64(len(columns)), uint64(0)
     firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, 0)

-    for i := 1; ; i++ {
+    roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
+
+    for round := 1; ; /*No exit condition */ round++ {
         if extendedSampleCount > columnsCount {
-            // We already tried to sample all columns, this is the unhappy path.
+            // We already tried to sample all possible columns, this is the unhappy path.
             log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
-            return nil
+            return false, roundSummaries, nil
         }

         // Get the columns to sample for this round.
         columnsToSample := columns[firstColumnToSample:extendedSampleCount]
         columnsToSampleCount := extendedSampleCount - firstColumnToSample

-        retrievedSampleCount, err := s.sampleDataColumnsFromPeers(columnsToSample, root)
+        // Sample the data columns from the peers.
+        retrievedSamples, err := s.sampleDataColumnsFromPeers(columnsToSample, root)
         if err != nil {
-            return errors.Wrap(err, "sample data columns from peers")
+            return false, nil, errors.Wrap(err, "sample data columns from peers")
         }

+        // Compute the missing samples.
+        missingSamples := make(map[uint64]bool, max(0, len(columnsToSample)-len(retrievedSamples)))
+        for _, column := range columnsToSample {
+            if !retrievedSamples[column] {
+                missingSamples[column] = true
+            }
+        }
+
+        roundSummaries = append(roundSummaries, roundSummary{
+            RequestedColumns: columnsToSample,
+            MissingColumns: missingSamples,
+        })
+
+        retrievedSampleCount := uint64(len(retrievedSamples))

         if retrievedSampleCount == columnsToSampleCount {
             // All columns were correctly sampled, this is the happy path.
             log.WithFields(logrus.Fields{
                 "root": fmt.Sprintf("%#x", root),
-                "roundsNeeded": i,
+                "roundsNeeded": round,
             }).Debug("All columns were successfully sampled")
-            return nil
+            return true, roundSummaries, nil
         }

         if retrievedSampleCount > columnsToSampleCount {
             // This should never happen.
-            return errors.New("retrieved more columns than requested")
+            return false, nil, errors.New("retrieved more columns than requested")
         }

         // Some columns are missing, we need to extend the sample size.
@@ -261,7 +282,7 @@ func (s *Service) incrementalDAS(root [fieldparams.RootLength]byte, sampleCount

         log.WithFields(logrus.Fields{
             "root": fmt.Sprintf("%#x", root),
-            "round": i,
+            "round": round,
             "missingColumnsCount": missingColumnsCount,
             "currentSampleCount": oldExtendedSampleCount,
             "nextSampleCount": extendedSampleCount,
@@ … @@

 // DataColumnSamplingRoutine runs incremental DAS on block when received.
 func (s *Service) DataColumnSamplingRoutine(ctx context.Context) {
+    // Get the custody subnets count.
+    custodySubnetsCount := peerdas.CustodySubnetCount()
+
     // Create a subscription to the state feed.
     stateChannel := make(chan *feed.Event, 1)
     stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)

+    // Retrieve the number of columns.
+    columnsCount := params.BeaconConfig().NumberOfColumns
+
+    // Retrieve all columns we custody.
+    custodyColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodySubnetsCount)
+    if err != nil {
+        log.WithError(err).Error("Failed to get custody columns")
+        return
+    }
+
+    custodyColumnsCount := uint64(len(custodyColumns))
+
+    // Compute the number of columns to sample.
+    if custodyColumnsCount >= columnsCount/2 {
+        log.WithFields(logrus.Fields{
+            "custodyColumnsCount": custodyColumnsCount,
+            "columnsCount": columnsCount,
+        }).Debug("At least half of the columns are custody columns, no need to sample")
+        return
+    }
+
+    samplesCount := min(params.BeaconConfig().SamplesPerSlot, columnsCount/2-custodyColumnsCount)
+
+    // Compute all the columns we do NOT custody.
+    nonCustodyColums := make(map[uint64]bool, columnsCount-custodyColumnsCount)
+    for i := uint64(0); i < columnsCount; i++ {
+        if !custodyColumns[i] {
+            nonCustodyColums[i] = true
+        }
+    }
+
     // Unsubscribe from the state feed when the function returns.
     defer stateSub.Unsubscribe()
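
To make the sizing logic concrete: with NumberOfColumns = 128 and SamplesPerSlot = 8 (illustrative values matching the PeerDAS draft parameters, not read from this diff), a node custodying 4 columns samples min(8, 128/2 - 4) = 8 columns. A runnable sketch of the same arithmetic:

    package main

    import "fmt"

    // Hypothetical numbers for illustration only.
    const (
        columnsCount        uint64 = 128 // stand-in for params.BeaconConfig().NumberOfColumns
        samplesPerSlot      uint64 = 8   // stand-in for params.BeaconConfig().SamplesPerSlot
        custodyColumnsCount uint64 = 4   // example: a node custodying 4 columns
    )

    func main() {
        if custodyColumnsCount >= columnsCount/2 {
            fmt.Println("custodying at least half the columns: no sampling needed")
            return
        }
        samplesCount := min(samplesPerSlot, columnsCount/2-custodyColumnsCount)
        fmt.Println(samplesCount) // min(8, 64-4) = 8
    }

The columnsCount/2 threshold reflects the erasure-coding rate: once custody plus successful samples cover half of the columns, the extended data is reconstructable anyway, so sampling past that point adds nothing.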

@@ -315,9 +370,27 @@ func (s *Service) DataColumnSamplingRoutine(ctx context.Context) {
                 continue
             }

+            // Ramdomize all columns.
+            randomizedColumns := randomizeColumns(nonCustodyColums)
+
             // Sample data columns with incremental DAS.
-            if err := s.incrementalDAS(data.BlockRoot, params.BeaconConfig().SamplesPerSlot); err != nil {
-                log.WithError(err).Error("Failed to sample data columns")
+            ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
+            if err != nil {
+                log.WithError(err).Error("Error during incremental DAS")
             }
+
+            if ok {
+                log.WithFields(logrus.Fields{
+                    "root": fmt.Sprintf("%#x", data.BlockRoot),
+                    "columns": randomizedColumns,
+                    "sampleCount": samplesCount,
+                }).Debug("Data column sampling successful")
+            } else {
+                log.WithFields(logrus.Fields{
+                    "root": fmt.Sprintf("%#x", data.BlockRoot),
+                    "columns": randomizedColumns,
+                    "sampleCount": samplesCount,
+                }).Warning("Data column sampling failed")
+            }

         case <-s.ctx.Done():
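With the rework, the call site receives an explicit availability verdict and per-round summaries instead of a bare error. A reduced, runnable sketch of that calling pattern (the stub below merely stands in for (*Service).incrementalDAS):

    package main

    import "fmt"

    type roundSummary struct {
        RequestedColumns []uint64
        MissingColumns   map[uint64]bool
    }

    // Stub with the same signature shape as the reworked method.
    func incrementalDAS(root [32]byte, columns []uint64, sampleCount uint64) (bool, []roundSummary, error) {
        return true, []roundSummary{{RequestedColumns: columns[:sampleCount]}}, nil
    }

    func main() {
        var root [32]byte
        randomizedColumns := []uint64{7, 2, 9, 4}

        ok, summaries, err := incrementalDAS(root, randomizedColumns, 2)
        switch {
        case err != nil:
            fmt.Println("error during incremental DAS:", err)
        case ok:
            fmt.Printf("sampling succeeded in %d round(s)\n", len(summaries))
        default:
            fmt.Println("sampling failed; some columns remain unavailable")
        }
    }

Keeping the error (a transport-level failure) separate from the boolean (the availability verdict) is what lets the routine above log a warning when sampling fails while still treating the event as handled.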
224 changes: 224 additions & 0 deletions beacon-chain/sync/data_columns_sampling_test.go (diff not loaded)
