From 92a1bfe80c8614092a631fd6e0da245c09b14c7a Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Thu, 20 Jun 2024 19:05:09 +0200 Subject: [PATCH] `incrementalDAS`: Rework and add tests. --- beacon-chain/sync/BUILD.bazel | 5 + beacon-chain/sync/data_columns_sampling.go | 149 +++++++++--- .../sync/data_columns_sampling_test.go | 224 ++++++++++++++++++ 3 files changed, 341 insertions(+), 37 deletions(-) create mode 100644 beacon-chain/sync/data_columns_sampling_test.go diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 346bdfaaff7..611a1c13005 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -158,6 +158,7 @@ go_test( "block_batcher_test.go", "broadcast_bls_changes_test.go", "context_test.go", + "data_columns_sampling_test.go", "decode_pubsub_test.go", "error_test.go", "fork_watcher_test.go", @@ -205,6 +206,7 @@ go_test( "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", @@ -249,6 +251,7 @@ go_test( "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/attestation:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", + "//runtime/version:go_default_library", "//testing/assert:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", @@ -260,9 +263,11 @@ go_test( "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_golang_snappy//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", + "@com_github_libp2p_go_libp2p//core/crypto:go_default_library", "@com_github_libp2p_go_libp2p//core/network:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_libp2p_go_libp2p//core/protocol:go_default_library", + "@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//:go_default_library", "@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library", "@com_github_patrickmn_go_cache//:go_default_library", diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index b8289ae8fb5..e50d5073d2e 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -21,21 +21,25 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/version" ) -// randomSlice returns a slice of `count` random integers in the range [0, count[. -// Each item is unique. -func randomSlice(count uint64) []uint64 { - slice := make([]uint64, count) +type roundSummary struct { + RequestedColumns []uint64 + MissingColumns map[uint64]bool +} - for i := uint64(0); i < count; i++ { - slice[i] = i +// randomizeColumns returns a slice containing all columns in a random order. +func randomizeColumns(columns map[uint64]bool) []uint64 { + // Create a slice from columns. + randomized := make([]uint64, 0, len(columns)) + for column := range columns { + randomized = append(randomized, column) } // Shuffle the slice. 
- rand.NewGenerator().Shuffle(len(slice), func(i, j int) { - slice[i], slice[j] = slice[j], slice[i] + rand.NewGenerator().Shuffle(len(randomized), func(i, j int) { + randomized[i], randomized[j] = randomized[j], randomized[i] }) - return slice + return randomized } // sortedSliceFromMap returns a sorted slices of keys from a map. @@ -160,7 +164,7 @@ func (s *Service) sampleDataColumnsFromPeer( func (s *Service) sampleDataColumnsFromPeers( columnsToSample []uint64, root [fieldparams.RootLength]byte, -) (uint64, error) { +) (map[uint64]bool, error) { // Build all remaining columns to sample. remainingColumnsToSample := make(map[uint64]bool, len(columnsToSample)) for _, column := range columnsToSample { @@ -170,9 +174,9 @@ func (s *Service) sampleDataColumnsFromPeers( // Get the active peers from the p2p service. activePids := s.cfg.p2p.Peers().Active() - // Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first). - retrievedColumnsCount := 0 + retrievedColumns := make(map[uint64]bool, len(columnsToSample)) + // Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first). for i := 0; len(remainingColumnsToSample) > 0 && i < len(activePids); i++ { // Get the peer ID. pid := activePids[i] @@ -180,7 +184,7 @@ func (s *Service) sampleDataColumnsFromPeers( // Get the custody columns of the peer. peerCustodyColumns, err := s.custodyColumnsFromPeer(pid) if err != nil { - return 0, errors.Wrap(err, "custody columns from peer") + return nil, errors.Wrap(err, "custody columns from peer") } // Compute the intersection of the peer custody columns and the remaining columns to request. @@ -199,57 +203,74 @@ func (s *Service) sampleDataColumnsFromPeers( // Sample data columns from the peer. peerRetrievedColumns, err := s.sampleDataColumnsFromPeer(pid, peerRequestedColumns, root) if err != nil { - return 0, errors.Wrap(err, "sample data columns from peer") + return nil, errors.Wrap(err, "sample data columns from peer") } // Update the retrieved columns. - retrievedColumnsCount += len(peerRetrievedColumns) + for column := range peerRetrievedColumns { + retrievedColumns[column] = true + } } - return uint64(retrievedColumnsCount), nil + return retrievedColumns, nil } // incrementalDAS samples data columns from active peers using incremental DAS. // https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10 -func (s *Service) incrementalDAS(root [fieldparams.RootLength]byte, sampleCount uint64) error { - // Retrieve the number of columns. - columnsCount := params.BeaconConfig().NumberOfColumns - - // Ramdomize all columns. - columns := randomSlice(columnsCount) - - // Define the first column to sample. - missingColumnsCount := uint64(0) - +func (s *Service) incrementalDAS( + root [fieldparams.RootLength]byte, + columns []uint64, + sampleCount uint64, +) (bool, []roundSummary, error) { + columnsCount, missingColumnsCount := uint64(len(columns)), uint64(0) firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, 0) - for i := 1; ; i++ { + roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary. + + for round := 1; ; /*No exit condition */ round++ { if extendedSampleCount > columnsCount { - // We already tried to sample all columns, this is the unhappy path. 
+ // We already tried to sample all possible columns, this is the unhappy path. log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns") - return nil + return false, roundSummaries, nil } + // Get the columns to sample for this round. columnsToSample := columns[firstColumnToSample:extendedSampleCount] columnsToSampleCount := extendedSampleCount - firstColumnToSample - retrievedSampleCount, err := s.sampleDataColumnsFromPeers(columnsToSample, root) + // Sample the data columns from the peers. + retrievedSamples, err := s.sampleDataColumnsFromPeers(columnsToSample, root) if err != nil { - return errors.Wrap(err, "sample data columns from peers") + return false, nil, errors.Wrap(err, "sample data columns from peers") + } + + // Compute the missing samples. + missingSamples := make(map[uint64]bool, max(0, len(columnsToSample)-len(retrievedSamples))) + for _, column := range columnsToSample { + if !retrievedSamples[column] { + missingSamples[column] = true + } } + roundSummaries = append(roundSummaries, roundSummary{ + RequestedColumns: columnsToSample, + MissingColumns: missingSamples, + }) + + retrievedSampleCount := uint64(len(retrievedSamples)) + if retrievedSampleCount == columnsToSampleCount { // All columns were correctly sampled, this is the happy path. log.WithFields(logrus.Fields{ "root": fmt.Sprintf("%#x", root), - "roundsNeeded": i, + "roundsNeeded": round, }).Debug("All columns were successfully sampled") - return nil + return true, roundSummaries, nil } if retrievedSampleCount > columnsToSampleCount { // This should never happen. - return errors.New("retrieved more columns than requested") + return false, nil, errors.New("retrieved more columns than requested") } // Some columns are missing, we need to extend the sample size. @@ -261,7 +282,7 @@ func (s *Service) incrementalDAS(root [fieldparams.RootLength]byte, sampleCount log.WithFields(logrus.Fields{ "root": fmt.Sprintf("%#x", root), - "round": i, + "round": round, "missingColumnsCount": missingColumnsCount, "currentSampleCount": oldExtendedSampleCount, "nextSampleCount": extendedSampleCount, @@ -271,6 +292,9 @@ func (s *Service) incrementalDAS(root [fieldparams.RootLength]byte, sampleCount // DataColumnSamplingRoutine runs incremental DAS on block when received. func (s *Service) DataColumnSamplingRoutine(ctx context.Context) { + // Get the custody subnets count. + custodySubnetsCount := peerdas.CustodySubnetCount() + // Create a subscription to the state feed. stateChannel := make(chan *feed.Event, 1) stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel) @@ -278,6 +302,39 @@ func (s *Service) DataColumnSamplingRoutine(ctx context.Context) { // Unsubscribe from the state feed when the function returns. defer stateSub.Unsubscribe() + // Retrieve the number of columns. + columnsCount := params.BeaconConfig().NumberOfColumns + + // Retrieve all columns we custody. + custodyColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodySubnetsCount) + if err != nil { + log.WithError(err).Error("Failed to get custody columns") + return + } + + custodyColumnsCount := uint64(len(custodyColumns)) + + // Compute the number of columns to sample. 
+	if custodyColumnsCount >= columnsCount/2 {
+		log.WithFields(logrus.Fields{
+			"custodyColumnsCount": custodyColumnsCount,
+			"columnsCount":        columnsCount,
+		}).Debug("The node custodies at least half of the data columns, no need to sample")
+		return
+	}
+
+	samplesCount := min(params.BeaconConfig().SamplesPerSlot, columnsCount/2-custodyColumnsCount)
+
+	// Compute all the columns we do NOT custody.
+	nonCustodyColumns := make(map[uint64]bool, columnsCount-custodyColumnsCount)
+	for i := uint64(0); i < columnsCount; i++ {
+		if !custodyColumns[i] {
+			nonCustodyColumns[i] = true
+		}
+	}
+
 	for {
 		select {
 		case e := <-stateChannel:
@@ -315,9 +372,27 @@ func (s *Service) DataColumnSamplingRoutine(ctx context.Context) {
 				continue
 			}
 
+			// Randomize all columns.
+			randomizedColumns := randomizeColumns(nonCustodyColumns)
+
 			// Sample data columns with incremental DAS.
-			if err := s.incrementalDAS(data.BlockRoot, params.BeaconConfig().SamplesPerSlot); err != nil {
-				log.WithError(err).Error("Failed to sample data columns")
+			ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount)
+			if err != nil {
+				log.WithError(err).Error("Error during incremental DAS")
+			}
+
+			if ok {
+				log.WithFields(logrus.Fields{
+					"root":        fmt.Sprintf("%#x", data.BlockRoot),
+					"columns":     randomizedColumns,
+					"sampleCount": samplesCount,
+				}).Debug("Data column sampling successful")
+			} else {
+				log.WithFields(logrus.Fields{
+					"root":        fmt.Sprintf("%#x", data.BlockRoot),
+					"columns":     randomizedColumns,
+					"sampleCount": samplesCount,
+				}).Warning("Data column sampling failed")
 			}
 
 		case <-s.ctx.Done():
diff --git a/beacon-chain/sync/data_columns_sampling_test.go b/beacon-chain/sync/data_columns_sampling_test.go
new file mode 100644
index 00000000000..436cb3068d1
--- /dev/null
+++ b/beacon-chain/sync/data_columns_sampling_test.go
@@ -0,0 +1,224 @@
+package sync
+
+import (
+	"context"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/p2p/enr"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/network"
+	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
+	mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
+	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
+	p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
+	p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
+	fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
+	ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
+	"github.com/prysmaticlabs/prysm/v5/runtime/version"
+	"github.com/prysmaticlabs/prysm/v5/testing/require"
+)
+
+func TestRandomizeColumns(t *testing.T) {
+	const count uint64 = 128
+
+	// Generate columns.
+	columns := make(map[uint64]bool, count)
+	for i := uint64(0); i < count; i++ {
+		columns[i] = true
+	}
+
+	// Randomize columns.
+	randomizedColumns := randomizeColumns(columns)
+
+	// Convert back to a map.
+	randomizedColumnsMap := make(map[uint64]bool, count)
+	for _, column := range randomizedColumns {
+		randomizedColumnsMap[column] = true
+	}
+
+	// Check duplicates and missing columns.
+	require.Equal(t, len(columns), len(randomizedColumnsMap))
+
+	// Check the values.
+	for column := range randomizedColumnsMap {
+		require.Equal(t, true, column < count)
+	}
+}
+
+// createAndConnectPeer creates a peer whose private key is deterministically derived from `offset`.
+// The peer is then added to `p2pService` and connected.
+func createAndConnectPeer(
+	t *testing.T,
+	p2pService *p2ptest.TestP2P,
+	chainService *mock.ChainService,
+	header *ethpb.BeaconBlockHeader,
+	custodyCount uint64,
+	columnsNotToRespond map[uint64]bool,
+	offset int,
+) {
+	emptyRoot := [fieldparams.RootLength]byte{}
+	emptySignature := [fieldparams.BLSSignatureLength]byte{}
+	emptyKzgCommitmentInclusionProof := [4][]byte{
+		emptyRoot[:], emptyRoot[:], emptyRoot[:], emptyRoot[:],
+	}
+
+	// Derive the private key from the offset, so that each peer gets a distinct, deterministic identity.
+	privateKeyBytes := make([]byte, 32)
+	for i := 0; i < 32; i++ {
+		privateKeyBytes[i] = byte(offset + i)
+	}
+
+	privateKey, err := crypto.UnmarshalSecp256k1PrivateKey(privateKeyBytes)
+	require.NoError(t, err)
+
+	// Create the peer.
+	peer := p2ptest.NewTestP2P(t, swarmt.OptPeerPrivateKey(privateKey))
+
+	// TODO: Do not hardcode the topic.
+	peer.SetStreamHandler("/eth2/beacon_chain/req/data_column_sidecars_by_root/1/ssz_snappy", func(stream network.Stream) {
+		// Decode the request.
+		req := new(p2pTypes.DataColumnSidecarsByRootReq)
+		err := peer.Encoding().DecodeWithMaxLength(stream, req)
+		require.NoError(t, err)
+
+		for _, identifier := range *req {
+			// Filter out the columns not to respond.
+			if columnsNotToRespond[identifier.ColumnIndex] {
+				continue
+			}
+
+			// Create the response.
+			resp := ethpb.DataColumnSidecar{
+				ColumnIndex: identifier.ColumnIndex,
+				SignedBlockHeader: &ethpb.SignedBeaconBlockHeader{
+					Header:    header,
+					Signature: emptySignature[:],
+				},
+				KzgCommitmentsInclusionProof: emptyKzgCommitmentInclusionProof[:],
+			}
+
+			// Send the response.
+			err := WriteDataColumnSidecarChunk(stream, chainService, p2pService.Encoding(), &resp)
+			require.NoError(t, err)
+		}
+
+		// Close the stream.
+		closeStream(stream, log)
+	})
+
+	// Create the record and set the custody count.
+	enr := &enr.Record{}
+	enr.Set(peerdas.Csc(custodyCount))
+
+	// Add the peer and connect it.
+	p2pService.Peers().Add(enr, peer.PeerID(), nil, network.DirOutbound)
+	p2pService.Peers().SetConnectionState(peer.PeerID(), peers.PeerConnected)
+	p2pService.Connect(peer)
+}
+
+func TestIncrementalDAS(t *testing.T) {
+	const custodyRequirement uint64 = 1
+
+	emptyRoot := [fieldparams.RootLength]byte{}
+	emptyHeader := &ethpb.BeaconBlockHeader{
+		ParentRoot: emptyRoot[:],
+		StateRoot:  emptyRoot[:],
+		BodyRoot:   emptyRoot[:],
+	}
+
+	emptyHeaderRoot, err := emptyHeader.HashTreeRoot()
+	require.NoError(t, err)
+
+	testCases := []struct {
+		name                     string
+		samplesCount             uint64
+		possibleColumnsToRequest []uint64
+		columnsNotToRespond      map[uint64]bool
+		expectedSuccess          bool
+		expectedRoundSummaries   []roundSummary
+	}{
+		{
+			name:                     "All columns are correctly sampled in a single round",
+			samplesCount:             5,
+			possibleColumnsToRequest: []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108},
+			columnsNotToRespond:      map[uint64]bool{},
+			expectedSuccess:          true,
+			expectedRoundSummaries: []roundSummary{
+				{
+					RequestedColumns: []uint64{70, 35, 99, 6, 38},
+					MissingColumns:   map[uint64]bool{},
+				},
+			},
+		},
+		{
+			name:                     "Two missing columns in the first round, ok in the second round",
+			samplesCount:             5,
+			possibleColumnsToRequest: []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108},
+			columnsNotToRespond:      map[uint64]bool{6: true, 70: true},
+			expectedSuccess:          true,
+			expectedRoundSummaries: []roundSummary{
+				{
+					RequestedColumns: []uint64{70, 35, 99, 6, 38},
+					MissingColumns:   map[uint64]bool{70: true, 6: true},
+				},
+				{
+					RequestedColumns: []uint64{3, 67, 102, 12, 44, 76},
+					MissingColumns:   map[uint64]bool{},
+				},
+			},
+		},
+		{
+			name:                     "Two missing columns in the first round, one missing in the second round. Fail to sample.",
+			samplesCount:             5,
+			possibleColumnsToRequest: []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108},
+			columnsNotToRespond:      map[uint64]bool{6: true, 70: true, 3: true},
+			expectedSuccess:          false,
+			expectedRoundSummaries: []roundSummary{
+				{
+					RequestedColumns: []uint64{70, 35, 99, 6, 38},
+					MissingColumns:   map[uint64]bool{70: true, 6: true},
+				},
+				{
+					RequestedColumns: []uint64{3, 67, 102, 12, 44, 76},
+					MissingColumns:   map[uint64]bool{3: true},
+				},
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		// Create a context.
+		ctx := context.Background()
+
+		// Create the p2p service.
+		p2pService := p2ptest.NewTestP2P(t)
+
+		// Create the mock chain service and the clock.
+		chainService, clock := defaultMockChain(t)
+
+		// Create and connect three peers, each custodying `custodyRequirement` subnets.
+		// Custody columns: [6, 38, 70, 102]
+		createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 1)
+
+		// Custody columns: [3, 35, 67, 99]
+		createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 2)
+
+		// Custody columns: [12, 44, 76, 108]
+		createAndConnectPeer(t, p2pService, chainService, emptyHeader, custodyRequirement, tc.columnsNotToRespond, 3)
+
+		service := &Service{
+			cfg: &config{
+				p2p:   p2pService,
+				clock: clock,
+			},
+			ctx:    ctx,
+			ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
+		}
+
+		actualSuccess, actualRoundSummaries, err := service.incrementalDAS(emptyHeaderRoot, tc.possibleColumnsToRequest, tc.samplesCount)
+
+		require.NoError(t, err)
+		require.Equal(t, tc.expectedSuccess, actualSuccess)
+		require.DeepEqual(t, tc.expectedRoundSummaries, actualRoundSummaries)
+	}
+}
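---

For readers new to the LossyDAS widening strategy referenced in `incrementalDAS`: the first round requests `sampleCount` columns, and every round with misses widens the window via `peerdas.ExtendedSampleCount(sampleCount, missingColumnsCount)`, never re-requesting columns from earlier windows. The test expectations above pin down a few concrete values: `ExtendedSampleCount(5, 0) = 5` (the first window is `columns[0:5]`), `ExtendedSampleCount(5, 2) = 11` (the second window is `columns[5:11]`, six fresh columns), and `ExtendedSampleCount(5, 3)` exceeds the twelve candidates, which is why the third case gives up after two rounds. The sketch below mirrors that control flow under simplified assumptions: `extendedSampleCountStub` is a made-up linear rule chosen only so the demo reproduces the window sizes above (it is NOT the real LossyDAS sizing), and the hypothetical `respond` callback stands in for the peers.

package main

import "fmt"

// extendedSampleCountStub is a made-up, linear stand-in for
// peerdas.ExtendedSampleCount. The real function derives the sample size
// from the LossyDAS failure-probability analysis; this stub merely grows
// the window as failures accumulate so the loop below can be exercised.
func extendedSampleCountStub(sampleCount, allowedFailures uint64) uint64 {
	return sampleCount + 3*allowedFailures
}

// incrementalDASSketch mirrors the control flow of Service.incrementalDAS:
// sample a window of the randomized columns and, on any miss, widen the
// window (requesting only columns not asked for yet) until either a whole
// window is retrieved or the column list is exhausted. respond reports
// whether a given column can be fetched from some peer.
func incrementalDASSketch(columns []uint64, sampleCount uint64, respond func(uint64) bool) bool {
	columnsCount := uint64(len(columns))
	firstColumnToSample := uint64(0)
	missingColumnsCount := uint64(0)
	extendedSampleCount := extendedSampleCountStub(sampleCount, 0)

	for round := 1; ; round++ {
		if extendedSampleCount > columnsCount {
			// Unhappy path: all possible columns were tried and some are still missing.
			return false
		}

		// Request only the columns added by this round's widening.
		missing := uint64(0)
		for _, column := range columns[firstColumnToSample:extendedSampleCount] {
			if !respond(column) {
				missing++
			}
		}

		if missing == 0 {
			// Happy path: the whole window was retrieved.
			fmt.Printf("all columns sampled after %d round(s)\n", round)
			return true
		}

		// Some columns are missing: widen the window for the next round.
		missingColumnsCount += missing
		firstColumnToSample = extendedSampleCount
		extendedSampleCount = extendedSampleCountStub(sampleCount, missingColumnsCount)
	}
}

func main() {
	// Same fixtures as the second test case: twelve candidate columns,
	// peers withhold columns 6 and 70.
	columns := []uint64{70, 35, 99, 6, 38, 3, 67, 102, 12, 44, 76, 108}
	withheld := map[uint64]bool{6: true, 70: true}

	ok := incrementalDASSketch(columns, 5, func(column uint64) bool {
		return !withheld[column]
	})
	fmt.Println("success:", ok)
}

Running the sketch prints "all columns sampled after 2 round(s)" and "success: true", matching the second test case: round 1 samples columns[0:5] and misses {70, 6}, round 2 widens to columns[5:11] and comes back complete. Note that, as in the patch, a column that failed once is never re-requested, so sampling succeeds only when some later window is retrieved in full.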