From 09c71fef1521cbea2bcb322a9de864b7c1a660e6 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Thu, 13 Jun 2024 10:35:27 +0200
Subject: [PATCH] Implement IncrementalDAS.

---
 beacon-chain/core/peerdas/helpers.go | 52 +++-
 beacon-chain/core/peerdas/helpers_test.go | 27 ++
 beacon-chain/p2p/rpc_topic_mappings.go | 2 +-
 beacon-chain/sync/data_columns_sampling.go | 273 +++++++++++-------
 .../sync/rpc_data_column_sidecars_by_root.go | 12 +-
 beacon-chain/sync/service.go | 2 +-
 6 files changed, 254 insertions(+), 114 deletions(-)

diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/helpers.go
index 60be1b13fcfa..a97f366068d8 100644
--- a/beacon-chain/core/peerdas/helpers.go
+++ b/beacon-chain/core/peerdas/helpers.go
@@ -3,6 +3,7 @@ package peerdas
 import (
 "encoding/binary"
 "math"
+ "math/big"

 cKzg4844 "github.com/ethereum/c-kzg-4844/bindings/go"
 "github.com/ethereum/go-ethereum/p2p/enode"
@@ -22,7 +23,7 @@ const bytesPerCell = cKzg4844.FieldElementsPerCell * cKzg4844.BytesPerFieldEleme
 var (
 // Custom errors
 errCustodySubnetCountTooLarge = errors.New("custody subnet count larger than data column sidecar subnet count")
- errIndexTooLarge = errors.New("column index is larger than the specified number of columns")
+ errIndexTooLarge = errors.New("column index is larger than the specified columns count")
 errMismatchLength = errors.New("mismatch in the length of the commitments and proofs")

 // maxUint256 is the maximum value of a uint256.
@@ -300,3 +301,52 @@ func VerifyDataColumnSidecarKZGProofs(sc *ethpb.DataColumnSidecar) (bool, error)
 }
 return cKzg4844.VerifyCellKZGProofBatch(ckzgComms, rowIdx, colIdx, cells, proofs)
 }
+
+// hypergeomCDF computes the hypergeometric cumulative distribution function P(X <= k),
+// where X is the number of marked items among N draws, without replacement, from a
+// population of M items of which n are marked.
+// https://en.wikipedia.org/wiki/Hypergeometric_distribution
+func hypergeomCDF(k, M, n, N uint64) float64 {
+ denominatorInt := new(big.Int).Binomial(int64(M), int64(N)) // lint:ignore uintcast
+ denominator := new(big.Float).SetInt(denominatorInt)
+
+ rBig := big.NewFloat(0)
+
+ for i := uint64(0); i < k+1; i++ {
+ a := new(big.Int).Binomial(int64(n), int64(i)) // lint:ignore uintcast
+ b := new(big.Int).Binomial(int64(M-n), int64(N-i))
+ numeratorInt := new(big.Int).Mul(a, b)
+ numerator := new(big.Float).SetInt(numeratorInt)
+ item := new(big.Float).Quo(numerator, denominator)
+ rBig.Add(rBig, item)
+ }
+
+ r, _ := rBig.Float64()
+
+ return r
+}
+
+// ExtendedSampleCount computes, for a given number of samples per slot and a given
+// number of allowed failures, the number of samples we should actually query from peers.
+// TODO: Add link to the specification once it is available.
+func ExtendedSampleCount(samplesPerSlot, allowedFailures uint64) uint64 {
+ // Retrieve the columns count.
+ columnsCount := params.BeaconConfig().NumberOfColumns
+
+ // If half of the columns are missing, we are able to reconstruct the data.
+ // If half of the columns + 1 are missing, we are not able to reconstruct the data.
+ // So `columnsCount/2 + 1` is the smallest number of missing columns that makes
+ // reconstruction impossible: this is the worst case we have to consider.
+ worstCaseMissing := columnsCount/2 + 1
+
+ // Compute the false positive threshold: the probability of drawing `samplesPerSlot`
+ // columns without hitting a single missing one in this worst case.
+ falsePositiveThreshold := hypergeomCDF(0, columnsCount, worstCaseMissing, samplesPerSlot)
+
+ var sampleCount uint64
+
+ // Finally, compute the extended sample count.
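+ // The loop below finds the smallest sample count for which the probability of
+ // observing at most `allowedFailures` missing columns, in the above worst case,
+ // does not exceed the false positive threshold.
+ // Illustrative values, assuming 128 columns and 16 samples per slot (see
+ // TestExtendedSampleCount): allowedFailures = 0 -> 16 samples,
+ // allowedFailures = 2 -> 24 samples, allowedFailures = 8 -> 40 samples.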
+ for sampleCount = samplesPerSlot; sampleCount < columnsCount+1; sampleCount++ {
+ if hypergeomCDF(allowedFailures, columnsCount, worstCaseMissing, sampleCount) <= falsePositiveThreshold {
+ break
+ }
+ }
+
+ return sampleCount
+}
diff --git a/beacon-chain/core/peerdas/helpers_test.go b/beacon-chain/core/peerdas/helpers_test.go
index 4a798590c9e7..420a7e0a67f9 100644
--- a/beacon-chain/core/peerdas/helpers_test.go
+++ b/beacon-chain/core/peerdas/helpers_test.go
@@ -89,3 +89,30 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
 require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
 }
 }
+
+func TestExtendedSampleCount(t *testing.T) {
+ const samplesPerSlot = 16
+
+ testCases := []struct {
+ name string
+ allowedMissings uint64
+ extendedSampleCount uint64
+ }{
+ {name: "allowedMissings=0", allowedMissings: 0, extendedSampleCount: 16},
+ {name: "allowedMissings=1", allowedMissings: 1, extendedSampleCount: 20},
+ {name: "allowedMissings=2", allowedMissings: 2, extendedSampleCount: 24},
+ {name: "allowedMissings=3", allowedMissings: 3, extendedSampleCount: 27},
+ {name: "allowedMissings=4", allowedMissings: 4, extendedSampleCount: 29},
+ {name: "allowedMissings=5", allowedMissings: 5, extendedSampleCount: 32},
+ {name: "allowedMissings=6", allowedMissings: 6, extendedSampleCount: 35},
+ {name: "allowedMissings=7", allowedMissings: 7, extendedSampleCount: 37},
+ {name: "allowedMissings=8", allowedMissings: 8, extendedSampleCount: 40},
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ result := peerdas.ExtendedSampleCount(samplesPerSlot, tc.allowedMissings)
+ require.Equal(t, tc.extendedSampleCount, result)
+ })
+ }
+}
diff --git a/beacon-chain/p2p/rpc_topic_mappings.go b/beacon-chain/p2p/rpc_topic_mappings.go
index 6412f883fee1..0b4f6688d95f 100644
--- a/beacon-chain/p2p/rpc_topic_mappings.go
+++ b/beacon-chain/p2p/rpc_topic_mappings.go
@@ -116,7 +116,7 @@ var RPCTopicMappings = map[string]interface{}{
 // DataColumnSidecarsByRange v1 Message
 RPCDataColumnSidecarsByRangeTopicV1: new(pb.DataColumnSidecarsByRangeRequest),
 // DataColumnSidecarsByRoot v1 Message
- RPCDataColumnSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq),
+ RPCDataColumnSidecarsByRootTopicV1: new(p2ptypes.DataColumnSidecarsByRootReq),
 }

 // Maps all registered protocol prefixes.
diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go
index 927a75f03d45..6d36c12cd654 100644
--- a/beacon-chain/sync/data_columns_sampling.go
+++ b/beacon-chain/sync/data_columns_sampling.go
@@ -21,21 +21,25 @@ import (
 "github.com/prysmaticlabs/prysm/v5/runtime/version"
 )

-// reandomIntegers returns a map of `count` random integers in the range [0, max[.
-func randomIntegers(count uint64, max uint64) map[uint64]bool {
- result := make(map[uint64]bool, count)
- randGenerator := rand.NewGenerator()
-
- for uint64(len(result)) < count {
- n := randGenerator.Uint64() % max
- result[n] = true
+// randomSlice returns all the integers in the range [0, count[ in random order.
+// Each item appears exactly once: the result is a random permutation of the range.
+func randomSlice(count uint64) []uint64 {
+ slice := make([]uint64, count)
+
+ for i := uint64(0); i < count; i++ {
+ slice[i] = i
 }

- return result
+ // Shuffle the slice.
+ rand.NewGenerator().Shuffle(len(slice), func(i, j int) {
+ slice[i], slice[j] = slice[j], slice[i]
+ })
+
+ return slice
 }

-// sortedListFromMap returns a sorted list of keys from a map.
-func sortedListFromMap(m map[uint64]bool) []uint64 {
+// sortedSliceFromMap returns a sorted slice of keys from a map.
+func sortedSliceFromMap(m map[uint64]bool) []uint64 {
 result := make([]uint64, 0, len(m))
 for k := range m {
 result = append(result, k)
@@ -48,21 +52,10 @@ func sortedListFromMap(m map[uint64]bool) []uint64 {
 return result
 }

-// sampleDataColumnFromPeer samples data columns from a peer.
-// It returns the missing columns after sampling.
-func (s *Service) sampleDataColumnFromPeer(
- pid peer.ID,
- columnsToSample map[uint64]bool,
- requestedRoot [fieldparams.RootLength]byte,
-) (map[uint64]bool, error) {
- // Define missing columns.
- missingColumns := make(map[uint64]bool, len(columnsToSample))
- for index := range columnsToSample {
- missingColumns[index] = true
- }
-
+// custodyColumnsFromPeer returns the columns the peer should custody.
+func (s *Service) custodyColumnsFromPeer(pid peer.ID) (map[uint64]bool, error) {
 // Retrieve the custody count of the peer.
- peerCustodiedSubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid)
+ custodySubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid)

 // Extract the node ID from the peer ID.
 nodeID, err := p2p.ConvertPeerIDToNodeID(pid)
@@ -71,141 +64,213 @@ func (s *Service) sampleDataColumnFromPeer(
 }

 // Determine which columns the peer should custody.
- peerCustodiedColumns, err := peerdas.CustodyColumns(nodeID, peerCustodiedSubnetCount)
+ custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
 if err != nil {
 return nil, errors.Wrap(err, "custody columns")
 }

- peerCustodiedColumnsList := sortedListFromMap(peerCustodiedColumns)
-
- // Compute the intersection of the columns to sample and the columns the peer should custody.
- peerRequestedColumns := make(map[uint64]bool, len(columnsToSample))
- for column := range columnsToSample {
- if peerCustodiedColumns[column] {
- peerRequestedColumns[column] = true
- }
- }
-
- peerRequestedColumnsList := sortedListFromMap(peerRequestedColumns)
+ return custodyColumns, nil
+}

- // Get the data column identifiers to sample from this peer.
- dataColumnIdentifiers := make(types.DataColumnSidecarsByRootReq, 0, len(peerRequestedColumns))
- for index := range peerRequestedColumns {
+// sampleDataColumnsFromPeer samples data columns from a peer.
+// It filters out columns that were not requested and columns with an incorrect root.
+// It returns the retrieved columns.
+func (s *Service) sampleDataColumnsFromPeer(
+ pid peer.ID,
+ requestedColumns map[uint64]bool,
+ root [fieldparams.RootLength]byte,
+) (map[uint64]bool, error) {
+ // Build the data column identifiers.
+ dataColumnIdentifiers := make(types.DataColumnSidecarsByRootReq, 0, len(requestedColumns))
+ for index := range requestedColumns {
 dataColumnIdentifiers = append(dataColumnIdentifiers, &eth.DataColumnIdentifier{
- BlockRoot: requestedRoot[:],
+ BlockRoot: root[:],
 ColumnIndex: index,
 })
 }

- // Return early if there are no data columns to sample.
- if len(dataColumnIdentifiers) == 0 {
- log.WithFields(logrus.Fields{
- "peerID": pid,
- "custodiedColumns": peerCustodiedColumnsList,
- "requestedColumns": peerRequestedColumnsList,
- }).Debug("Peer does not custody any of the requested columns")
- return columnsToSample, nil
- }
-
- // Sample data columns.
+ // Send the request.
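+ // The response is validated below: any returned sidecar whose block root or
+ // column index does not match the request is ignored.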
 roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers)
 if err != nil {
 return nil, errors.Wrap(err, "send data column sidecar by root")
 }

- peerRetrievedColumns := make(map[uint64]bool, len(roDataColumns))
+ retrievedColumns := make(map[uint64]bool, len(roDataColumns))

- // Remove retrieved items from rootsByDataColumnIndex.
+ // Inspect the returned sidecars, keeping only those that match the request.
 for _, roDataColumn := range roDataColumns {
 retrievedColumn := roDataColumn.ColumnIndex

 actualRoot := roDataColumn.BlockRoot()
- if actualRoot != requestedRoot {
+
+ // Filter out columns with an incorrect root.
+ if actualRoot != root {
 // TODO: Should we decrease the peer score here?
 log.WithFields(logrus.Fields{
 "peerID": pid,
- "requestedRoot": fmt.Sprintf("%#x", requestedRoot),
+ "requestedRoot": fmt.Sprintf("%#x", root),
 "actualRoot": fmt.Sprintf("%#x", actualRoot),
 }).Warning("Actual root does not match requested root")

 continue
 }

- peerRetrievedColumns[retrievedColumn] = true
-
- if !columnsToSample[retrievedColumn] {
+ // Filter out columns that were not requested.
+ if !requestedColumns[retrievedColumn] {
 // TODO: Should we decrease the peer score here?
+ columnsToSampleList := sortedSliceFromMap(requestedColumns)
+
 log.WithFields(logrus.Fields{
 "peerID": pid,
+ "requestedColumns": columnsToSampleList,
 "retrievedColumn": retrievedColumn,
- "requestedColumns": peerRequestedColumnsList,
- }).Warning("Retrieved column is was not requested")
+ }).Warning("Retrieved column was not requested")
+
+ continue
 }

- delete(missingColumns, retrievedColumn)
+ retrievedColumns[retrievedColumn] = true
 }

- peerRetrievedColumnsList := sortedListFromMap(peerRetrievedColumns)
- remainingMissingColumnsList := sortedListFromMap(missingColumns)
+ if len(retrievedColumns) == len(requestedColumns) {
+ // This is the happy path.
+ log.WithFields(logrus.Fields{
+ "peerID": pid,
+ "root": fmt.Sprintf("%#x", root),
+ "requestedColumns": sortedSliceFromMap(requestedColumns),
+ }).Debug("All requested columns were successfully sampled from peer")
+
+ return retrievedColumns, nil
+ }
+
+ // Some columns are missing.
 log.WithFields(logrus.Fields{
- "peerID": pid,
- "custodiedColumns": peerCustodiedColumnsList,
- "requestedColumns": peerRequestedColumnsList,
- "retrievedColumns": peerRetrievedColumnsList,
- "remainingMissingColumns": remainingMissingColumnsList,
- }).Debug("Peer data column sampling summary")
-
- return missingColumns, nil
-}
+ "peerID": pid,
+ "root": fmt.Sprintf("%#x", root),
+ "requestedColumns": sortedSliceFromMap(requestedColumns),
+ "retrievedColumns": sortedSliceFromMap(retrievedColumns),
+ }).Warning("Some requested columns were not sampled from peer")

-// sampleDataColumns samples data columns from active peers.
-func (s *Service) sampleDataColumns(requestedRoot [fieldparams.RootLength]byte, samplesCount uint64) error {
- // Determine `samplesCount` random column indexes.
- requestedColumns := randomIntegers(samplesCount, params.BeaconConfig().NumberOfColumns)
+ return retrievedColumns, nil
+}

- missingColumns := make(map[uint64]bool, len(requestedColumns))
- for index := range requestedColumns {
- missingColumns[index] = true
+// sampleDataColumnsFromPeers samples data columns from active peers.
+// It returns the number of retrieved columns.
+// If a peer fails to return a column it should custody, that column is considered missing.
+func (s *Service) sampleDataColumnsFromPeers(
+ columnsToSample []uint64,
+ root [fieldparams.RootLength]byte,
+) (uint64, error) {
+ // Build all remaining columns to sample.
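+ // Columns are removed from this map as soon as they are assigned to a peer:
+ // a column that an assigned peer then fails to return is simply counted as
+ // missing, and is not re-requested from another peer.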
+ remainingColumnsToSample := make(map[uint64]bool, len(columnsToSample))
+ for _, column := range columnsToSample {
+ remainingColumnsToSample[column] = true
 }

 // Get the active peers from the p2p service.
- activePeers := s.cfg.p2p.Peers().Active()
+ activePids := s.cfg.p2p.Peers().Active()
+
+ // Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first).
+ retrievedColumnsCount := 0
+
+ for i := 0; len(remainingColumnsToSample) > 0 && i < len(activePids); i++ {
+ // Get the peer ID.
+ pid := activePids[i]
+
+ // Get the custody columns of the peer.
+ peerCustodyColumns, err := s.custodyColumnsFromPeer(pid)
+ if err != nil {
+ return 0, errors.Wrap(err, "custody columns from peer")
+ }

- var err error
+ // Compute the intersection of the peer custody columns and the remaining columns to request.
+ peerRequestedColumns := make(map[uint64]bool, len(peerCustodyColumns))
+ for column := range remainingColumnsToSample {
+ if peerCustodyColumns[column] {
+ peerRequestedColumns[column] = true
+ }
+ }

- // Sampling is done sequentially peer by peer.
- // TODO: Add parallelism if (probably) needed.
- for _, pid := range activePeers {
- // Early exit if all needed columns are already sampled. (This is the happy path.)
- if len(missingColumns) == 0 {
- break
+ // Remove the newly requested columns from the remaining columns to request.
+ for column := range peerRequestedColumns {
+ delete(remainingColumnsToSample, column)
 }

 // Sample data columns from the peer.
- missingColumns, err = s.sampleDataColumnFromPeer(pid, missingColumns, requestedRoot)
+ peerRetrievedColumns, err := s.sampleDataColumnsFromPeer(pid, peerRequestedColumns, root)
 if err != nil {
- return errors.Wrap(err, "sample data column from peer")
+ return 0, errors.Wrap(err, "sample data columns from peer")
 }
+
+ // Update the retrieved columns count.
+ retrievedColumnsCount += len(peerRetrievedColumns)
 }

- requestedColumnsList := sortedListFromMap(requestedColumns)
+ return uint64(retrievedColumnsCount), nil
+}

- if len(missingColumns) == 0 {
- log.WithField("requestedColumns", requestedColumnsList).Debug("Successfully sampled all requested columns")
- return nil
- }
+// incrementalDAS samples data columns from active peers using incremental DAS.
+// https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10
+func (s *Service) incrementalDAS(root [fieldparams.RootLength]byte, sampleCount uint64) error {
+ // Retrieve the number of columns.
+ columnsCount := params.BeaconConfig().NumberOfColumns

- missingColumnsList := sortedListFromMap(missingColumns)
- log.WithFields(logrus.Fields{
- "requestedColumns": requestedColumnsList,
- "missingColumns": missingColumnsList,
- }).Warning("Failed to sample some requested columns")
+ // Randomize the order of all columns.
+ columns := randomSlice(columnsCount)
+
+ // Number of columns that turned out to be missing in previous rounds.
+ missingColumnsCount := uint64(0)
+
+ // Define the first column to sample and the initial extended sample count.
+ firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, 0)
+
+ for i := 1; ; i++ {
+ if extendedSampleCount > columnsCount {
+ // We already tried to sample all columns, this is the unhappy path.
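+ // At this point, every column index has been requested once: availability
+ // could not be confirmed within the allowed failure budget.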
+ log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
+ return nil
+ }
+
+ columnsToSample := columns[firstColumnToSample:extendedSampleCount]
+ columnsToSampleCount := extendedSampleCount - firstColumnToSample
+
+ retrievedSampleCount, err := s.sampleDataColumnsFromPeers(columnsToSample, root)
+ if err != nil {
+ return errors.Wrap(err, "sample data columns from peers")
+ }
+
+ if retrievedSampleCount == columnsToSampleCount {
+ // All columns were correctly sampled, this is the happy path.
+ log.WithFields(logrus.Fields{
+ "root": fmt.Sprintf("%#x", root),
+ "roundsNeeded": i,
+ }).Debug("All columns were successfully sampled")
+ return nil
+ }

- return nil
+ if retrievedSampleCount > columnsToSampleCount {
+ // This should never happen.
+ return errors.New("retrieved more columns than requested")
+ }
+
+ // Some columns are missing: extend the sample size.
+ missingColumnsCount += columnsToSampleCount - retrievedSampleCount
+
+ firstColumnToSample = extendedSampleCount
+ oldExtendedSampleCount := extendedSampleCount
+ extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, missingColumnsCount)
+
+ log.WithFields(logrus.Fields{
+ "root": fmt.Sprintf("%#x", root),
+ "round": i,
+ "missingColumnsCount": missingColumnsCount,
+ "currentSampleCount": oldExtendedSampleCount,
+ "nextSampleCount": extendedSampleCount,
+ }).Debug("Some columns are still missing after sampling this round")
+ }
 }

-func (s *Service) dataColumnSampling(ctx context.Context) {
+// DataColumnSamplingLoop runs incremental DAS on each block as it is received.
+func (s *Service) DataColumnSamplingLoop(ctx context.Context) {
 // Create a subscription to the state feed.
 stateChannel := make(chan *feed.Event, 1)
 stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel)
@@ -250,10 +315,8 @@ func (s *Service) dataColumnSampling(ctx context.Context) {
 continue
 }

- dataColumnSamplingCount := params.BeaconConfig().SamplesPerSlot
-
- // Sample data columns.
- if err := s.sampleDataColumns(data.BlockRoot, dataColumnSamplingCount); err != nil {
+ // Sample data columns with incremental DAS.
+ if err := s.incrementalDAS(data.BlockRoot, params.BeaconConfig().SamplesPerSlot); err != nil {
 log.WithError(err).Error("Failed to sample data columns")
 }

diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
index 241fd8185f29..1e80258e1a65 100644
--- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -37,13 +37,13 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 // We use the same type as for blobs as they are the same data structure.
// TODO: Make the type naming more generic to be extensible to data columns - ref, ok := msg.(*types.BlobSidecarsByRootReq) + ref, ok := msg.(*types.DataColumnSidecarsByRootReq) if !ok { - return errors.New("message is not type BlobSidecarsByRootReq") + return errors.New("message is not type DataColumnSidecarsByRootReq") } requestedColumnIdents := *ref - if err := validateDataColummnsByRootRequest(requestedColumnIdents); err != nil { + if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil { s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) return errors.Wrap(err, "validate data columns by root request") @@ -54,7 +54,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int requestedColumnsList := make([]uint64, 0, len(requestedColumnIdents)) for _, ident := range requestedColumnIdents { - requestedColumnsList = append(requestedColumnsList, ident.Index) + requestedColumnsList = append(requestedColumnsList, ident.ColumnIndex) } // TODO: Customize data column batches too @@ -127,7 +127,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int } s.rateLimiter.add(stream, 1) - requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].Index + requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].ColumnIndex // Decrease the peer's score if it requests a column that is not custodied. isCustodied := custodiedColumns[requestedIndex] @@ -207,7 +207,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int return nil } -func validateDataColummnsByRootRequest(colIdents types.BlobSidecarsByRootReq) error { +func validateDataColumnsByRootRequest(colIdents types.DataColumnSidecarsByRootReq) error { if uint64(len(colIdents)) > params.BeaconConfig().MaxRequestDataColumnSidecars { return types.ErrMaxDataColumnReqExceeded } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 7e195fc2fea5..26728142858d 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -254,7 +254,7 @@ func (s *Service) Start() { // Run data column sampling if features.Get().EnablePeerDAS { - go s.dataColumnSampling(s.ctx) + go s.DataColumnSamplingLoop(s.ctx) } }
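
For illustration, here is a minimal, standalone sketch (not part of the patch) of the sampling schedule incrementalDAS walks through, assuming 128 columns, 16 samples per slot, and exactly one sampled column found missing in each round. The hard-coded counts are the ExtendedSampleCount values asserted in TestExtendedSampleCount above; the one-miss-per-round schedule is purely illustrative.

package main

import "fmt"

func main() {
	// ExtendedSampleCount(16, k) for k = 0..8, as asserted in TestExtendedSampleCount.
	extended := []uint64{16, 20, 24, 27, 29, 32, 35, 37, 40}

	firstColumnToSample, missingColumnsCount := uint64(0), uint64(0)

	for round := 1; missingColumnsCount < uint64(len(extended)); round++ {
		extendedSampleCount := extended[missingColumnsCount]

		// Each round samples the next window of the shuffled column slice.
		fmt.Printf("round %d: sample columns[%d:%d]\n", round, firstColumnToSample, extendedSampleCount)

		// Assume exactly one of the requested columns turns out to be missing.
		missingColumnsCount++
		firstColumnToSample = extendedSampleCount
	}
}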