Skip to content

Commit

Permalink
PeerDAS: add data column batch config (#14122)
Browse files Browse the repository at this point in the history
  • Loading branch information
0x00101010 committed Jun 21, 2024
1 parent 40924e5 commit 11f56ca
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 20 deletions.
16 changes: 11 additions & 5 deletions beacon-chain/sync/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (

"github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
)

const defaultBurstLimit = 5
Expand Down Expand Up @@ -43,9 +44,13 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
allowedBlocksPerSecond := float64(flags.Get().BlockBatchLimit)
allowedBlocksBurst := int64(flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit)

// Initialize blob limits.
allowedBlobsPerSecond := float64(flags.Get().BlobBatchLimit)
allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit)

// Initialize data column limits.
allowedDataColumnsPerSecond := float64(flags.Get().BlobBatchLimit * int(params.BeaconConfig().CustodyRequirement))
allowedDataColumnsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit * int(params.BeaconConfig().CustodyRequirement))
allowedDataColumnsPerSecond := float64(flags.Get().DataColumnBatchLimit * int(params.BeaconConfig().CustodyRequirement))
allowedDataColumnsBurst := int64(flags.Get().DataColumnBatchLimitBurstFactor * flags.Get().BlobBatchLimit * int(params.BeaconConfig().CustodyRequirement))

// Set topic map for all rpc topics.
topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
Expand All @@ -65,7 +70,8 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */)

// for BlobSidecarsByRoot and BlobSidecarsByRange
blobCollector := leakybucket.NewCollector(allowedDataColumnsPerSecond, allowedDataColumnsBurst, blockBucketPeriod, false)
blobCollector := leakybucket.NewCollector(allowedBlobsPerSecond, allowedBlobsBurst, blockBucketPeriod, false)

// for DataColumnSidecarsByRoot and DataColumnSidecarsByRange
columnCollector := leakybucket.NewCollector(allowedDataColumnsPerSecond, allowedDataColumnsBurst, blockBucketPeriod, false)

Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
requestedColumnsList = append(requestedColumnsList, ident.ColumnIndex)
}

// TODO: Customize data column batches too
batchSize := flags.Get().BlobBatchLimit
batchSize := flags.Get().DataColumnBatchLimit
var ticker *time.Ticker
if len(requestedColumnIdents) > batchSize {
ticker = time.NewTicker(time.Second)
Expand Down
16 changes: 15 additions & 1 deletion cmd/beacon-chain/flags/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
package flags

import (
"github.com/urfave/cli/v2"

"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/urfave/cli/v2"
)

var (
Expand Down Expand Up @@ -174,6 +175,19 @@ var (
Usage: "The factor by which blob batch limit may increase on burst.",
Value: 2,
}
// DataColumnBatchLimit specifies the requested data column batch size.
DataColumnBatchLimit = &cli.IntFlag{
Name: "data-column-batch-limit",
Usage: "The amount of data columns the local peer is bounded to request and respond to in a batch.",
// TODO: determine a good default value for this flag.
Value: 128,
}
// DataColumnBatchLimitBurstFactor specifies the factor by which data column batch size may increase.
DataColumnBatchLimitBurstFactor = &cli.IntFlag{
Name: "data-column-batch-limit-burst-factor",
Usage: "The factor by which data column batch limit may increase on burst.",
Value: 2,
}
// DisableDebugRPCEndpoints disables the debug Beacon API namespace.
DisableDebugRPCEndpoints = &cli.BoolFlag{
Name: "disable-debug-rpc-endpoints",
Expand Down
23 changes: 14 additions & 9 deletions cmd/beacon-chain/flags/config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
package flags

import (
"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/urfave/cli/v2"

"github.com/prysmaticlabs/prysm/v5/cmd"
)

// GlobalFlags specifies all the global flags for the
// beacon node.
type GlobalFlags struct {
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
BlobBatchLimitBurstFactor int
SubscribeToAllSubnets bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
BlockBatchLimit int
BlockBatchLimitBurstFactor int
BlobBatchLimit int
BlobBatchLimitBurstFactor int
DataColumnBatchLimit int
DataColumnBatchLimitBurstFactor int
}

var globalConfig *GlobalFlags
Expand Down Expand Up @@ -45,6 +48,8 @@ func ConfigureGlobalFlags(ctx *cli.Context) {
cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name)
cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name)
cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name)
cfg.DataColumnBatchLimit = ctx.Int(DataColumnBatchLimit.Name)
cfg.DataColumnBatchLimitBurstFactor = ctx.Int(DataColumnBatchLimitBurstFactor.Name)
cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name)
cfg.MaxConcurrentDials = ctx.Int(MaxConcurrentDials.Name)
configureMinimumPeers(ctx, cfg)
Expand Down
7 changes: 5 additions & 2 deletions cmd/beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
golog "github.com/ipfs/go-log/v2"
joonix "github.com/joonix/log"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/node"
"github.com/prysmaticlabs/prysm/v5/cmd"
Expand All @@ -35,8 +38,6 @@ import (
_ "github.com/prysmaticlabs/prysm/v5/runtime/maxprocs"
"github.com/prysmaticlabs/prysm/v5/runtime/tos"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)

var appFlags = []cli.Flag{
Expand All @@ -60,6 +61,8 @@ var appFlags = []cli.Flag{
flags.BlockBatchLimitBurstFactor,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.DataColumnBatchLimit,
flags.DataColumnBatchLimitBurstFactor,
flags.InteropMockEth1DataVotesFlag,
flags.InteropNumValidatorsFlag,
flags.InteropGenesisTimeFlag,
Expand Down
5 changes: 4 additions & 1 deletion cmd/beacon-chain/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io"
"sort"

"github.com/urfave/cli/v2"

"github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/storage"
Expand All @@ -13,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/sync/genesis"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/runtime/debug"
"github.com/urfave/cli/v2"
)

var appHelpTemplate = `NAME:
Expand Down Expand Up @@ -116,6 +117,8 @@ var appHelpFlagGroups = []flagGroup{
flags.BlockBatchLimitBurstFactor,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.DataColumnBatchLimit,
flags.DataColumnBatchLimitBurstFactor,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.HistoricalSlasherNode,
Expand Down
3 changes: 3 additions & 0 deletions testing/endtoend/components/beacon_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/bazelbuild/rules_go/go/tools/bazel"
"github.com/pkg/errors"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
cmdshared "github.com/prysmaticlabs/prysm/v5/cmd"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
Expand Down Expand Up @@ -270,6 +271,8 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%d", flags.BlockBatchLimitBurstFactor.Name, 8),
fmt.Sprintf("--%s=%d", flags.BlobBatchLimitBurstFactor.Name, 16),
fmt.Sprintf("--%s=%d", flags.BlobBatchLimit.Name, 256),
fmt.Sprintf("--%s=%d", flags.DataColumnBatchLimit.Name, 128),
fmt.Sprintf("--%s=%d", flags.DataColumnBatchLimitBurstFactor.Name, 2),
fmt.Sprintf("--%s=%s", cmdshared.ChainConfigFileFlag.Name, cfgPath),
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=1",
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=2",
Expand Down

0 comments on commit 11f56ca

Please sign in to comment.