diff --git a/sei-db/state_db/bench/cryptosim/block.go b/sei-db/state_db/bench/cryptosim/block.go new file mode 100644 index 0000000000..a7833f957d --- /dev/null +++ b/sei-db/state_db/bench/cryptosim/block.go @@ -0,0 +1,94 @@ +package cryptosim + +import "iter" + +// A simulated block of transactions. +type block struct { + config *CryptoSimConfig + + // The transactions in the block. + transactions []*transaction + + // The block number. This is not currently preserved across benchmark restarts, but otherwise monotonically + // increases as you'd expect. + blockNumber int64 + + // The next account ID to be used when creating a new account, as of the end of this block. + nextAccountID int64 + + // The number of cold accounts, as of the end of this block. + numberOfColdAccounts int64 + + // The next ERC20 contract ID to be used when creating a new ERC20 contract, as of the end of this block. + nextErc20ContractID int64 + + metrics *CryptosimMetrics +} + +// Creates a new block with the given capacity. +func NewBlock( + config *CryptoSimConfig, + metrics *CryptosimMetrics, + blockNumber int64, + capacity int, +) *block { + return &block{ + config: config, + blockNumber: blockNumber, + transactions: make([]*transaction, 0, capacity), + metrics: metrics, + } +} + +// Returns an iterator over the transactions in the block. +func (b *block) Iterator() iter.Seq[*transaction] { + return func(yield func(*transaction) bool) { + for _, txn := range b.transactions { + if !yield(txn) { + return + } + } + } +} + +// Adds a transaction to the block. +func (b *block) AddTransaction(txn *transaction) { + b.transactions = append(b.transactions, txn) +} + +// Returns the block number. +func (b *block) BlockNumber() int64 { + return b.blockNumber +} + +// Sets information about account state as of the end of this block. +func (b *block) SetBlockAccountStats( + nextAccountID int64, + numberOfColdAccounts int64, + nextErc20ContractID int64, +) { + b.nextAccountID = nextAccountID + b.numberOfColdAccounts = numberOfColdAccounts + b.nextErc20ContractID = nextErc20ContractID +} + +// This method should be called after a block is finished executing and finalized. +// Reports metrics about the block. +func (b *block) ReportBlockMetrics() { + b.metrics.SetTotalNumberOfAccounts(b.nextAccountID, int64(b.config.NumberOfHotAccounts), b.numberOfColdAccounts) +} + +// Returns the next account ID to be used when creating a new account, as of the end of this block. +func (b *block) NextAccountID() int64 { + return b.nextAccountID +} + +// Returns the next ERC20 contract ID to be used when creating a new ERC20 contract, as of the end of this block. +func (b *block) NextErc20ContractID() int64 { + return b.nextErc20ContractID +} + +// Returns the number of transactions in the block. +func (b *block) TransactionCount() int64 { + return int64(len(b.transactions)) +} diff --git a/sei-db/state_db/bench/cryptosim/block_builder.go b/sei-db/state_db/bench/cryptosim/block_builder.go new file mode 100644 index 0000000000..eb36b5d281 --- /dev/null +++ b/sei-db/state_db/bench/cryptosim/block_builder.go @@ -0,0 +1,82 @@ +package cryptosim + +import ( + "context" + "fmt" +) + +// A builder for blocks of transactions. +type blockBuilder struct { + ctx context.Context + + config *CryptoSimConfig + + // Metrics for the benchmark. + metrics *CryptosimMetrics + + // Produces random data. + dataGenerator *DataGenerator + + // Blocks are sent to this channel. + blocksChan chan *block + + // The next block number to be used. + nextBlockNumber int64 +} + +// Asyncronously produces blocks of transactions. +func NewBlockBuilder( + ctx context.Context, + config *CryptoSimConfig, + metrics *CryptosimMetrics, + dataGenerator *DataGenerator, +) *blockBuilder { + return &blockBuilder{ + ctx: ctx, + config: config, + metrics: metrics, + dataGenerator: dataGenerator, + blocksChan: make(chan *block, config.BlockChannelCapacity), + } +} + +// Starts the block builder. This should not be called until all other threads are done using the data generator, +// as the data generator is not thread-safe. +func (b *blockBuilder) Start() { + go b.mainLoop() +} + +// Builds blocks and sends them to the blocks channel. +func (b *blockBuilder) mainLoop() { + for { + block := b.buildBlock() + select { + case <-b.ctx.Done(): + return + case b.blocksChan <- block: + } + } +} + +func (b *blockBuilder) buildBlock() *block { + blk := NewBlock(b.config, b.metrics, b.nextBlockNumber, b.config.TransactionsPerBlock) + b.nextBlockNumber++ + + for i := 0; i < b.config.TransactionsPerBlock; i++ { + txn, err := BuildTransaction(b.dataGenerator) + if err != nil { + fmt.Printf("failed to build transaction: %v\n", err) + continue + } + blk.AddTransaction(txn) + } + + blk.SetBlockAccountStats( + b.dataGenerator.NextAccountID(), + b.dataGenerator.NumberOfColdAccounts(), + b.dataGenerator.NextErc20ContractID()) + + b.dataGenerator.ReportEndOfBlock() + + return blk +} diff --git a/sei-db/state_db/bench/cryptosim/config/basic-config.json b/sei-db/state_db/bench/cryptosim/config/basic-config.json index dea1b2a229..53cdf98724 100644 --- a/sei-db/state_db/bench/cryptosim/config/basic-config.json +++ b/sei-db/state_db/bench/cryptosim/config/basic-config.json @@ -11,7 +11,7 @@ "Erc20ContractSize": 2048, "Erc20InteractionsPerAccount": 10, "Erc20StorageSlotSize": 32, - "ExecutorQueueSize": 64, + "ExecutorQueueSize": 1024, "HotAccountProbability": 0.1, "HotErc20ContractProbability": 0.5, "HotErc20ContractSetSize": 100, @@ -29,5 +29,6 @@ "TransactionsPerBlock": 1024, "MaxRuntimeSeconds": 0, "TransactionMetricsSampleRate": 0.001, - "BackgroundMetricsScrapeInterval": 60 + "BackgroundMetricsScrapeInterval": 60, + "BlockChannelCapacity": 8 } diff --git a/sei-db/state_db/bench/cryptosim/cryptosim.go b/sei-db/state_db/bench/cryptosim/cryptosim.go index 04fc441b1a..94fdd02d3a 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim.go @@ -48,6 +48,9 @@ type CryptoSim struct { // The data generator for the benchmark. dataGenerator *DataGenerator + // Builds blocks of transactions. + blockBuilder *blockBuilder + // The database for the benchmark. database *Database @@ -64,6 +67,14 @@ type CryptoSim struct { // benchmark, sending "false" will resume it. Suspending an already suspended benchmark will have no effect, // and resuming an already resumed benchmark will likewise have no effect. suspendChan chan bool + + // The most recent block that has been processed. + mostRecentBlock *block + + // The next ERC20 contract ID to be used when creating a new ERC20 contract. + // This is fixed after initial setup is complete, since we don't currently simulate + // the creation of new ERC20 contracts during the benchmark. + nextERC20ContractID int64 } // Creates a new cryptosim benchmark runner. @@ -142,6 +153,8 @@ func NewCryptoSim( ctx, cancel, database, dataGenerator.FeeCollectionAddress(), config.ExecutorQueueSize, metrics) } + blockBuilder := NewBlockBuilder(ctx, config, metrics, dataGenerator) + c := &CryptoSim{ ctx: ctx, cancel: cancel, @@ -151,6 +164,7 @@ func NewCryptoSim( lastConsoleUpdateTransactionCount: 0, closeChan: make(chan struct{}, 1), dataGenerator: dataGenerator, + blockBuilder: blockBuilder, database: database, executors: executors, metrics: metrics, @@ -166,7 +180,10 @@ func NewCryptoSim( c.database.ResetTransactionCount() c.startTimestamp = time.Now() - c.metrics.StartBackgroundSampling(c.startTimestamp) + + // Now that we are done generating initial data, it is thread safe to start the block builder. + // (dataGenerator is not thread safe, and is used both for initial setup and for transaction generation) + c.blockBuilder.Start() go c.run() return c, nil @@ -210,7 +227,7 @@ func (c *CryptoSim) setupAccounts() error { if err != nil { return fmt.Errorf("failed to create new account: %w", err) } - c.database.IncrementTransactionCount() + c.database.IncrementTransactionCount(1) finalized, err := c.database.MaybeFinalizeBlock( c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID()) if err != nil { @@ -218,7 +235,7 @@ func (c *CryptoSim) setupAccounts() error { } if finalized { c.dataGenerator.ReportAccountCounts() - c.dataGenerator.ReportFinalizeBlock() + c.dataGenerator.ReportEndOfBlock() } if c.dataGenerator.NextAccountID()%c.config.SetupUpdateIntervalCount == 0 { @@ -237,7 +254,7 @@ func (c *CryptoSim) setupAccounts() error { return fmt.Errorf("failed to finalize block: %w", err) } c.dataGenerator.ReportAccountCounts() - c.dataGenerator.ReportFinalizeBlock() + c.dataGenerator.ReportEndOfBlock() fmt.Printf("There are now %s accounts in the database.\n", int64Commas(c.dataGenerator.NextAccountID())) @@ -267,7 +284,7 @@ func (c *CryptoSim) setupErc20Contracts() error { break } - c.database.IncrementTransactionCount() + c.database.IncrementTransactionCount(1) _, _, err := c.dataGenerator.CreateNewErc20Contract(c.config.Erc20ContractSize, true) if err != nil { @@ -279,7 +296,7 @@ func (c *CryptoSim) setupErc20Contracts() error { return fmt.Errorf("failed to maybe commit batch: %w", err) } if finalized { - c.dataGenerator.ReportFinalizeBlock() + c.dataGenerator.ReportEndOfBlock() c.metrics.SetTotalNumberOfERC20Contracts(c.dataGenerator.NextErc20ContractID()) } @@ -301,12 +318,14 @@ func (c *CryptoSim) setupErc20Contracts() error { if err != nil { return fmt.Errorf("failed to finalize block: %w", err) } - c.dataGenerator.ReportFinalizeBlock() + c.dataGenerator.ReportEndOfBlock() c.metrics.SetTotalNumberOfERC20Contracts(c.dataGenerator.NextErc20ContractID()) fmt.Printf("There are now %s simulated ERC20 contracts in the database.\n", int64Commas(c.dataGenerator.NextErc20ContractID())) + c.nextERC20ContractID = c.dataGenerator.NextErc20ContractID() + return nil } @@ -316,10 +335,14 @@ func (c *CryptoSim) run() { defer c.teardown() haltTime := time.Now().Add(time.Duration(c.config.MaxRuntimeSeconds) * time.Second) - - c.metrics.SetMainThreadPhase("executing") + var timeoutChan <-chan time.Time + if c.config.MaxRuntimeSeconds > 0 { + timeoutChan = time.After(time.Until(haltTime)) + } for { + c.metrics.SetMainThreadPhase("get_block") + select { case <-c.ctx.Done(): if c.database.TransactionCount() > 0 { @@ -331,55 +354,52 @@ func (c *CryptoSim) run() { if isSuspended { c.suspend() } - default: - c.handleNextCycle(haltTime) + case <-timeoutChan: + fmt.Printf("\nBenchmark timed out after %s.\n", formatDuration(time.Since(c.startTimestamp), 1)) + c.cancel() + return + case blk := <-c.blockBuilder.blocksChan: + c.handleNextBlock(blk) } + + c.generateConsoleReport(false) } } -// Process the next benchmark cycle, creating a new transaction and executing it. -func (c *CryptoSim) handleNextCycle(haltTime time.Time) { - txn, err := BuildTransaction(c.dataGenerator) - if err != nil { - fmt.Printf("\nfailed to build transaction: %v\n", err) - c.cancel() - return - } +// Execute and finalize the next block. +func (c *CryptoSim) handleNextBlock(blk *block) { + c.mostRecentBlock = blk + c.metrics.SetMainThreadPhase("send_to_executors") - c.executors[c.nextExecutorIndex].ScheduleForExecution(txn) - c.nextExecutorIndex = (c.nextExecutorIndex + 1) % len(c.executors) + c.database.IncrementTransactionCount(blk.TransactionCount()) - finalized, err := c.database.MaybeFinalizeBlock( - c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID()) - if err != nil { - fmt.Printf("error finalizing block: %v\n", err) - c.cancel() - return + for txn := range blk.Iterator() { + c.executors[c.nextExecutorIndex].ScheduleForExecution(txn) + c.nextExecutorIndex = (c.nextExecutorIndex + 1) % len(c.executors) } - if finalized { - c.dataGenerator.ReportAccountCounts() - c.dataGenerator.ReportFinalizeBlock() - if c.config.MaxRuntimeSeconds > 0 && time.Now().After(haltTime) { - c.cancel() - } + if err := c.database.FinalizeBlock(blk.NextAccountID(), blk.NextErc20ContractID(), false); err != nil { + fmt.Printf("failed to finalize block: %v\n", err) + c.cancel() + return } - - c.database.IncrementTransactionCount() - c.generateConsoleReport(false) + blk.ReportBlockMetrics() } // Suspends the benchmark. This method blocks until the benchmark is resumed or shut down. func (c *CryptoSim) suspend() { - err := c.database.FinalizeBlock(c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true) - if err != nil { - fmt.Printf("failed to finalize block: %v\n", err) - c.cancel() - return + if c.mostRecentBlock != nil { + err := c.database.FinalizeBlock(c.mostRecentBlock.nextAccountID, c.nextERC20ContractID, true) + if err != nil { + fmt.Printf("failed to finalize block: %v\n", err) + c.cancel() + return + } } fmt.Printf("Benchmark suspended.\n") + c.metrics.SetMainThreadPhase("suspended") for { select { @@ -403,9 +423,16 @@ func (c *CryptoSim) suspend() { // Clean up the benchmark and release any resources. func (c *CryptoSim) teardown() { - err := c.database.Close(c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID()) - if err != nil { - fmt.Printf("failed to close database: %v\n", err) + if c.mostRecentBlock == nil { + err := c.database.CloseWithoutFinalizing() + if err != nil { + fmt.Printf("failed to close database: %v\n", err) + } + } else { + err := c.database.Close(c.mostRecentBlock.nextAccountID, c.nextERC20ContractID) + if err != nil { + fmt.Printf("failed to close database: %v\n", err) + } } c.dataGenerator.Close() diff --git a/sei-db/state_db/bench/cryptosim/cryptosim_config.go b/sei-db/state_db/bench/cryptosim/cryptosim_config.go index 0f7579bb5a..a4735a775b 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim_config.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim_config.go @@ -132,6 +132,9 @@ type CryptoSimConfig struct { // If true, pressing Enter in the terminal will toggle suspend/resume of the benchmark. // If false, Enter has no effect. EnableSuspension bool + + // The capacity of the channel that holds blocks awaiting execution. + BlockChannelCapacity int } // Returns the default configuration for the cryptosim benchmark. @@ -164,12 +167,13 @@ func DefaultCryptoSimConfig() *CryptoSimConfig { SetupUpdateIntervalCount: 100_000, ThreadsPerCore: 2.0, ConstantThreadCount: 0, - ExecutorQueueSize: 64, + ExecutorQueueSize: 1024, MaxRuntimeSeconds: 0, MetricsAddr: ":9090", TransactionMetricsSampleRate: 0.001, BackgroundMetricsScrapeInterval: 60, EnableSuspension: true, + BlockChannelCapacity: 8, } } @@ -242,6 +246,9 @@ func (c *CryptoSimConfig) Validate() error { if c.BackgroundMetricsScrapeInterval < 0 { return fmt.Errorf("BackgroundMetricsScrapeInterval must be non-negative (got %d)", c.BackgroundMetricsScrapeInterval) } + if c.BlockChannelCapacity < 1 { + return fmt.Errorf("BlockChannelCapacity must be at least 1 (got %d)", c.BlockChannelCapacity) + } return nil } diff --git a/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go b/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go index a25ac9ebc6..c5b16408b8 100644 --- a/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go +++ b/sei-db/state_db/bench/cryptosim/cryptosim_metrics.go @@ -172,20 +172,11 @@ func NewCryptosimMetrics( m.startDataDirSizeSampling(dataDir, config.BackgroundMetricsScrapeInterval) } m.startProcessIOSampling(config.BackgroundMetricsScrapeInterval) + m.startUptimeSampling(time.Now()) } return m } -// StartBackgroundSampling starts goroutines that periodically update gauges -// (uptime, etc.). Call this when the benchmark is about to run, after initial -// state is loaded. Does not start any HTTP server; the caller configures export. -func (m *CryptosimMetrics) StartBackgroundSampling(startTime time.Time) { - if m == nil { - return - } - m.startUptimeSampling(startTime) -} - func (m *CryptosimMetrics) startUptimeSampling(startTime time.Time) { if m == nil || m.uptimeSeconds == nil { return diff --git a/sei-db/state_db/bench/cryptosim/data_generator.go b/sei-db/state_db/bench/cryptosim/data_generator.go index 5920366814..8347d6bd2d 100644 --- a/sei-db/state_db/bench/cryptosim/data_generator.go +++ b/sei-db/state_db/bench/cryptosim/data_generator.go @@ -138,7 +138,6 @@ func (d *DataGenerator) CreateNewAccount( accountID := d.nextAccountID d.nextAccountID++ - // Use EVMKeyCode for account data (balance+padding); EVMKeyNonce only accepts 8-byte values. addr := d.rand.Address(accountPrefix, accountID, AddressLen) address = evm.BuildMemIAVLEVMKey(evm.EVMKeyCode, addr) @@ -289,7 +288,8 @@ func (d *DataGenerator) FeeCollectionAddress() []byte { return d.feeCollectionAddress } -// This method should be called after a block is finalized. -func (d *DataGenerator) ReportFinalizeBlock() { +// Call this to signal that we have reached the end of a block. This is a signal that it is now safe to use +// recently created accounts as read/write targets. +func (d *DataGenerator) ReportEndOfBlock() { d.highestSafeAccountIDInBlock = d.nextAccountID - 1 } diff --git a/sei-db/state_db/bench/cryptosim/database.go b/sei-db/state_db/bench/cryptosim/database.go index 2b723fbdbc..21d9a48937 100644 --- a/sei-db/state_db/bench/cryptosim/database.go +++ b/sei-db/state_db/bench/cryptosim/database.go @@ -80,10 +80,10 @@ func (d *Database) Get(key []byte) ([]byte, bool, error) { return nil, false, nil } -// Signal that a transaction has been executed. -func (d *Database) IncrementTransactionCount() { - d.transactionCount++ - d.transactionsInCurrentBlock++ +// Signal that transactions have been added to the current block. +func (d *Database) IncrementTransactionCount(count int64) { + d.transactionCount += count + d.transactionsInCurrentBlock += count } // Reset the transaction count. Useful for when changing test phases. @@ -120,6 +120,8 @@ func (d *Database) FinalizeBlock( forceCommit bool, ) error { + d.metrics.SetMainThreadPhase("execute_block") + // Wait for all transactions in the current block to be executed. if d.flushFunc != nil { d.flushFunc() @@ -204,6 +206,17 @@ func (d *Database) Close(nextAccountID int64, nextErc20ContractID int64) error { return nil } +// Close the database and release any resources without finalizing the last batch. +func (d *Database) CloseWithoutFinalizing() error { + fmt.Printf("Closing database.\n") + err := d.db.Close() + if err != nil { + return fmt.Errorf("failed to close database: %w", err) + } + + return nil +} + // Set the function that flushes the executors. This setter is required to break a circular dependency. func (d *Database) SetFlushFunc(flushFunc func()) { d.flushFunc = flushFunc