From 1672794d6d3a7fc438ce7021398f6603837f1c3f Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 11:15:04 +0200 Subject: [PATCH 1/7] utils + path --- generator/generator.go | 6 +++--- generator/generator_test.go | 6 +++--- generator/prewarm.go | 6 +++--- generator/scenario.go | 4 ++-- generator/scenarios/ERC20.go | 6 +++--- generator/scenarios/ERC20Conflict.go | 6 +++--- generator/scenarios/ERC20Noop.go | 6 +++--- generator/scenarios/ERC721.go | 6 +++--- generator/scenarios/EVMTransfer.go | 4 ++-- generator/scenarios/base.go | 6 +++--- generator/utils/utils.go | 4 ++-- generator/weighted.go | 2 +- go.mod | 10 +++++++--- go.sum | 4 ++++ main.go | 8 ++++---- sender/dispatcher.go | 4 ++-- sender/sender.go | 2 +- sender/sender_test.go | 8 ++++---- sender/sharded_sender.go | 6 +++--- sender/worker.go | 4 ++-- 20 files changed, 58 insertions(+), 50 deletions(-) diff --git a/generator/generator.go b/generator/generator.go index a1ae5e2..13fc04d 100644 --- a/generator/generator.go +++ b/generator/generator.go @@ -7,9 +7,9 @@ import ( "github.com/ethereum/go-ethereum/common" - "seiload/config" - "seiload/generator/scenarios" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/scenarios" + "github.com/sei-protocol/sei-load/types" ) // Generator interface defines the contract for transaction generators diff --git a/generator/generator_test.go b/generator/generator_test.go index b89ba41..a6c1fda 100644 --- a/generator/generator_test.go +++ b/generator/generator_test.go @@ -5,9 +5,9 @@ import ( "github.com/stretchr/testify/require" - "seiload/config" - "seiload/generator" - "seiload/generator/scenarios" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/generator/scenarios" ) func TestScenarioWeightsAndAccountDistribution(t *testing.T) { diff --git a/generator/prewarm.go b/generator/prewarm.go index 94a65e6..5f5d7c1 100644 --- a/generator/prewarm.go +++ b/generator/prewarm.go @@ -3,9 +3,9 @@ package generator import ( "sync" - "seiload/config" - "seiload/generator/scenarios" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/scenarios" + "github.com/sei-protocol/sei-load/types" ) // PrewarmGenerator generates self-transfer transactions to prewarm account nonces diff --git a/generator/scenario.go b/generator/scenario.go index eced568..7cb3dd9 100644 --- a/generator/scenario.go +++ b/generator/scenario.go @@ -3,8 +3,8 @@ package generator import ( "sync" - "seiload/generator/scenarios" - "seiload/types" + "github.com/sei-protocol/sei-load/generator/scenarios" + "github.com/sei-protocol/sei-load/types" ) type scenarioGenerator struct { diff --git a/generator/scenarios/ERC20.go b/generator/scenarios/ERC20.go index cbe6cdc..b3ea656 100644 --- a/generator/scenarios/ERC20.go +++ b/generator/scenarios/ERC20.go @@ -6,9 +6,9 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "seiload/config" - "seiload/generator/bindings" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/bindings" + "github.com/sei-protocol/sei-load/types" ) const ERC20 = "ERC20" diff --git a/generator/scenarios/ERC20Conflict.go b/generator/scenarios/ERC20Conflict.go index c7cfb19..73cc5b9 100644 --- a/generator/scenarios/ERC20Conflict.go +++ b/generator/scenarios/ERC20Conflict.go @@ -6,9 +6,9 @@ import ( ethtypes 
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "seiload/config" - "seiload/generator/bindings" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/bindings" + "github.com/sei-protocol/sei-load/types" ) const ERC20Conflict = "ERC20Conflict" diff --git a/generator/scenarios/ERC20Noop.go b/generator/scenarios/ERC20Noop.go index d584d68..1d50031 100644 --- a/generator/scenarios/ERC20Noop.go +++ b/generator/scenarios/ERC20Noop.go @@ -6,9 +6,9 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "seiload/config" - "seiload/generator/bindings" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/bindings" + "github.com/sei-protocol/sei-load/types" ) const ERC20Noop = "ERC20Noop" diff --git a/generator/scenarios/ERC721.go b/generator/scenarios/ERC721.go index 9b8317d..f84785c 100644 --- a/generator/scenarios/ERC721.go +++ b/generator/scenarios/ERC721.go @@ -9,9 +9,9 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "seiload/config" - "seiload/generator/bindings" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/bindings" + "github.com/sei-protocol/sei-load/types" ) const ERC721 = "ERC721" diff --git a/generator/scenarios/EVMTransfer.go b/generator/scenarios/EVMTransfer.go index 2649b13..441025b 100644 --- a/generator/scenarios/EVMTransfer.go +++ b/generator/scenarios/EVMTransfer.go @@ -6,8 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" - "seiload/config" - types2 "seiload/types" + "github.com/sei-protocol/sei-load/config" + types2 "github.com/sei-protocol/sei-load/types" ) const EVMTransfer = "EVMTransfer" diff --git a/generator/scenarios/base.go b/generator/scenarios/base.go index b6f2b75..7a6b742 100644 --- a/generator/scenarios/base.go +++ b/generator/scenarios/base.go @@ -8,9 +8,9 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "seiload/config" - "seiload/generator/utils" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator/utils" + "github.com/sei-protocol/sei-load/types" ) // bigOne is 1 in big.Int. 
diff --git a/generator/utils/utils.go b/generator/utils/utils.go index 231ecc0..9d9eaab 100644 --- a/generator/utils/utils.go +++ b/generator/utils/utils.go @@ -8,8 +8,8 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" - "seiload/config" - loadtypes "seiload/types" + "github.com/sei-protocol/sei-load/config" + loadtypes "github.com/sei-protocol/sei-load/types" ) type DeployFunc[T any] func( diff --git a/generator/weighted.go b/generator/weighted.go index 71bf0b1..b71be84 100644 --- a/generator/weighted.go +++ b/generator/weighted.go @@ -3,7 +3,7 @@ package generator import ( "context" "math/rand" - "seiload/types" + "github.com/sei-protocol/sei-load/types" "sync" ) diff --git a/go.mod b/go.mod index b0f2cc5..e0ee201 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,15 @@ -module seiload +module github.com/sei-protocol/sei-load -go 1.24.1 +go 1.24.5 require ( github.com/ethereum/go-ethereum v1.16.1 + github.com/google/go-cmp v0.5.5 + github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 + golang.org/x/sync v0.16.0 + google.golang.org/protobuf v1.34.2 ) require ( @@ -33,7 +37,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect golang.org/x/crypto v0.40.0 // indirect - golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.34.0 // indirect + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 69a6096..607607b 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -201,6 +203,8 @@ golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index 8e778fd..0646600 100644 --- a/main.go +++ b/main.go @@ -12,10 +12,10 @@ import ( "github.com/spf13/cobra" - "seiload/config" - "seiload/generator" - "seiload/sender" - "seiload/stats" + "github.com/sei-protocol/sei-load/config" + 
"github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/sender" + "github.com/sei-protocol/sei-load/stats" ) var ( diff --git a/sender/dispatcher.go b/sender/dispatcher.go index 44905a5..44e946d 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -6,8 +6,8 @@ import ( "sync" "time" - "seiload/generator" - "seiload/stats" + "github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/stats" ) // Dispatcher continuously generates transactions and dispatches them to the sender diff --git a/sender/sender.go b/sender/sender.go index 4edc332..878b8b9 100644 --- a/sender/sender.go +++ b/sender/sender.go @@ -1,6 +1,6 @@ package sender -import "seiload/types" +import "github.com/sei-protocol/sei-load/types" type TxSender interface { Send(tx *types.LoadTx) error diff --git a/sender/sender_test.go b/sender/sender_test.go index d0c3bc7..19dc6f5 100644 --- a/sender/sender_test.go +++ b/sender/sender_test.go @@ -14,10 +14,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "seiload/config" - "seiload/generator" - "seiload/generator/scenarios" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/generator/scenarios" + "github.com/sei-protocol/sei-load/types" ) // JSONRPCRequest represents a captured JSON-RPC request diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index a2798ce..8801c3f 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -4,9 +4,9 @@ import ( "fmt" "sync" - "seiload/config" - "seiload/stats" - "seiload/types" + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/types" ) // ShardedSender implements TxSender with multiple workers, one per endpoint diff --git a/sender/worker.go b/sender/worker.go index 5b30e65..1d23641 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -10,8 +10,8 @@ import ( "net/http" "time" - "seiload/stats" - "seiload/types" + "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/types" ) // Worker handles sending transactions to a specific endpoint From ade54464810d922178ee4456d21ae4986ec1f68d Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 11:31:20 +0200 Subject: [PATCH 2/7] main.go wip --- main.go | 307 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 157 insertions(+), 150 deletions(-) diff --git a/main.go b/main.go index 0646600..e180e84 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/json" "fmt" "log" @@ -8,6 +9,7 @@ import ( "os/signal" "strings" "syscall" + "github.com/sei-protocol/sei-load/utils/service" "time" "github.com/spf13/cobra" @@ -73,183 +75,188 @@ func main() { } func runLoadTest(cmd *cobra.Command, args []string) { - // Parse the config file into a config.LoadConfig struct - cfg, err := loadConfig(configFile) - if err != nil { - log.Fatalf("Failed to load config: %v", err) - } - - fmt.Printf("šŸš€ Starting Sei Chain Load Test v2\n") - fmt.Printf("šŸ“ Config file: %s\n", configFile) - fmt.Printf("šŸŽÆ Endpoints: %d\n", len(cfg.Endpoints)) - fmt.Printf("šŸ‘„ Workers per endpoint: %d\n", workers) - fmt.Printf("šŸ”§ Total workers: %d\n", len(cfg.Endpoints)*workers) - fmt.Printf("šŸ“Š Scenarios: %d\n", len(cfg.Scenarios)) - fmt.Printf("ā±ļø Stats interval: %v\n", statsInterval) - fmt.Printf("šŸ“¦ Buffer size per worker: %d\n", bufferSize) - if 
tps > 0 { - fmt.Printf("šŸ“ˆ Transactions per second: %.2f\n", tps) - } - if dryRun { - fmt.Printf("šŸ“ Dry run: enabled\n") - } - if trackReceipts { - fmt.Printf("šŸ“ Track receipts: enabled\n") - } - if trackBlocks { - fmt.Printf("šŸ“ Track blocks: enabled\n") - } - if prewarm { - fmt.Printf("šŸ“ Prewarm: enabled\n") - } - fmt.Println() + ctx := context.Background() + err := service.Run(ctx, func(ctx context.Context, s service.Scope) error { + // Parse the config file into a config.LoadConfig struct + cfg, err := loadConfig(configFile) + if err != nil { + return fmt.Errorf("Failed to load config: %w", err) + } - // Enable mock deployment in dry-run mode - if dryRun { - cfg.MockDeploy = true - } + log.Printf("šŸš€ Starting Sei Chain Load Test v2") + log.Printf("šŸ“ Config file: %s", configFile) + log.Printf("šŸŽÆ Endpoints: %d", len(cfg.Endpoints)) + log.Printf("šŸ‘„ Workers per endpoint: %d", workers) + log.Printf("šŸ”§ Total workers: %d", len(cfg.Endpoints)*workers) + log.Printf("šŸ“Š Scenarios: %d", len(cfg.Scenarios)) + log.Printf("ā±ļø Stats interval: %v", statsInterval) + log.Printf("šŸ“¦ Buffer size per worker: %d", bufferSize) + if tps > 0 { + log.Printf("šŸ“ˆ Transactions per second: %.2f", tps) + } + if dryRun { + log.Printf("šŸ“ Dry run: enabled") + } + if trackReceipts { + log.Printf("šŸ“ Track receipts: enabled") + } + if trackBlocks { + log.Printf("šŸ“ Track blocks: enabled") + } + if prewarm { + log.Printf("šŸ“ Prewarm: enabled") + } + log.Println() - // Create the generator from the config struct - gen, err := generator.NewConfigBasedGenerator(cfg) - if err != nil { - log.Fatalf("Failed to create generator: %v", err) - } + // Enable mock deployment in dry-run mode + if dryRun { + cfg.MockDeploy = true + } - // Create the sender from the config struct - snd, err := sender.NewShardedSender(cfg, bufferSize, workers) - if err != nil { - log.Fatalf("Failed to create sender: %v", err) - } + // Create the generator from the config struct + gen, err := generator.NewConfigBasedGenerator(cfg) + if err != nil { + return fmt.Errorf("Failed to create generator: %w", err) + } - // Create statistics collector and logger - collector := stats.NewCollector() - logger := stats.NewLogger(collector, statsInterval, debug) - - // Create and start block collector if endpoints are available - var blockCollector *stats.BlockCollector - if len(cfg.Endpoints) > 0 && trackBlocks { - blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) - collector.SetBlockCollector(blockCollector) - // Start block collector - if err := blockCollector.Start(); err != nil { - log.Printf("āš ļø Failed to start block collector: %v", err) + // Create the sender from the config struct + snd, err := sender.NewShardedSender(cfg, bufferSize, workers) + if err != nil { + return fmt.Errorf("Failed to create sender: %w", err) } - } - // Enable dry-run mode in sender if specified - if dryRun { - snd.SetDryRun(true) - } - if debug { - snd.SetDebug(true) - } - if trackReceipts { - snd.SetTrackReceipts(true) - } - if trackBlocks { - snd.SetTrackBlocks(true) - } + // Create statistics collector and logger + collector := stats.NewCollector() + logger := stats.NewLogger(collector, statsInterval, debug) + + // Create and start block collector if endpoints are available + var blockCollector *stats.BlockCollector + if len(cfg.Endpoints) > 0 && trackBlocks { + blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) + collector.SetBlockCollector(blockCollector) + // Start block collector + if err := 
blockCollector.Start(); err != nil { + log.Printf("āš ļø Failed to start block collector: %v", err) + } + } - // Set statistics collector for sender and its workers - snd.SetStatsCollector(collector, logger) + // Enable dry-run mode in sender if specified + if dryRun { + snd.SetDryRun(true) + } + if debug { + snd.SetDebug(true) + } + if trackReceipts { + snd.SetTrackReceipts(true) + } + if trackBlocks { + snd.SetTrackBlocks(true) + } - // Create dispatcher - dispatcher := sender.NewDispatcher(gen, snd) - if tps > 0 { - // Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds - intervalNs := int64((1.0 / tps) * 1e9) - dispatcher.SetRateLimit(time.Duration(intervalNs)) - } + // Set statistics collector for sender and its workers + snd.SetStatsCollector(collector, logger) - // Set statistics collector for dispatcher - dispatcher.SetStatsCollector(collector, logger) + // Create dispatcher + dispatcher := sender.NewDispatcher(gen, snd) + if tps > 0 { + // Convert TPS to interval: 1/tps seconds = (1/tps) * 1e9 nanoseconds + intervalNs := int64((1.0 / tps) * 1e9) + dispatcher.SetRateLimit(time.Duration(intervalNs)) + } - // Set up prewarming if enabled - if prewarm { - fmt.Println("šŸ”„ Creating prewarm generator...") - prewarmGen := generator.NewPrewarmGenerator(cfg, gen) - dispatcher.SetPrewarmGenerator(prewarmGen) - fmt.Println("āœ… Prewarm generator ready") - fmt.Printf("šŸ“ Prewarm mode: Accounts will be prewarmed\n") - } + // Set statistics collector for dispatcher + dispatcher.SetStatsCollector(collector, logger) - // Start the sender (starts all workers) - snd.Start() - fmt.Printf("āœ… Connected to %d endpoints\n", snd.GetNumShards()) + // Set up prewarming if enabled + if prewarm { + fmt.Println("šŸ”„ Creating prewarm generator...") + prewarmGen := generator.NewPrewarmGenerator(cfg, gen) + dispatcher.SetPrewarmGenerator(prewarmGen) + log.Println("āœ… Prewarm generator ready") + log.Printf("šŸ“ Prewarm mode: Accounts will be prewarmed") + } - // Start block collector if enabled - if trackBlocks { - blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) - collector.SetBlockCollector(blockCollector) - err = blockCollector.Start() - if err != nil { - log.Fatalf("Failed to start block collector: %v", err) + // Start the sender (starts all workers) + snd.Start() + log.Printf("āœ… Connected to %d endpoints", snd.GetNumShards()) + + // Start block collector if enabled + if trackBlocks { + blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) + collector.SetBlockCollector(blockCollector) + if err := blockCollector.Start(); err != nil { + return fmt.Errorf("Failed to start block collector: %w", err) + } + log.Println("āœ… Started block collector") } - fmt.Println("āœ… Started block collector") - } - // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) - if prewarm { - err = dispatcher.Prewarm() - if err != nil { - log.Fatalf("Failed to prewarm accounts: %v", err) + // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) + if prewarm { + if err := dispatcher.Prewarm(); err != nil { + return fmt.Errorf("Failed to prewarm accounts: %w", err) + } } - } - // Start logger (after prewarming to capture only main load test metrics) - logger.Start() - fmt.Println("āœ… Started statistics logger") + // Start logger (after prewarming to capture only main load test metrics) + logger.Start() + log.Println("āœ… Started statistics logger") - // Start dispatcher for main load test - dispatcher.Start() - 
fmt.Println("āœ… Started dispatcher") + // Start dispatcher for main load test + dispatcher.Start() + log.Println("āœ… Started dispatcher") - // Set up signal handling for graceful shutdown - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - fmt.Printf("šŸ“ˆ Logging statistics every %v (Press Ctrl+C to stop)\n", statsInterval) - if dryRun { - fmt.Printf("šŸ“ Dry-run mode: Simulating requests without sending\n") - } - if debug { - fmt.Printf("šŸ› Debug mode: Each transaction will be logged\n") - } - if trackReceipts { - fmt.Printf("šŸ“ Track receipts mode: Receipts will be tracked\n") - } - if trackBlocks { - fmt.Printf("šŸ“ Track blocks mode: Block data will be collected\n") - } - fmt.Println(strings.Repeat("=", 60)) + log.Printf("šŸ“ˆ Logging statistics every %v (Press Ctrl+C to stop)", statsInterval) + if dryRun { + log.Printf("šŸ“ Dry-run mode: Simulating requests without sending") + } + if debug { + log.Printf("šŸ› Debug mode: Each transaction will be logged") + } + if trackReceipts { + log.Printf("šŸ“ Track receipts mode: Receipts will be tracked") + } + if trackBlocks { + log.Printf("šŸ“ Track blocks mode: Block data will be collected") + } + fmt.Println(strings.Repeat("=", 60)) - // Main loop - wait for shutdown signal - <-sigChan + // Main loop - wait for shutdown signal + <-sigChan - fmt.Println("\nšŸ›‘ Received shutdown signal, stopping gracefully...") + log.Println("šŸ›‘ Received shutdown signal, stopping gracefully...") - // Stop block collector first - if blockCollector != nil { - blockCollector.Stop() - fmt.Println("āœ… Stopped block collector") - } + // Stop block collector first + if blockCollector != nil { + blockCollector.Stop() + log.Println("āœ… Stopped block collector") + } - // Stop statistics logger first - logger.Stop() - fmt.Println("āœ… Stopped statistics logger") + // Stop statistics logger first + logger.Stop() + log.Println("āœ… Stopped statistics logger") - // Stop dispatcher - dispatcher.Stop() - fmt.Println("āœ… Stopped dispatcher") + // Stop dispatcher + dispatcher.Stop() + log.Println("āœ… Stopped dispatcher") - // Stop sender and all workers - snd.Stop() - fmt.Println("āœ… Stopped sender and workers") + // Stop sender and all workers + snd.Stop() + log.Println("āœ… Stopped sender and workers") - // Print final statistics - logger.LogFinalStats() + // Print final statistics + logger.LogFinalStats() - fmt.Println("šŸ‘‹ Shutdown complete") + log.Println("šŸ‘‹ Shutdown complete") + return nil + }) + if err != nil { + log.Fatal(err) + } } // loadConfig reads and parses the configuration file From 19b5e98eb4601a17a770b892747f3d9fa8136af9 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 11:38:27 +0200 Subject: [PATCH 3/7] main wip --- main.go | 143 ++++++++++++++++++++++++-------------------------------- 1 file changed, 61 insertions(+), 82 deletions(-) diff --git a/main.go b/main.go index e180e84..247b3c1 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,11 @@ to multiple endpoints with account pooling management. 
Use --dry-run to test configuration and view transaction details without actually sending requests or deploying contracts.`, - Run: runLoadTest, + Run: func(cmd *cobra.Command, args []string) { + if err:=runLoadTest(context.Background(),cmd,args); err != nil { + log.Fatal(err) + } + }, } func init() { @@ -74,45 +78,48 @@ func main() { } } -func runLoadTest(cmd *cobra.Command, args []string) { - ctx := context.Background() - err := service.Run(ctx, func(ctx context.Context, s service.Scope) error { - // Parse the config file into a config.LoadConfig struct - cfg, err := loadConfig(configFile) - if err != nil { - return fmt.Errorf("Failed to load config: %w", err) - } +func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { + // Parse the config file into a config.LoadConfig struct + cfg, err := loadConfig(configFile) + if err != nil { + return fmt.Errorf("Failed to load config: %w", err) + } - log.Printf("šŸš€ Starting Sei Chain Load Test v2") - log.Printf("šŸ“ Config file: %s", configFile) - log.Printf("šŸŽÆ Endpoints: %d", len(cfg.Endpoints)) - log.Printf("šŸ‘„ Workers per endpoint: %d", workers) - log.Printf("šŸ”§ Total workers: %d", len(cfg.Endpoints)*workers) - log.Printf("šŸ“Š Scenarios: %d", len(cfg.Scenarios)) - log.Printf("ā±ļø Stats interval: %v", statsInterval) - log.Printf("šŸ“¦ Buffer size per worker: %d", bufferSize) - if tps > 0 { - log.Printf("šŸ“ˆ Transactions per second: %.2f", tps) - } - if dryRun { - log.Printf("šŸ“ Dry run: enabled") - } - if trackReceipts { - log.Printf("šŸ“ Track receipts: enabled") - } - if trackBlocks { - log.Printf("šŸ“ Track blocks: enabled") - } - if prewarm { - log.Printf("šŸ“ Prewarm: enabled") - } - log.Println() + log.Printf("šŸš€ Starting Sei Chain Load Test v2") + log.Printf("šŸ“ Config file: %s", configFile) + log.Printf("šŸŽÆ Endpoints: %d", len(cfg.Endpoints)) + log.Printf("šŸ‘„ Workers per endpoint: %d", workers) + log.Printf("šŸ”§ Total workers: %d", len(cfg.Endpoints)*workers) + log.Printf("šŸ“Š Scenarios: %d", len(cfg.Scenarios)) + log.Printf("ā±ļø Stats interval: %v", statsInterval) + log.Printf("šŸ“¦ Buffer size per worker: %d", bufferSize) + if tps > 0 { + log.Printf("šŸ“ˆ Transactions per second: %.2f", tps) + } + if dryRun { + log.Printf("šŸ“ Dry run: enabled") + } + if trackReceipts { + log.Printf("šŸ“ Track receipts: enabled") + } + if trackBlocks { + log.Printf("šŸ“ Track blocks: enabled") + } + if prewarm { + log.Printf("šŸ“ Prewarm: enabled") + } + log.Println() - // Enable mock deployment in dry-run mode - if dryRun { - cfg.MockDeploy = true - } + // Enable mock deployment in dry-run mode + if dryRun { + cfg.MockDeploy = true + } + // Create statistics collector and logger + collector := stats.NewCollector() + logger := stats.NewLogger(collector, statsInterval, debug) + + err = service.Run(ctx, func(ctx context.Context, s service.Scope) error { // Create the generator from the config struct gen, err := generator.NewConfigBasedGenerator(cfg) if err != nil { @@ -125,19 +132,14 @@ func runLoadTest(cmd *cobra.Command, args []string) { return fmt.Errorf("Failed to create sender: %w", err) } - // Create statistics collector and logger - collector := stats.NewCollector() - logger := stats.NewLogger(collector, statsInterval, debug) - // Create and start block collector if endpoints are available var blockCollector *stats.BlockCollector if len(cfg.Endpoints) > 0 && trackBlocks { blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) collector.SetBlockCollector(blockCollector) - // Start 
block collector - if err := blockCollector.Start(); err != nil { - log.Printf("āš ļø Failed to start block collector: %v", err) - } + s.SpawnBgNamed("block collector", func() error { + return blockCollector.Run(ctx) + }) } // Enable dry-run mode in sender if specified @@ -170,7 +172,7 @@ func runLoadTest(cmd *cobra.Command, args []string) { // Set up prewarming if enabled if prewarm { - fmt.Println("šŸ”„ Creating prewarm generator...") + log.Println("šŸ”„ Creating prewarm generator...") prewarmGen := generator.NewPrewarmGenerator(cfg, gen) dispatcher.SetPrewarmGenerator(prewarmGen) log.Println("āœ… Prewarm generator ready") @@ -178,32 +180,30 @@ func runLoadTest(cmd *cobra.Command, args []string) { } // Start the sender (starts all workers) - snd.Start() + s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) }) log.Printf("āœ… Connected to %d endpoints", snd.GetNumShards()) // Start block collector if enabled if trackBlocks { blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) collector.SetBlockCollector(blockCollector) - if err := blockCollector.Start(); err != nil { - return fmt.Errorf("Failed to start block collector: %w", err) - } + s.SpawnBgNamed("block collector", func() error { return blockCollector.Run(ctx) }) log.Println("āœ… Started block collector") } // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) if prewarm { - if err := dispatcher.Prewarm(); err != nil { + if err := dispatcher.Prewarm(ctx); err != nil { return fmt.Errorf("Failed to prewarm accounts: %w", err) } } // Start logger (after prewarming to capture only main load test metrics) - logger.Start() + s.SpawnBgNamed("logger", func() error { return logger.Run(ctx) }) log.Println("āœ… Started statistics logger") // Start dispatcher for main load test - dispatcher.Start() + s.SpawnBgNamed("dispatcher", func() error { return dispatcher.Run(ctx) }) log.Println("āœ… Started dispatcher") // Set up signal handling for graceful shutdown @@ -223,40 +223,19 @@ func runLoadTest(cmd *cobra.Command, args []string) { if trackBlocks { log.Printf("šŸ“ Track blocks mode: Block data will be collected") } - fmt.Println(strings.Repeat("=", 60)) + log.Println(strings.Repeat("=", 60)) // Main loop - wait for shutdown signal - <-sigChan - - log.Println("šŸ›‘ Received shutdown signal, stopping gracefully...") - - // Stop block collector first - if blockCollector != nil { - blockCollector.Stop() - log.Println("āœ… Stopped block collector") + if _,err:=utils.Recv(ctx, sigChan); err!=nil { + return err } - - // Stop statistics logger first - logger.Stop() - log.Println("āœ… Stopped statistics logger") - - // Stop dispatcher - dispatcher.Stop() - log.Println("āœ… Stopped dispatcher") - - // Stop sender and all workers - snd.Stop() - log.Println("āœ… Stopped sender and workers") - - // Print final statistics - logger.LogFinalStats() - - log.Println("šŸ‘‹ Shutdown complete") + log.Print("\nšŸ›‘ Received shutdown signal, stopping gracefully...") return nil }) - if err != nil { - log.Fatal(err) - } + // Print final statistics + logger.LogFinalStats() + log.Printf("šŸ‘‹ Shutdown complete") + return err } // loadConfig reads and parses the configuration file From 5b2151baa9d6117218900c965a6e0535d4508279 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 12:19:34 +0200 Subject: [PATCH 4/7] refactor done --- Makefile | 6 +- generator/weighted.go | 4 +- go.mod | 1 + main.go | 31 ++-- sender/dispatcher.go | 189 +++++++-------------- sender/sender.go | 7 +- sender/sender_test.go | 
6 - sender/sharded_sender.go | 40 ++--- sender/worker.go | 225 ++++++++++--------------- stats/block_collector.go | 345 ++++++++++++++++----------------------- stats/collector.go | 30 ++-- stats/logger.go | 80 ++++----- types/account.go | 2 +- 13 files changed, 365 insertions(+), 601 deletions(-) diff --git a/Makefile b/Makefile index ba6893e..799bae7 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ ifeq ($(GOPATH),) endif # Tools -SOLC := solc +SOLC := /tmp/solc ABIGEN := abigen NVM_DIR := $(HOME)/.nvm NODE_VERSION := 20 @@ -63,8 +63,8 @@ setup-node: nvm install $(NODE_VERSION) && \ nvm use $(NODE_VERSION) @echo "šŸ“¦ Installing native solc binary..." - @curl -L https://github.com/ethereum/solidity/releases/download/v0.8.19/solc-static-linux -o /usr/local/bin/solc && \ - chmod +x /usr/local/bin/solc + @curl -L https://github.com/ethereum/solidity/releases/download/v0.8.19/solc-static-linux -o /tmp/solc && \ + chmod +x /tmp/solc @echo "āœ… Node.js environment setup complete" @echo "ā„¹ļø Note: You may need to restart your shell or run 'source ~/.bashrc' to use nvm in new sessions" diff --git a/generator/weighted.go b/generator/weighted.go index b71be84..60759db 100644 --- a/generator/weighted.go +++ b/generator/weighted.go @@ -69,7 +69,7 @@ func (w *weightedGenerator) nextGenerator() Generator { // GenerateN generates n transactions. func (w *weightedGenerator) GenerateN(n int) []*types.LoadTx { txs := make([]*types.LoadTx, 0, n) - for i := 0; i < n; i++ { + for range n { if tx, ok := w.Generate(); ok { txs = append(txs, tx) } else { @@ -108,7 +108,7 @@ func NewWeightedGenerator(cfgs ...*WeightedCfg) Generator { } var weighted []Generator for _, cfg := range cfgs { - for i := 0; i < cfg.Weight; i++ { + for range cfg.Weight { weighted = append(weighted, cfg.Generator) } } diff --git a/go.mod b/go.mod index e0ee201..372e11f 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 golang.org/x/sync v0.16.0 + golang.org/x/time v0.9.0 google.golang.org/protobuf v1.34.2 ) diff --git a/main.go b/main.go index 247b3c1..5442ddb 100644 --- a/main.go +++ b/main.go @@ -4,12 +4,12 @@ import ( "context" "encoding/json" "fmt" + "github.com/sei-protocol/sei-load/utils/service" "log" "os" "os/signal" "strings" "syscall" - "github.com/sei-protocol/sei-load/utils/service" "time" "github.com/spf13/cobra" @@ -18,6 +18,7 @@ import ( "github.com/sei-protocol/sei-load/generator" "github.com/sei-protocol/sei-load/sender" "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/utils" ) var ( @@ -45,7 +46,7 @@ to multiple endpoints with account pooling management. 
Use --dry-run to test configuration and view transaction details without actually sending requests or deploying contracts.`, Run: func(cmd *cobra.Command, args []string) { - if err:=runLoadTest(context.Background(),cmd,args); err != nil { + if err := runLoadTest(context.Background(), cmd, args); err != nil { log.Fatal(err) } }, @@ -135,10 +136,10 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Create and start block collector if endpoints are available var blockCollector *stats.BlockCollector if len(cfg.Endpoints) > 0 && trackBlocks { - blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) + blockCollector = stats.NewBlockCollector() collector.SetBlockCollector(blockCollector) s.SpawnBgNamed("block collector", func() error { - return blockCollector.Run(ctx) + return blockCollector.Run(ctx, cfg.Endpoints[0]) }) } @@ -168,14 +169,14 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { } // Set statistics collector for dispatcher - dispatcher.SetStatsCollector(collector, logger) + dispatcher.SetStatsCollector(collector) // Set up prewarming if enabled if prewarm { - log.Println("šŸ”„ Creating prewarm generator...") + log.Printf("šŸ”„ Creating prewarm generator...") prewarmGen := generator.NewPrewarmGenerator(cfg, gen) dispatcher.SetPrewarmGenerator(prewarmGen) - log.Println("āœ… Prewarm generator ready") + log.Printf("āœ… Prewarm generator ready") log.Printf("šŸ“ Prewarm mode: Accounts will be prewarmed") } @@ -183,14 +184,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) }) log.Printf("āœ… Connected to %d endpoints", snd.GetNumShards()) - // Start block collector if enabled - if trackBlocks { - blockCollector = stats.NewBlockCollector(cfg.Endpoints[0]) - collector.SetBlockCollector(blockCollector) - s.SpawnBgNamed("block collector", func() error { return blockCollector.Run(ctx) }) - log.Println("āœ… Started block collector") - } - // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) if prewarm { if err := dispatcher.Prewarm(ctx); err != nil { @@ -200,11 +193,11 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { // Start logger (after prewarming to capture only main load test metrics) s.SpawnBgNamed("logger", func() error { return logger.Run(ctx) }) - log.Println("āœ… Started statistics logger") + log.Printf("āœ… Started statistics logger") // Start dispatcher for main load test s.SpawnBgNamed("dispatcher", func() error { return dispatcher.Run(ctx) }) - log.Println("āœ… Started dispatcher") + log.Printf("āœ… Started dispatcher") // Set up signal handling for graceful shutdown sigChan := make(chan os.Signal, 1) @@ -223,10 +216,10 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { if trackBlocks { log.Printf("šŸ“ Track blocks mode: Block data will be collected") } - log.Println(strings.Repeat("=", 60)) + log.Print(strings.Repeat("=", 60)) // Main loop - wait for shutdown signal - if _,err:=utils.Recv(ctx, sigChan); err!=nil { + if _, err := utils.Recv(ctx, sigChan); err != nil { return err } log.Print("\nšŸ›‘ Received shutdown signal, stopping gracefully...") diff --git a/sender/dispatcher.go b/sender/dispatcher.go index 44e946d..19f1db3 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -5,7 +5,10 @@ import ( "fmt" "sync" "time" + "log" + "golang.org/x/time/rate" + 
"github.com/sei-protocol/sei-load/utils" "github.com/sei-protocol/sei-load/generator" "github.com/sei-protocol/sei-load/stats" ) @@ -13,32 +16,24 @@ import ( // Dispatcher continuously generates transactions and dispatches them to the sender type Dispatcher struct { generator generator.Generator - prewarmGen generator.Generator // Optional prewarm generator + prewarmGen utils.Option[generator.Generator] // Optional prewarm generator sender TxSender - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup // Configuration - rateLimit time.Duration // Minimum time between transactions + limiter *rate.Limiter // Statistics totalSent uint64 mu sync.RWMutex collector *stats.Collector - logger *stats.Logger } // NewDispatcher creates a new dispatcher func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { - ctx, cancel := context.WithCancel(context.Background()) - return &Dispatcher{ generator: gen, sender: sender, - ctx: ctx, - cancel: cancel, - rateLimit: 0, // No rate limiting by default + limiter: rate.NewLimiter(rate.Inf, 1), // No rate limiting by default } } @@ -46,49 +41,46 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { func (d *Dispatcher) SetRateLimit(duration time.Duration) { d.mu.Lock() defer d.mu.Unlock() - d.rateLimit = duration + d.limiter = rate.NewLimiter(rate.Every(duration),1) } // SetStatsCollector sets the statistics collector for this dispatcher -func (d *Dispatcher) SetStatsCollector(collector *stats.Collector, logger *stats.Logger) { +func (d *Dispatcher) SetStatsCollector(collector *stats.Collector) { d.mu.Lock() defer d.mu.Unlock() d.collector = collector - d.logger = logger } // SetPrewarmGenerator sets the prewarm generator for this dispatcher func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator) { d.mu.Lock() defer d.mu.Unlock() - d.prewarmGen = prewarmGen + d.prewarmGen = utils.Some(prewarmGen) } // Prewarm runs the prewarm generator to completion before starting the main load test -func (d *Dispatcher) Prewarm() error { +func (d *Dispatcher) Prewarm(ctx context.Context) error { d.mu.RLock() prewarmGen := d.prewarmGen d.mu.RUnlock() - if prewarmGen == nil { - return nil // No prewarming configured - } + gen, ok := prewarmGen.Get() + if !ok { return nil } // No prewarming configured - fmt.Println("šŸ”„ Starting account prewarming...") + log.Print("šŸ”„ Starting account prewarming...") processedAccounts := 0 logInterval := 100 // Run prewarm generator until completion for { - tx, ok := prewarmGen.Generate() + tx, ok := gen.Generate() if !ok { break // Prewarming is complete } // Send the prewarming transaction - err := d.sender.Send(tx) - if err != nil { - fmt.Printf("šŸ”„ Failed to send prewarm transaction for account %s: %v\n", tx.Scenario.Sender.Address.Hex(), err) + if err := d.sender.Send(ctx, tx); err != nil { + log.Printf("šŸ”„ Failed to send prewarm transaction for account %s: %v", tx.Scenario.Sender.Address.Hex(), err) continue } @@ -96,124 +88,68 @@ func (d *Dispatcher) Prewarm() error { // Log progress periodically if processedAccounts%logInterval == 0 { - fmt.Printf("šŸ”„ Prewarming progress: %d accounts processed...\n", processedAccounts) + log.Printf("šŸ”„ Prewarming progress: %d accounts processed...", processedAccounts) } } - fmt.Printf("šŸ”„ Prewarming complete! Processed %d accounts\n", processedAccounts) + log.Printf("šŸ”„ Prewarming complete! 
Processed %d accounts", processedAccounts) return nil } // Start begins the dispatcher's transaction generation and sending loop -func (d *Dispatcher) Start() { - d.wg.Add(1) - go d.dispatchLoop() -} - -// Stop gracefully shuts down the dispatcher -func (d *Dispatcher) Stop() { - d.cancel() - d.wg.Wait() -} - -// dispatchLoop is the main loop that generates and dispatches transactions -func (d *Dispatcher) dispatchLoop() { - defer d.wg.Done() - - var lastSent time.Time +func (d *Dispatcher) Run(ctx context.Context) error { + d.mu.RLock() + limiter := d.limiter + d.mu.RUnlock() for { - select { - case <-d.ctx.Done(): - return - default: - // Check rate limiting - d.mu.RLock() - rateLimit := d.rateLimit - d.mu.RUnlock() - - if rateLimit > 0 { - elapsed := time.Since(lastSent) - if elapsed < rateLimit { - time.Sleep(rateLimit - elapsed) - } - } - - // Generate a transaction from main generator - tx, ok := d.generator.Generate() - if !ok || tx == nil { - fmt.Println("Dispatcher: Generator returned no more transactions") - continue - } - - // Send the transaction - err := d.sender.Send(tx) - if err != nil { - fmt.Printf("Dispatcher: Failed to send transaction: %v\n", err) - // Continue despite errors - could add retry logic here - } else { - d.mu.Lock() - d.totalSent++ - d.mu.Unlock() - } - - lastSent = time.Now() + if err:=limiter.Wait(ctx); err!=nil { + return err + } + // Generate a transaction from main generator + tx, ok := d.generator.Generate() + if !ok { + log.Print("Dispatcher: Generator returned no more transactions") + return nil } + + // Send the transaction + if err := d.sender.Send(ctx, tx); err != nil { + return err + } + d.mu.Lock() + d.totalSent++ + d.mu.Unlock() } } // StartBatch generates and sends a specific number of transactions then stops -func (d *Dispatcher) StartBatch(count int) error { +func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { if count <= 0 { return fmt.Errorf("count must be positive") } - - d.wg.Add(1) - go func() { - defer d.wg.Done() - - var lastSent time.Time - - for i := 0; i < count; i++ { - select { - case <-d.ctx.Done(): - return - default: - // Check rate limiting - d.mu.RLock() - rateLimit := d.rateLimit - d.mu.RUnlock() - - if rateLimit > 0 && i > 0 { - elapsed := time.Since(lastSent) - if elapsed < rateLimit { - time.Sleep(rateLimit - elapsed) - } - } - - // Generate a transaction - tx, ok := d.generator.Generate() - if !ok { - fmt.Printf("Dispatcher: Generator returned nil transaction (batch %d/%d)\n", i+1, count) - continue - } - - // Send the transaction - err := d.sender.Send(tx) - if err != nil { - fmt.Printf("Dispatcher: Failed to send transaction %d/%d: %v\n", i+1, count, err) - // Continue despite errors - } else { - d.mu.Lock() - d.totalSent++ - d.mu.Unlock() - } - - lastSent = time.Now() - } + d.mu.RLock() + limiter := d.limiter + d.mu.RUnlock() + for i := range count { + if err:=limiter.Wait(ctx); err!=nil { + return err } - }() - + // Generate a transaction + tx, ok := d.generator.Generate() + if !ok { + return fmt.Errorf("Dispatcher: Generator returned nil transaction (batch %d/%d)\n", i+1, count) + } + // Send the transaction + if err := d.sender.Send(ctx, tx); err != nil { + log.Printf("Dispatcher: Failed to send transaction %d/%d: %v", i+1, count, err) + // Continue despite errors + } else { + d.mu.Lock() + d.totalSent++ + d.mu.Unlock() + } + } return nil } @@ -231,8 +167,3 @@ func (d *Dispatcher) GetStats() DispatcherStats { type DispatcherStats struct { TotalSent uint64 } - -// Wait waits for the dispatcher to 
finish (useful for batch mode) -func (d *Dispatcher) Wait() { - d.wg.Wait() -} diff --git a/sender/sender.go b/sender/sender.go index 878b8b9..a2b92b5 100644 --- a/sender/sender.go +++ b/sender/sender.go @@ -1,7 +1,10 @@ package sender -import "github.com/sei-protocol/sei-load/types" +import ( + "context" + "github.com/sei-protocol/sei-load/types" +) type TxSender interface { - Send(tx *types.LoadTx) error + Send(ctx context.Context, tx *types.LoadTx) error } diff --git a/sender/sender_test.go b/sender/sender_test.go index 19dc6f5..59ac91e 100644 --- a/sender/sender_test.go +++ b/sender/sender_test.go @@ -98,12 +98,6 @@ func (ms *MockServer) Close() { ms.server.Close() } -// TestShardedSenderWithMockServers tests the sharded sender with mock HTTP servers -func TestShardedSenderWithMockServers(t *testing.T) { - // Skip this test to avoid hanging - it requires actual HTTP servers - t.Skip("Skipping integration test that requires HTTP servers - use unit tests instead") -} - // TestShardDistributionVerification tests that specific transactions go to expected shards func TestShardDistributionVerification(t *testing.T) { // Test shard distribution logic without network operations or scenario deployment diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index 8801c3f..d98bbc5 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -3,10 +3,12 @@ package sender import ( "fmt" "sync" + "context" "github.com/sei-protocol/sei-load/config" "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils/service" ) // ShardedSender implements TxSender with multiple workers, one per endpoint @@ -40,45 +42,29 @@ func NewShardedSender(cfg *config.LoadConfig, bufferSize int, workers int) (*Sha } // Start initializes and starts all workers -func (s *ShardedSender) Start() { +func (s *ShardedSender) Run(ctx context.Context) error { s.mu.Lock() - defer s.mu.Unlock() - - for _, worker := range s.workers { - worker.Start() - } -} - -// Stop gracefully shuts down all workers -func (s *ShardedSender) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - - for _, worker := range s.workers { - worker.Stop() - } + workers := s.workers + s.mu.Unlock() + return service.Run(ctx, func(ctx context.Context, s service.Scope) error { + for _, worker := range workers { + s.Spawn(func() error { return worker.Run(ctx) }) + } + return nil + }) } // Send implements TxSender interface - calculates shard ID and routes to appropriate worker -func (s *ShardedSender) Send(tx *types.LoadTx) error { - if tx == nil { - return fmt.Errorf("transaction is nil") - } - +func (s *ShardedSender) Send(ctx context.Context, tx *types.LoadTx) error { // Calculate shard ID based on the transaction shardID := tx.ShardID(s.numShards) - // Validate shard ID - if shardID < 0 || shardID >= s.numShards { - return fmt.Errorf("invalid shard ID %d for %d shards", shardID, s.numShards) - } - // Send to the appropriate worker s.mu.RLock() worker := s.workers[shardID] s.mu.RUnlock() - return worker.Send(tx) + return worker.Send(ctx, tx) } // GetWorkerStats returns statistics for all workers diff --git a/sender/worker.go b/sender/worker.go index 1d23641..8492cac 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -7,61 +7,57 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "io" "net" + "log" "net/http" "time" "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" + "github.com/sei-protocol/sei-load/utils/service" + 
"github.com/sei-protocol/sei-load/utils" ) // Worker handles sending transactions to a specific endpoint type Worker struct { id int endpoint string - client *http.Client txChan chan *types.LoadTx sentTxs chan *types.LoadTx - ctx context.Context - cancel context.CancelFunc dryRun bool debug bool collector *stats.Collector logger *stats.Logger workers int - noReceipts bool + trackReceipts bool } -// NewWorker creates a new worker for a specific endpoint -func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker { - ctx, cancel := context.WithCancel(context.Background()) - - // Configure HTTP transport with proper connection pooling - transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 30 * time.Second, - }).DialContext, - MaxIdleConns: 100, - MaxIdleConnsPerHost: 10, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - DisableKeepAlives: false, +func newHttpClient() *http.Client { + return &http.Client{ + Timeout: 30 * time.Second, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + DisableKeepAlives: false, + }, } +} +// NewWorker creates a new worker for a specific endpoint +func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker { return &Worker{ - id: id, - endpoint: endpoint, - client: &http.Client{ - Timeout: 30 * time.Second, - Transport: transport, - }, + id: id, + endpoint: endpoint, txChan: make(chan *types.LoadTx, bufferSize), sentTxs: make(chan *types.LoadTx, bufferSize), - ctx: ctx, - cancel: cancel, workers: workers, - noReceipts: false, + trackReceipts: true, } } @@ -72,36 +68,20 @@ func (w *Worker) SetStatsCollector(collector *stats.Collector, logger *stats.Log } // Start begins the worker's processing loop -func (w *Worker) Start() { - // Start multiple worker goroutines that share the same channel - for i := 0; i < w.workers; i++ { - go w.processTransactions() - } - // Only start receipt checking if not disabled - if !w.noReceipts { - go w.watchTransactions() - } -} - -// Stop gracefully shuts down the worker -func (w *Worker) Stop() { - w.cancel() - - // Close HTTP transport to release connections - if transport, ok := w.client.Transport.(*http.Transport); ok { - transport.CloseIdleConnections() - } - close(w.txChan) +func (w *Worker) Run(ctx context.Context) error { + return service.Run(ctx, func(ctx context.Context, s service.Scope) error { + // Start multiple worker goroutines that share the same channel + client := newHttpClient() + for range w.workers { + s.Spawn(func() error { return w.processTransactions(ctx,client) }) + } + return w.watchTransactions(ctx) + }) } // Send queues a transaction for this worker to process -func (w *Worker) Send(tx *types.LoadTx) error { - select { - case w.txChan <- tx: - return nil - case <-w.ctx.Done(): - return fmt.Errorf("worker %d is shutting down", w.id) - } +func (w *Worker) Send(ctx context.Context, tx *types.LoadTx) error { + return utils.Send(ctx, w.txChan, tx) } // SetDebug sets the dry-run mode for the worker @@ -116,110 +96,82 @@ func (w *Worker) SetDryRun(dryRun bool) { // SetTrackReceipts sets the track-receipts mode for the worker func (w *Worker) SetTrackReceipts(trackReceipts bool) { - w.noReceipts = 
!trackReceipts + w.trackReceipts = trackReceipts } -func (w *Worker) watchTransactions() { - if w.dryRun { - return +func (w *Worker) watchTransactions(ctx context.Context) error { + if w.dryRun || !w.trackReceipts { + return nil } eth, err := ethclient.Dial(w.endpoint) if err != nil { - panic(err) + return fmt.Errorf("ethclient.Dial(%q): %w", w.endpoint, err) } - for { - select { - case tx, ok := <-w.sentTxs: - if !ok { - return // Channel closed, worker should exit - } - w.waitForReceipt(eth, tx) - - case <-w.ctx.Done(): - return // Context cancelled, worker should exit + tx, err := utils.Recv(ctx, w.sentTxs) + if err!=nil { return err } + ctx,cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := w.waitForReceipt(ctx, eth, tx); err != nil { + log.Printf("āŒ %v",err) } } } -func (w *Worker) waitForReceipt(eth *ethclient.Client, tx *types.LoadTx) { - timeout := time.After(10 * time.Second) +func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *types.LoadTx) error { ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-timeout: - fmt.Printf("āŒ timeout waiting for receipt for tx %s\n", tx.EthTx.Hash().Hex()) - return - - case <-ticker.C: - receipt, err := eth.TransactionReceipt(context.Background(), tx.EthTx.Hash()) - if err != nil { - if err.Error() == "not found" { - continue - } - fmt.Printf("āŒ error getting receipt for tx %s: %v\n", tx.EthTx.Hash().Hex(), err) + if _,err := utils.Recv(ctx, ticker.C); err != nil { + return fmt.Errorf("timeout waiting for receipt for tx %s", tx.EthTx.Hash().Hex()) + } + receipt, err := eth.TransactionReceipt(context.Background(), tx.EthTx.Hash()) + if err != nil { + if err.Error() == "not found" { continue } - - // Receipt found - log status and return - if receipt.Status != 1 { - fmt.Printf("āŒ tx %s failed\n", tx.EthTx.Hash().Hex()) - } else if w.debug { - fmt.Printf("āœ… tx %s, %s, gas=%d succeeded\n", tx.Scenario.Name, tx.EthTx.Hash().Hex(), receipt.GasUsed) - } - return - - case <-w.ctx.Done(): - return + log.Printf("āŒ error getting receipt for tx %s: %v", tx.EthTx.Hash().Hex(), err) + continue + } + // Receipt found - log status and return + if receipt.Status != 1 { + return fmt.Errorf("tx %s failed", tx.EthTx.Hash().Hex()) + } + if w.debug { + log.Printf("āœ… tx %s, %s, gas=%d succeeded\n", tx.Scenario.Name, tx.EthTx.Hash().Hex(), receipt.GasUsed) } + return nil } } // processTransactions is the main worker loop that processes transactions -func (w *Worker) processTransactions() { +func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error { for { - select { - case tx, ok := <-w.txChan: - if !ok { - // Channel closed, worker should exit - return - } - w.sendTransaction(tx) - case <-w.ctx.Done(): - // Context cancelled, worker should exit - return + tx,err := utils.Recv(ctx, w.txChan) + if err != nil { return err } + startTime := time.Now() + err = w.sendTransaction(ctx, client, tx) + // Record statistics if collector is available + if w.collector != nil { + w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err==nil) + } + if err!=nil { + log.Printf("%v",err) } } } // sendTransaction sends a single transaction to the endpoint -func (w *Worker) sendTransaction(tx *types.LoadTx) { - startTime := time.Now() - success := false - - defer func() { - // Record statistics if collector is available - if w.collector != nil { - latency := time.Since(startTime) - 
w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, latency, success) - } - }() - +func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *types.LoadTx) error { if w.dryRun { // In dry-run mode, simulate processing time and mark as successful // Use very minimal delay to avoid channel overflow - time.Sleep(10 * time.Microsecond) // Much faster simulation - success = true - return + return utils.Sleep(ctx, 10 * time.Microsecond) // Much faster simulation } // Create HTTP request with JSON-RPC payload - req, err := http.NewRequestWithContext(w.ctx, "POST", w.endpoint, bytes.NewReader(tx.JSONRPCPayload)) + req, err := http.NewRequestWithContext(ctx, "POST", w.endpoint, bytes.NewReader(tx.JSONRPCPayload)) if err != nil { - fmt.Printf("Worker %d: Failed to create request: %v\n", w.id, err) - return + return fmt.Errorf("Worker %d: Failed to create request: %w", w.id, err) } // Set headers for JSON-RPC @@ -227,39 +179,30 @@ func (w *Worker) sendTransaction(tx *types.LoadTx) { req.Header.Set("Accept", "application/json") // Send the request - resp, err := w.client.Do(req) + resp, err := client.Do(req) if err != nil { - fmt.Printf("Worker %d: Failed to send transaction: %v\n", w.id, err) - return + return fmt.Errorf("Worker %d: Failed to send transaction: %w", w.id, err) } - defer func() { - if err := resp.Body.Close(); err != nil { - fmt.Printf("Worker %d: Failed to close response body: %v\n", w.id, err) - } - }() + defer resp.Body.Close() // Always read and discard response body to enable connection reuse // Limit read to prevent memory issues with large responses _, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB if err != nil && err != io.EOF { - fmt.Printf("Worker %d: Failed to read response body: %v\n", w.id, err) // Log but don't fail - this is just for connection reuse } // Check response status if resp.StatusCode != http.StatusOK { - fmt.Printf("Worker %d: HTTP error %d for transaction to %s\n", w.id, resp.StatusCode, w.endpoint) - return + return fmt.Errorf("Worker %d: HTTP error %d for transaction to %s", w.id, resp.StatusCode, w.endpoint) } - // Mark as successful - success = true - // Write to sentTxs channel without blocking select { case w.sentTxs <- tx: default: } + return nil } // GetChannelLength returns the current length of the worker's channel (for monitoring) diff --git a/stats/block_collector.go b/stats/block_collector.go index 1623496..8a32c43 100644 --- a/stats/block_collector.go +++ b/stats/block_collector.go @@ -6,18 +6,15 @@ import ( "log" "sort" "strings" - "sync" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" ) -// BlockCollector subscribes to new blocks and tracks block metrics -type BlockCollector struct { - mu sync.RWMutex - +type blockCollectorStats struct { // Cumulative data (for final stats) allBlockTimes []time.Duration // All block times allGasUsed []uint64 // All gas used values @@ -28,246 +25,182 @@ type BlockCollector struct { windowBlockTimes []time.Duration // Block times in current window windowGasUsed []uint64 // Gas used in current window windowStart time.Time // Start of current window +} - // WebSocket connection - client *ethclient.Client - subscription ethereum.Subscription - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - // Configuration - wsEndpoint string - running bool +// BlockCollector 
subscribes to new blocks and tracks block metrics +type BlockCollector struct { + stats utils.Mutex[*blockCollectorStats] } // NewBlockCollector creates a new block data collector -func NewBlockCollector(firstEndpoint string) *BlockCollector { - // Convert HTTP endpoint to WebSocket endpoint (8545 -> 8546) - wsEndpoint := strings.Replace(firstEndpoint, ":8545", ":8546", 1) - wsEndpoint = strings.Replace(wsEndpoint, "http://", "ws://", 1) - - ctx, cancel := context.WithCancel(context.Background()) - +func NewBlockCollector() *BlockCollector { return &BlockCollector{ - allBlockTimes: make([]time.Duration, 0), - allGasUsed: make([]uint64, 0), - windowBlockTimes: make([]time.Duration, 0), - windowGasUsed: make([]uint64, 0), - wsEndpoint: wsEndpoint, - ctx: ctx, - cancel: cancel, + stats: utils.NewMutex(&blockCollectorStats{ + allBlockTimes: make([]time.Duration, 0), + allGasUsed: make([]uint64, 0), + windowBlockTimes: make([]time.Duration, 0), + windowGasUsed: make([]uint64, 0), + }), } } // Start begins block subscription and data collection -func (bc *BlockCollector) Start() error { - bc.mu.Lock() - defer bc.mu.Unlock() - - if bc.running { - return fmt.Errorf("block collector already running") - } - - // Connect to WebSocket endpoint - client, err := ethclient.Dial(bc.wsEndpoint) - if err != nil { - return fmt.Errorf("failed to connect to WebSocket endpoint %s: %v", bc.wsEndpoint, err) - } - - bc.client = client - bc.running = true - - // Start the subscription goroutine - bc.wg.Add(1) - go bc.subscribeToBlocks() - return nil -} - -// Stop gracefully shuts down the block collector -func (bc *BlockCollector) Stop() { - bc.mu.Lock() - defer bc.mu.Unlock() - - if !bc.running { - return - } - - bc.running = false - bc.cancel() - - if bc.subscription != nil { - bc.subscription.Unsubscribe() - } - - if bc.client != nil { - bc.client.Close() - } - - bc.wg.Wait() - log.Printf("āœ… Stopped block collector") -} - -// subscribeToBlocks handles the WebSocket subscription to new blocks -func (bc *BlockCollector) subscribeToBlocks() { - defer bc.wg.Done() - - headers := make(chan *types.Header) - sub, err := bc.client.SubscribeNewHead(bc.ctx, headers) - if err != nil { - log.Printf("āŒ Failed to subscribe to new blocks: %v", err) - return - } - - bc.mu.Lock() - bc.subscription = sub - bc.mu.Unlock() - - log.Printf("šŸ“” Subscribed to new blocks on %s", bc.wsEndpoint) - - for { - select { - case err := <-sub.Err(): - log.Printf("āŒ Block subscription error: %v", err) - return - - case header := <-headers: +func (bc *BlockCollector) Run(ctx context.Context, firstEndpoint string) error { + // Convert HTTP endpoint to WebSocket endpoint (8545 -> 8546) + wsEndpoint := strings.Replace(firstEndpoint, ":8545", ":8546", 1) + wsEndpoint = strings.Replace(wsEndpoint, "http://", "ws://", 1) + return service.Run(ctx, func(ctx context.Context, s service.Scope) error { + // Connect to WebSocket endpoint + client, err := ethclient.Dial(wsEndpoint) + if err != nil { + return fmt.Errorf("failed to connect to WebSocket endpoint %s: %w", wsEndpoint, err) + } + headers := make(chan *types.Header) + sub, err := client.SubscribeNewHead(ctx, headers) + if err != nil { + return fmt.Errorf("āŒ Failed to subscribe to new blocks: %w", err) + } + defer sub.Unsubscribe() + s.SpawnBg(func() error { + subErr,err := utils.Recv(ctx, sub.Err()) + if err != nil { return err } + return subErr + }) + log.Printf("šŸ“” Subscribed to new blocks on %s", wsEndpoint) + for { + header,err := utils.Recv(ctx, headers) + if err != nil { return err } 
bc.processNewBlock(header) - - case <-bc.ctx.Done(): - return } - } + }) } // processNewBlock processes a new block header and updates metrics func (bc *BlockCollector) processNewBlock(header *types.Header) { - bc.mu.Lock() - defer bc.mu.Unlock() - - now := time.Now() - blockNum := header.Number.Uint64() - gasUsed := header.GasUsed - - // Update max block number - if blockNum > bc.maxBlockNum { - bc.maxBlockNum = blockNum - } + for stats := range bc.stats.Lock() { + now := time.Now() + blockNum := header.Number.Uint64() + gasUsed := header.GasUsed + + // Update max block number + if blockNum > stats.maxBlockNum { + stats.maxBlockNum = blockNum + } - // Track gas used - bc.allGasUsed = append(bc.allGasUsed, gasUsed) - bc.windowGasUsed = append(bc.windowGasUsed, gasUsed) + // Track gas used + stats.allGasUsed = append(stats.allGasUsed, gasUsed) + stats.windowGasUsed = append(stats.windowGasUsed, gasUsed) - // Calculate time between blocks - if !bc.lastBlockTime.IsZero() { - timeBetween := now.Sub(bc.lastBlockTime) - bc.allBlockTimes = append(bc.allBlockTimes, timeBetween) - bc.windowBlockTimes = append(bc.windowBlockTimes, timeBetween) - } + // Calculate time between blocks + if !stats.lastBlockTime.IsZero() { + timeBetween := now.Sub(stats.lastBlockTime) + stats.allBlockTimes = append(stats.allBlockTimes, timeBetween) + stats.windowBlockTimes = append(stats.windowBlockTimes, timeBetween) + } - bc.lastBlockTime = now + stats.lastBlockTime = now - // Limit history to prevent memory growth (keep last 1000 entries) - if len(bc.allBlockTimes) > 1000 { - bc.allBlockTimes = bc.allBlockTimes[len(bc.allBlockTimes)-1000:] - } - if len(bc.allGasUsed) > 1000 { - bc.allGasUsed = bc.allGasUsed[len(bc.allGasUsed)-1000:] - } - if len(bc.windowBlockTimes) > 1000 { - bc.windowBlockTimes = bc.windowBlockTimes[len(bc.windowBlockTimes)-1000:] - } - if len(bc.windowGasUsed) > 1000 { - bc.windowGasUsed = bc.windowGasUsed[len(bc.windowGasUsed)-1000:] + // Limit history to prevent memory growth (keep last 1000 entries) + if len(stats.allBlockTimes) > 1000 { + stats.allBlockTimes = stats.allBlockTimes[len(stats.allBlockTimes)-1000:] + } + if len(stats.allGasUsed) > 1000 { + stats.allGasUsed = stats.allGasUsed[len(stats.allGasUsed)-1000:] + } + if len(stats.windowBlockTimes) > 1000 { + stats.windowBlockTimes = stats.windowBlockTimes[len(stats.windowBlockTimes)-1000:] + } + if len(stats.windowGasUsed) > 1000 { + stats.windowGasUsed = stats.windowGasUsed[len(stats.windowGasUsed)-1000:] + } } } // GetBlockStats returns current block statistics func (bc *BlockCollector) GetBlockStats() BlockStats { - bc.mu.RLock() - defer bc.mu.RUnlock() - - stats := BlockStats{ - MaxBlockNumber: bc.maxBlockNum, - SampleCount: len(bc.allBlockTimes), - } - - // Calculate block time percentiles - if len(bc.allBlockTimes) > 0 { - sortedTimes := make([]time.Duration, len(bc.allBlockTimes)) - copy(sortedTimes, bc.allBlockTimes) - sort.Slice(sortedTimes, func(i, j int) bool { - return sortedTimes[i] < sortedTimes[j] - }) + for bc := range bc.stats.Lock() { + stats := BlockStats{ + MaxBlockNumber: bc.maxBlockNum, + SampleCount: len(bc.allBlockTimes), + } - stats.P50BlockTime = calculatePercentile(sortedTimes, 50) - stats.P99BlockTime = calculatePercentile(sortedTimes, 99) - stats.MaxBlockTime = sortedTimes[len(sortedTimes)-1] - } + // Calculate block time percentiles + if len(bc.allBlockTimes) > 0 { + sortedTimes := make([]time.Duration, len(bc.allBlockTimes)) + copy(sortedTimes, bc.allBlockTimes) + sort.Slice(sortedTimes, func(i, j int) bool { 
+ return sortedTimes[i] < sortedTimes[j] + }) + + stats.P50BlockTime = calculatePercentile(sortedTimes, 50) + stats.P99BlockTime = calculatePercentile(sortedTimes, 99) + stats.MaxBlockTime = sortedTimes[len(sortedTimes)-1] + } - // Calculate gas used percentiles - if len(bc.allGasUsed) > 0 { - sortedGas := make([]uint64, len(bc.allGasUsed)) - copy(sortedGas, bc.allGasUsed) - sort.Slice(sortedGas, func(i, j int) bool { - return sortedGas[i] < sortedGas[j] - }) + // Calculate gas used percentiles + if len(bc.allGasUsed) > 0 { + sortedGas := make([]uint64, len(bc.allGasUsed)) + copy(sortedGas, bc.allGasUsed) + sort.Slice(sortedGas, func(i, j int) bool { + return sortedGas[i] < sortedGas[j] + }) + + stats.P50GasUsed = calculateGasPercentile(sortedGas, 50) + stats.P99GasUsed = calculateGasPercentile(sortedGas, 99) + stats.MaxGasUsed = sortedGas[len(sortedGas)-1] + } - stats.P50GasUsed = calculateGasPercentile(sortedGas, 50) - stats.P99GasUsed = calculateGasPercentile(sortedGas, 99) - stats.MaxGasUsed = sortedGas[len(sortedGas)-1] + return stats } - - return stats + panic("unreachable") } // GetWindowBlockStats returns current window-based block statistics func (bc *BlockCollector) GetWindowBlockStats() BlockStats { - bc.mu.RLock() - defer bc.mu.RUnlock() - - stats := BlockStats{ - MaxBlockNumber: bc.maxBlockNum, - SampleCount: len(bc.windowBlockTimes), - } - - // Calculate block time percentiles for current window - if len(bc.windowBlockTimes) > 0 { - sortedTimes := make([]time.Duration, len(bc.windowBlockTimes)) - copy(sortedTimes, bc.windowBlockTimes) - sort.Slice(sortedTimes, func(i, j int) bool { - return sortedTimes[i] < sortedTimes[j] - }) + for bc := range bc.stats.Lock() { + stats := BlockStats{ + MaxBlockNumber: bc.maxBlockNum, + SampleCount: len(bc.windowBlockTimes), + } - stats.P50BlockTime = calculatePercentile(sortedTimes, 50) - stats.P99BlockTime = calculatePercentile(sortedTimes, 99) - stats.MaxBlockTime = sortedTimes[len(sortedTimes)-1] - } + // Calculate block time percentiles for current window + if len(bc.windowBlockTimes) > 0 { + sortedTimes := make([]time.Duration, len(bc.windowBlockTimes)) + copy(sortedTimes, bc.windowBlockTimes) + sort.Slice(sortedTimes, func(i, j int) bool { + return sortedTimes[i] < sortedTimes[j] + }) + + stats.P50BlockTime = calculatePercentile(sortedTimes, 50) + stats.P99BlockTime = calculatePercentile(sortedTimes, 99) + stats.MaxBlockTime = sortedTimes[len(sortedTimes)-1] + } - // Calculate gas used percentiles for current window - if len(bc.windowGasUsed) > 0 { - sortedGas := make([]uint64, len(bc.windowGasUsed)) - copy(sortedGas, bc.windowGasUsed) - sort.Slice(sortedGas, func(i, j int) bool { - return sortedGas[i] < sortedGas[j] - }) + // Calculate gas used percentiles for current window + if len(bc.windowGasUsed) > 0 { + sortedGas := make([]uint64, len(bc.windowGasUsed)) + copy(sortedGas, bc.windowGasUsed) + sort.Slice(sortedGas, func(i, j int) bool { + return sortedGas[i] < sortedGas[j] + }) + + stats.P50GasUsed = calculateGasPercentile(sortedGas, 50) + stats.P99GasUsed = calculateGasPercentile(sortedGas, 99) + stats.MaxGasUsed = sortedGas[len(sortedGas)-1] + } - stats.P50GasUsed = calculateGasPercentile(sortedGas, 50) - stats.P99GasUsed = calculateGasPercentile(sortedGas, 99) - stats.MaxGasUsed = sortedGas[len(sortedGas)-1] + return stats } - - return stats + panic("unreachable") } // ResetWindowStats resets the window-based statistics for the next reporting period func (bc *BlockCollector) ResetWindowStats() { - bc.mu.Lock() - defer 
bc.mu.Unlock() - - bc.windowBlockTimes = make([]time.Duration, 0) - bc.windowGasUsed = make([]uint64, 0) - bc.windowStart = time.Now() + for bc := range bc.stats.Lock() { + bc.windowBlockTimes = make([]time.Duration, 0) + bc.windowGasUsed = make([]uint64, 0) + bc.windowStart = time.Now() + } } // calculateGasPercentile calculates the given percentile from sorted gas values diff --git a/stats/collector.go b/stats/collector.go index 3c2ea7e..474a164 100644 --- a/stats/collector.go +++ b/stats/collector.go @@ -30,8 +30,8 @@ type Collector struct { blockCollector *BlockCollector // Global metrics - startTime time.Time - totalTxs uint64 + startTime time.Time + totalTxs uint64 lastWindowTime time.Time // Configuration @@ -316,7 +316,7 @@ func (c *Collector) GetStats() Stats { func (c *Collector) GetCumulativeBlockStats() *BlockStats { c.mu.RLock() defer c.mu.RUnlock() - + if c.blockCollector != nil { stats := c.blockCollector.GetBlockStats() return &stats @@ -340,13 +340,13 @@ func calculatePercentile(sorted []time.Duration, percentile int) time.Duration { // Stats represents comprehensive load test statistics type Stats struct { - StartTime time.Time - TotalTxs uint64 - TxCounts map[string]map[string]uint64 // [scenario][endpoint] -> count - EndpointStats map[string]EndpointStats - OverallMaxTPS float64 + StartTime time.Time + TotalTxs uint64 + TxCounts map[string]map[string]uint64 // [scenario][endpoint] -> count + EndpointStats map[string]EndpointStats + OverallMaxTPS float64 OverallCurrentTPS float64 - BlockStats *BlockStats // Block-related statistics + BlockStats *BlockStats // Block-related statistics } // EndpointStats represents statistics for a specific endpoint @@ -371,12 +371,12 @@ type EndpointStats struct { // WindowStats tracks metrics for the current reporting window type WindowStats struct { - windowStart time.Time - txCount uint64 - latencySum time.Duration - latencyCount int - maxLatency time.Duration - minLatency time.Duration + windowStart time.Time + txCount uint64 + latencySum time.Duration + latencyCount int + maxLatency time.Duration + minLatency time.Duration cumulativeMaxTPS float64 cumulativeMaxLatency time.Duration } diff --git a/stats/logger.go b/stats/logger.go index 1654484..4881c0c 100644 --- a/stats/logger.go +++ b/stats/logger.go @@ -2,9 +2,10 @@ package stats import ( "context" - "fmt" + "log" "sync" "time" + "github.com/sei-protocol/sei-load/utils" ) // Logger handles periodic statistics logging and dry-run transaction printing @@ -12,50 +13,29 @@ type Logger struct { collector *Collector interval time.Duration debug bool - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + + // Dry-run transaction logging + txCounter uint64 + txCounterMu sync.Mutex } // NewLogger creates a new statistics logger func NewLogger(collector *Collector, interval time.Duration, debug bool) *Logger { - ctx, cancel := context.WithCancel(context.Background()) - return &Logger{ collector: collector, interval: interval, debug: debug, - ctx: ctx, - cancel: cancel, } } // Start begins periodic statistics logging -func (l *Logger) Start() { - l.wg.Add(1) - go l.logLoop() -} - -// Stop gracefully shuts down the logger -func (l *Logger) Stop() { - l.cancel() - l.wg.Wait() -} - -// logLoop runs the periodic statistics logging -func (l *Logger) logLoop() { - defer l.wg.Done() - +func (l *Logger) Run(ctx context.Context) error { ticker := time.NewTicker(l.interval) - defer ticker.Stop() - for { - select { - case <-l.ctx.Done(): - return - case <-ticker.C: - 
l.logCurrentStats() + if _,err := utils.Recv(ctx, ticker.C); err != nil { + return err } + l.logCurrentStats() } } @@ -116,7 +96,7 @@ func (l *Logger) logCurrentStats() { if l.debug { // Format: [timestamp] endpoint | TXs: total | TPS: window(max) | Latency: avg(max) | P50: x P99: x - fmt.Printf("[%s] %s | TXs: %d | TPS: %.1f(%.1f) | Lat: %v(%v) | P50: %v P99: %v\n", + log.Printf("[%s] %s | TXs: %d | TPS: %.1f(%.1f) | Lat: %v(%v) | P50: %v P99: %v", time.Now().Format("15:04:05"), endpoint, totalTxsForEndpoint, @@ -136,7 +116,7 @@ func (l *Logger) logCurrentStats() { } // Print overall summary line - fmt.Printf("[%s] throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)\n", + log.Printf("[%s] throughput tps=%.2f, txs=%d, latency(avg=%v p50=%v p99=%v max=%v)", time.Now().Format("15:04:05"), totalWindowTPS, totalTxs, @@ -147,7 +127,7 @@ func (l *Logger) logCurrentStats() { // Print block statistics if available if stats.BlockStats != nil && stats.BlockStats.SampleCount > 0 { - fmt.Printf("[%s] %s\n", + log.Printf("[%s] %s", time.Now().Format("15:04:05"), stats.BlockStats.FormatBlockStats()) } @@ -165,19 +145,19 @@ func (l *Logger) logCurrentStats() { func (l *Logger) LogFinalStats() { stats := l.collector.GetStats() - fmt.Println("\n" + "=============================") - fmt.Println("FINAL LOAD TEST RESULTS") - fmt.Println("=============================") - fmt.Print(stats.FormatStats()) + log.Print("\n=============================") + log.Print("FINAL LOAD TEST RESULTS") + log.Print("=============================") + log.Print(stats.FormatStats()) // Additional final statistics duration := time.Since(stats.StartTime) if duration.Seconds() > 0 { - fmt.Printf("\nOverall Performance Summary:\n") - fmt.Printf(" Total Runtime: %v\n", duration.Round(time.Second)) - fmt.Printf(" Total Transactions: %d\n", stats.TotalTxs) - fmt.Printf(" Average TPS: %.2f\n", float64(stats.TotalTxs)/duration.Seconds()) - fmt.Printf(" Max TPS: %.2f\n", stats.OverallMaxTPS) + log.Printf("\nOverall Performance Summary:") + log.Printf(" Total Runtime: %v", duration.Round(time.Second)) + log.Printf(" Total Transactions: %d", stats.TotalTxs) + log.Printf(" Average TPS: %.2f", float64(stats.TotalTxs)/duration.Seconds()) + log.Printf(" Max TPS: %.2f", stats.OverallMaxTPS) // Calculate total transactions per scenario scenarioTotals := make(map[string]uint64) @@ -189,27 +169,27 @@ func (l *Logger) LogFinalStats() { scenarioTotals[scenario] = total } - fmt.Printf("\nScenario Distribution:\n") + log.Printf("\nScenario Distribution:") for scenario, total := range scenarioTotals { percentage := float64(total) / float64(stats.TotalTxs) * 100 - fmt.Printf(" %s: %d (%.1f%%)\n", scenario, total, percentage) + log.Printf(" %s: %d (%.1f%%)", scenario, total, percentage) } } // Print overall gas statistics if available (use cumulative data) if cumulativeBlockStats := l.collector.GetCumulativeBlockStats(); cumulativeBlockStats != nil && cumulativeBlockStats.SampleCount > 0 { - fmt.Printf("\nOverall Gas Statistics:\n") - fmt.Printf(" Max Block Number: %d\n", cumulativeBlockStats.MaxBlockNumber) - fmt.Printf(" Block Times: p50=%v p99=%v max=%v\n", + log.Printf("\nOverall Gas Statistics:") + log.Printf(" Max Block Number: %d", cumulativeBlockStats.MaxBlockNumber) + log.Printf(" Block Times: p50=%v p99=%v max=%v", cumulativeBlockStats.P50BlockTime.Round(time.Millisecond), cumulativeBlockStats.P99BlockTime.Round(time.Millisecond), cumulativeBlockStats.MaxBlockTime.Round(time.Millisecond)) - fmt.Printf(" Gas Usage: p50=%d p99=%d 
max=%d\n", + log.Printf(" Gas Usage: p50=%d p99=%d max=%d", cumulativeBlockStats.P50GasUsed, cumulativeBlockStats.P99GasUsed, cumulativeBlockStats.MaxGasUsed) - fmt.Printf(" Block Samples: %d\n", cumulativeBlockStats.SampleCount) + log.Printf(" Block Samples: %d", cumulativeBlockStats.SampleCount) } - fmt.Println("==============================") + log.Printf("==============================") } diff --git a/types/account.go b/types/account.go index dd6b809..45dc96b 100644 --- a/types/account.go +++ b/types/account.go @@ -36,7 +36,7 @@ func (s *Account) GetAndIncrementNonce() uint64 { // GenerateAccounts generates random accounts. func GenerateAccounts(n int) []*Account { result := make([]*Account, 0, n) - for i := 0; i < n; i++ { + for range n { newAcc, err := NewAccount() if err != nil { panic(err) From bb727f2e92175a64f6d42c85bb01045ad7ea2225 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 12:55:45 +0200 Subject: [PATCH 5/7] fmt --- generator/prewarm.go | 40 +++++------ generator/scenarios/factory.go | 6 +- generator/weighted.go | 2 +- sender/dispatcher.go | 22 +++--- sender/sender_test.go | 2 +- sender/sharded_sender.go | 2 +- sender/worker.go | 60 ++++++++-------- stats/block_collector.go | 12 ++-- stats/collector.go | 30 ++++---- stats/logger.go | 4 +- types/types_test.go | 126 ++++++++++++++++----------------- 11 files changed, 158 insertions(+), 148 deletions(-) diff --git a/generator/prewarm.go b/generator/prewarm.go index 5f5d7c1..0862bc7 100644 --- a/generator/prewarm.go +++ b/generator/prewarm.go @@ -10,31 +10,31 @@ import ( // PrewarmGenerator generates self-transfer transactions to prewarm account nonces type PrewarmGenerator struct { - accountPools []types.AccountPool - evmScenario scenarios.TxGenerator - currentPoolIdx int - finished bool - mu sync.RWMutex + accountPools []types.AccountPool + evmScenario scenarios.TxGenerator + currentPoolIdx int + finished bool + mu sync.RWMutex } // NewPrewarmGenerator creates a new prewarm generator using all account pools from the main generator func NewPrewarmGenerator(config *config.LoadConfig, mainGenerator Generator) *PrewarmGenerator { // Get all account pools from the main generator accountPools := mainGenerator.GetAccountPools() - + // Create EVMTransfer scenario for prewarming evmScenario := scenarios.NewEVMTransferScenario() - + // Deploy/initialize the scenario (EVMTransfer doesn't need actual deployment) deployerAccounts := types.GenerateAccounts(1) deployer := deployerAccounts[0] evmScenario.Deploy(config, deployer) - + return &PrewarmGenerator{ - accountPools: accountPools, - evmScenario: evmScenario, + accountPools: accountPools, + evmScenario: evmScenario, currentPoolIdx: 0, - finished: false, + finished: false, } } @@ -42,38 +42,38 @@ func NewPrewarmGenerator(config *config.LoadConfig, mainGenerator Generator) *Pr func (pg *PrewarmGenerator) Generate() (*types.LoadTx, bool) { pg.mu.Lock() defer pg.mu.Unlock() - + // Check if we're already finished if pg.finished || pg.currentPoolIdx >= len(pg.accountPools) { return nil, false } - + // Get current pool currentPool := pg.accountPools[pg.currentPoolIdx] account := currentPool.NextAccount() - + // If this account has nonce > 0, we've already prewarmed it (round-robin means we're done with this pool) if account.Nonce > 0 { // Move to next pool pg.currentPoolIdx++ - + // Check if we've finished all pools if pg.currentPoolIdx >= len(pg.accountPools) { pg.finished = true return nil, false } - + // Get account from next pool currentPool = 
pg.accountPools[pg.currentPoolIdx] account = currentPool.NextAccount() - + // If this account also has nonce > 0, we're completely done if account.Nonce > 0 { pg.finished = true return nil, false } } - + // Create self-transfer transaction scenario := &types.TxScenario{ Name: "EVMTransfer", @@ -81,7 +81,7 @@ func (pg *PrewarmGenerator) Generate() (*types.LoadTx, bool) { Receiver: account.Address, // Send to self Nonce: account.GetAndIncrementNonce(), } - + // Generate the transaction using EVMTransfer scenario return pg.evmScenario.Generate(scenario), true } @@ -103,7 +103,7 @@ func (pg *PrewarmGenerator) GenerateN(n int) []*types.LoadTx { func (pg *PrewarmGenerator) GetAccountPools() []types.AccountPool { pg.mu.RLock() defer pg.mu.RUnlock() - + // Return a copy to prevent external modification pools := make([]types.AccountPool, len(pg.accountPools)) copy(pools, pg.accountPools) diff --git a/generator/scenarios/factory.go b/generator/scenarios/factory.go index e574e41..f04c388 100644 --- a/generator/scenarios/factory.go +++ b/generator/scenarios/factory.go @@ -11,9 +11,9 @@ var scenarioFactories = map[string]ScenarioFactory{ // Auto-generated entries will be added below this line by make generate // DO NOT EDIT BELOW THIS LINE - AUTO-GENERATED CONTENT ERC20Conflict: NewERC20ConflictScenario, - ERC20Noop: NewERC20NoopScenario, - ERC20: NewERC20Scenario, - ERC721: NewERC721Scenario, + ERC20Noop: NewERC20NoopScenario, + ERC20: NewERC20Scenario, + ERC721: NewERC721Scenario, // DO NOT EDIT ABOVE THIS LINE - AUTO-GENERATED CONTENT } diff --git a/generator/weighted.go b/generator/weighted.go index 60759db..d1c4452 100644 --- a/generator/weighted.go +++ b/generator/weighted.go @@ -2,8 +2,8 @@ package generator import ( "context" - "math/rand" "github.com/sei-protocol/sei-load/types" + "math/rand" "sync" ) diff --git a/sender/dispatcher.go b/sender/dispatcher.go index 19f1db3..e62a884 100644 --- a/sender/dispatcher.go +++ b/sender/dispatcher.go @@ -3,14 +3,14 @@ package sender import ( "context" "fmt" + "golang.org/x/time/rate" + "log" "sync" "time" - "log" - "golang.org/x/time/rate" - "github.com/sei-protocol/sei-load/utils" "github.com/sei-protocol/sei-load/generator" "github.com/sei-protocol/sei-load/stats" + "github.com/sei-protocol/sei-load/utils" ) // Dispatcher continuously generates transactions and dispatches them to the sender @@ -20,7 +20,7 @@ type Dispatcher struct { sender TxSender // Configuration - limiter *rate.Limiter + limiter *rate.Limiter // Statistics totalSent uint64 @@ -33,7 +33,7 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { return &Dispatcher{ generator: gen, sender: sender, - limiter: rate.NewLimiter(rate.Inf, 1), // No rate limiting by default + limiter: rate.NewLimiter(rate.Inf, 1), // No rate limiting by default } } @@ -41,7 +41,7 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher { func (d *Dispatcher) SetRateLimit(duration time.Duration) { d.mu.Lock() defer d.mu.Unlock() - d.limiter = rate.NewLimiter(rate.Every(duration),1) + d.limiter = rate.NewLimiter(rate.Every(duration), 1) } // SetStatsCollector sets the statistics collector for this dispatcher @@ -65,7 +65,9 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error { d.mu.RUnlock() gen, ok := prewarmGen.Get() - if !ok { return nil } // No prewarming configured + if !ok { + return nil + } // No prewarming configured log.Print("šŸ”„ Starting account prewarming...") processedAccounts := 0 @@ -103,7 +105,7 @@ func (d *Dispatcher) Run(ctx context.Context) 
error { d.mu.RUnlock() for { - if err:=limiter.Wait(ctx); err!=nil { + if err := limiter.Wait(ctx); err != nil { return err } // Generate a transaction from main generator @@ -116,7 +118,7 @@ func (d *Dispatcher) Run(ctx context.Context) error { // Send the transaction if err := d.sender.Send(ctx, tx); err != nil { return err - } + } d.mu.Lock() d.totalSent++ d.mu.Unlock() @@ -132,7 +134,7 @@ func (d *Dispatcher) RunBatch(ctx context.Context, count int) error { limiter := d.limiter d.mu.RUnlock() for i := range count { - if err:=limiter.Wait(ctx); err!=nil { + if err := limiter.Wait(ctx); err != nil { return err } // Generate a transaction diff --git a/sender/sender_test.go b/sender/sender_test.go index 59ac91e..aacdaa3 100644 --- a/sender/sender_test.go +++ b/sender/sender_test.go @@ -110,7 +110,7 @@ func TestShardDistributionVerification(t *testing.T) { mockAccount := &types.Account{ Address: common.HexToAddress("0x1234567890123456789012345678901234567890"), } - + mockTx := &types.LoadTx{ EthTx: ethtypes.NewTransaction(0, common.Address{}, big.NewInt(0), 21000, big.NewInt(1000000000), nil), Scenario: &types.TxScenario{ diff --git a/sender/sharded_sender.go b/sender/sharded_sender.go index d98bbc5..86085d1 100644 --- a/sender/sharded_sender.go +++ b/sender/sharded_sender.go @@ -1,9 +1,9 @@ package sender import ( + "context" "fmt" "sync" - "context" "github.com/sei-protocol/sei-load/config" "github.com/sei-protocol/sei-load/stats" diff --git a/sender/worker.go b/sender/worker.go index 8492cac..970989e 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -6,34 +6,34 @@ import ( "fmt" "github.com/ethereum/go-ethereum/ethclient" "io" - "net" "log" + "net" "net/http" "time" "github.com/sei-protocol/sei-load/stats" "github.com/sei-protocol/sei-load/types" - "github.com/sei-protocol/sei-load/utils/service" "github.com/sei-protocol/sei-load/utils" + "github.com/sei-protocol/sei-load/utils/service" ) // Worker handles sending transactions to a specific endpoint type Worker struct { - id int - endpoint string - txChan chan *types.LoadTx - sentTxs chan *types.LoadTx - dryRun bool - debug bool - collector *stats.Collector - logger *stats.Logger - workers int + id int + endpoint string + txChan chan *types.LoadTx + sentTxs chan *types.LoadTx + dryRun bool + debug bool + collector *stats.Collector + logger *stats.Logger + workers int trackReceipts bool } func newHttpClient() *http.Client { return &http.Client{ - Timeout: 30 * time.Second, + Timeout: 30 * time.Second, Transport: &http.Transport{ DialContext: (&net.Dialer{ Timeout: 10 * time.Second, @@ -52,11 +52,11 @@ func newHttpClient() *http.Client { // NewWorker creates a new worker for a specific endpoint func NewWorker(id int, endpoint string, bufferSize int, workers int) *Worker { return &Worker{ - id: id, - endpoint: endpoint, - txChan: make(chan *types.LoadTx, bufferSize), - sentTxs: make(chan *types.LoadTx, bufferSize), - workers: workers, + id: id, + endpoint: endpoint, + txChan: make(chan *types.LoadTx, bufferSize), + sentTxs: make(chan *types.LoadTx, bufferSize), + workers: workers, trackReceipts: true, } } @@ -73,7 +73,7 @@ func (w *Worker) Run(ctx context.Context) error { // Start multiple worker goroutines that share the same channel client := newHttpClient() for range w.workers { - s.Spawn(func() error { return w.processTransactions(ctx,client) }) + s.Spawn(func() error { return w.processTransactions(ctx, client) }) } return w.watchTransactions(ctx) }) @@ -109,11 +109,13 @@ func (w *Worker) watchTransactions(ctx context.Context) 
error { } for { tx, err := utils.Recv(ctx, w.sentTxs) - if err!=nil { return err } - ctx,cancel := context.WithTimeout(ctx, 10*time.Second) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := w.waitForReceipt(ctx, eth, tx); err != nil { - log.Printf("āŒ %v",err) + log.Printf("āŒ %v", err) } } } @@ -121,7 +123,7 @@ func (w *Worker) watchTransactions(ctx context.Context) error { func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx *types.LoadTx) error { ticker := time.NewTicker(100 * time.Millisecond) for { - if _,err := utils.Recv(ctx, ticker.C); err != nil { + if _, err := utils.Recv(ctx, ticker.C); err != nil { return fmt.Errorf("timeout waiting for receipt for tx %s", tx.EthTx.Hash().Hex()) } receipt, err := eth.TransactionReceipt(context.Background(), tx.EthTx.Hash()) @@ -146,16 +148,18 @@ func (w *Worker) waitForReceipt(ctx context.Context, eth *ethclient.Client, tx * // processTransactions is the main worker loop that processes transactions func (w *Worker) processTransactions(ctx context.Context, client *http.Client) error { for { - tx,err := utils.Recv(ctx, w.txChan) - if err != nil { return err } + tx, err := utils.Recv(ctx, w.txChan) + if err != nil { + return err + } startTime := time.Now() err = w.sendTransaction(ctx, client, tx) // Record statistics if collector is available if w.collector != nil { - w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err==nil) + w.collector.RecordTransaction(tx.Scenario.Name, w.endpoint, time.Since(startTime), err == nil) } - if err!=nil { - log.Printf("%v",err) + if err != nil { + log.Printf("%v", err) } } } @@ -165,7 +169,7 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t if w.dryRun { // In dry-run mode, simulate processing time and mark as successful // Use very minimal delay to avoid channel overflow - return utils.Sleep(ctx, 10 * time.Microsecond) // Much faster simulation + return utils.Sleep(ctx, 10*time.Microsecond) // Much faster simulation } // Create HTTP request with JSON-RPC payload diff --git a/stats/block_collector.go b/stats/block_collector.go index 8a32c43..e65831d 100644 --- a/stats/block_collector.go +++ b/stats/block_collector.go @@ -62,14 +62,18 @@ func (bc *BlockCollector) Run(ctx context.Context, firstEndpoint string) error { } defer sub.Unsubscribe() s.SpawnBg(func() error { - subErr,err := utils.Recv(ctx, sub.Err()) - if err != nil { return err } + subErr, err := utils.Recv(ctx, sub.Err()) + if err != nil { + return err + } return subErr }) log.Printf("šŸ“” Subscribed to new blocks on %s", wsEndpoint) for { - header,err := utils.Recv(ctx, headers) - if err != nil { return err } + header, err := utils.Recv(ctx, headers) + if err != nil { + return err + } bc.processNewBlock(header) } }) diff --git a/stats/collector.go b/stats/collector.go index 474a164..3c2ea7e 100644 --- a/stats/collector.go +++ b/stats/collector.go @@ -30,8 +30,8 @@ type Collector struct { blockCollector *BlockCollector // Global metrics - startTime time.Time - totalTxs uint64 + startTime time.Time + totalTxs uint64 lastWindowTime time.Time // Configuration @@ -316,7 +316,7 @@ func (c *Collector) GetStats() Stats { func (c *Collector) GetCumulativeBlockStats() *BlockStats { c.mu.RLock() defer c.mu.RUnlock() - + if c.blockCollector != nil { stats := c.blockCollector.GetBlockStats() return &stats @@ -340,13 +340,13 @@ func calculatePercentile(sorted []time.Duration, percentile int) time.Duration 
{ // Stats represents comprehensive load test statistics type Stats struct { - StartTime time.Time - TotalTxs uint64 - TxCounts map[string]map[string]uint64 // [scenario][endpoint] -> count - EndpointStats map[string]EndpointStats - OverallMaxTPS float64 + StartTime time.Time + TotalTxs uint64 + TxCounts map[string]map[string]uint64 // [scenario][endpoint] -> count + EndpointStats map[string]EndpointStats + OverallMaxTPS float64 OverallCurrentTPS float64 - BlockStats *BlockStats // Block-related statistics + BlockStats *BlockStats // Block-related statistics } // EndpointStats represents statistics for a specific endpoint @@ -371,12 +371,12 @@ type EndpointStats struct { // WindowStats tracks metrics for the current reporting window type WindowStats struct { - windowStart time.Time - txCount uint64 - latencySum time.Duration - latencyCount int - maxLatency time.Duration - minLatency time.Duration + windowStart time.Time + txCount uint64 + latencySum time.Duration + latencyCount int + maxLatency time.Duration + minLatency time.Duration cumulativeMaxTPS float64 cumulativeMaxLatency time.Duration } diff --git a/stats/logger.go b/stats/logger.go index 4881c0c..c512d87 100644 --- a/stats/logger.go +++ b/stats/logger.go @@ -2,10 +2,10 @@ package stats import ( "context" + "github.com/sei-protocol/sei-load/utils" "log" "sync" "time" - "github.com/sei-protocol/sei-load/utils" ) // Logger handles periodic statistics logging and dry-run transaction printing @@ -32,7 +32,7 @@ func NewLogger(collector *Collector, interval time.Duration, debug bool) *Logger func (l *Logger) Run(ctx context.Context) error { ticker := time.NewTicker(l.interval) for { - if _,err := utils.Recv(ctx, ticker.C); err != nil { + if _, err := utils.Recv(ctx, ticker.C); err != nil { return err } l.logCurrentStats() diff --git a/types/types_test.go b/types/types_test.go index f5b03e8..54475c3 100644 --- a/types/types_test.go +++ b/types/types_test.go @@ -51,10 +51,10 @@ func TestAccountNonceConcurrency(t *testing.T) { const numGoroutines = 100 const noncesPerGoroutine = 10 - + var wg sync.WaitGroup nonces := make([]uint64, numGoroutines*noncesPerGoroutine) - + // Launch concurrent goroutines to increment nonce for i := 0; i < numGoroutines; i++ { wg.Add(1) @@ -66,9 +66,9 @@ func TestAccountNonceConcurrency(t *testing.T) { } }(i) } - + wg.Wait() - + // Verify all nonces are unique and in expected range nonceSet := make(map[uint64]bool) for _, nonce := range nonces { @@ -76,10 +76,10 @@ func TestAccountNonceConcurrency(t *testing.T) { nonceSet[nonce] = true assert.Less(t, nonce, uint64(numGoroutines*noncesPerGoroutine)) } - + // Verify we got exactly the expected number of unique nonces assert.Len(t, nonceSet, numGoroutines*noncesPerGoroutine) - + // Verify final nonce value assert.Equal(t, uint64(numGoroutines*noncesPerGoroutine), account.Nonce) } @@ -126,20 +126,20 @@ func TestAccountPoolRoundRobin(t *testing.T) { Accounts: accounts, NewAccountRate: 0.0, // No new accounts, pure round-robin } - + pool := NewAccountPool(config) - + // The account pool starts from index 1 (due to nextIndex() incrementing first) // So the first call returns accounts[1], second returns accounts[2], third returns accounts[0] expectedOrder := []int{1, 2, 0} // The actual order the pool returns accounts - + // Test multiple rounds of round-robin selection for round := 0; round < 3; round++ { for i, expectedIndex := range expectedOrder { selectedAccount := pool.NextAccount() expectedAccount := accounts[expectedIndex] - assert.Equal(t, 
expectedAccount.Address, selectedAccount.Address, - "Round %d, position %d: expected %s, got %s", + assert.Equal(t, expectedAccount.Address, selectedAccount.Address, + "Round %d, position %d: expected %s, got %s", round, i, expectedAccount.Address.Hex(), selectedAccount.Address.Hex()) } } @@ -151,19 +151,19 @@ func TestAccountPoolNewAccountRate(t *testing.T) { Accounts: accounts, NewAccountRate: 1.0, // Always generate new accounts } - + pool := NewAccountPool(config) - + // With 100% new account rate, should never get original accounts originalAddresses := make(map[common.Address]bool) for _, account := range accounts { originalAddresses[account.Address] = true } - + for i := 0; i < 10; i++ { selectedAccount := pool.NextAccount() - assert.False(t, originalAddresses[selectedAccount.Address], - "Iteration %d: got original account %s when expecting new account", + assert.False(t, originalAddresses[selectedAccount.Address], + "Iteration %d: got original account %s when expecting new account", i, selectedAccount.Address.Hex()) } } @@ -174,18 +174,18 @@ func TestAccountPoolMixedRate(t *testing.T) { Accounts: accounts, NewAccountRate: 0.5, // 50% new accounts } - + pool := NewAccountPool(config) - + originalAddresses := make(map[common.Address]bool) for _, account := range accounts { originalAddresses[account.Address] = true } - + const iterations = 100 originalCount := 0 newCount := 0 - + for i := 0; i < iterations; i++ { selectedAccount := pool.NextAccount() if originalAddresses[selectedAccount.Address] { @@ -194,12 +194,12 @@ func TestAccountPoolMixedRate(t *testing.T) { newCount++ } } - + // With 50% rate, expect roughly equal distribution (allow 20% variance) expectedNew := iterations / 2 tolerance := expectedNew / 5 // 20% tolerance - - assert.InDelta(t, expectedNew, newCount, float64(tolerance), + + assert.InDelta(t, expectedNew, newCount, float64(tolerance), "Expected ~%d new accounts, got %d (tolerance: ±%d)", expectedNew, newCount, tolerance) assert.Equal(t, iterations, originalCount+newCount, "Total accounts don't match iterations") } @@ -210,15 +210,15 @@ func TestAccountPoolConcurrency(t *testing.T) { Accounts: accounts, NewAccountRate: 0.0, // Pure round-robin for predictable testing } - + pool := NewAccountPool(config) - + const numGoroutines = 50 const selectionsPerGoroutine = 20 - + var wg sync.WaitGroup selectedAccounts := make([]common.Address, numGoroutines*selectionsPerGoroutine) - + // Launch concurrent goroutines to select accounts for i := 0; i < numGoroutines; i++ { wg.Add(1) @@ -230,17 +230,17 @@ func TestAccountPoolConcurrency(t *testing.T) { } }(i) } - + wg.Wait() - + // Verify all selected accounts are from the original pool originalAddresses := make(map[common.Address]bool) for _, account := range accounts { originalAddresses[account.Address] = true } - + for i, address := range selectedAccounts { - assert.True(t, originalAddresses[address], + assert.True(t, originalAddresses[address], "Selection %d: got unexpected address %s", i, address.Hex()) } } @@ -249,7 +249,7 @@ func TestCreateTxFromEthTx(t *testing.T) { // Create a test account and scenario account, err := NewAccount() require.NoError(t, err) - + receiver := common.HexToAddress("0x1234567890123456789012345678901234567890") scenario := &TxScenario{ Name: "TestScenario", @@ -257,7 +257,7 @@ func TestCreateTxFromEthTx(t *testing.T) { Sender: account, Receiver: receiver, } - + // Create a test transaction using DynamicFeeTx (EIP-1559) tx := types.NewTx(&types.DynamicFeeTx{ ChainID: big.NewInt(1329), // Sei 
testnet chain ID @@ -269,22 +269,22 @@ func TestCreateTxFromEthTx(t *testing.T) { Value: big.NewInt(1000000000000000000), // 1 ETH Data: nil, }) - + // Create LoadTx from the transaction loadTx := CreateTxFromEthTx(tx, scenario) - + // Verify LoadTx structure require.NotNil(t, loadTx) assert.Equal(t, tx, loadTx.EthTx) assert.Equal(t, scenario, loadTx.Scenario) assert.NotEmpty(t, loadTx.JSONRPCPayload) assert.NotEmpty(t, loadTx.Payload) - + // Verify JSON-RPC payload is valid JSON assert.Contains(t, string(loadTx.JSONRPCPayload), `"jsonrpc":"2.0"`) assert.Contains(t, string(loadTx.JSONRPCPayload), `"method":"eth_sendRawTransaction"`) assert.Contains(t, string(loadTx.JSONRPCPayload), `"id":0`) // Numeric ID, not string - + // Verify payload matches transaction binary data expectedPayload, err := tx.MarshalBinary() require.NoError(t, err) @@ -294,7 +294,7 @@ func TestCreateTxFromEthTx(t *testing.T) { func TestLoadTxShardID(t *testing.T) { // Create more test accounts to ensure better shard distribution accounts := GenerateAccounts(50) - + tests := []struct { name string numShards int @@ -305,11 +305,11 @@ func TestLoadTxShardID(t *testing.T) { {"Multiple shards", 5, 50}, {"Many shards", 16, 200}, // Increased iterations for better distribution } - + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { shardCounts := make(map[int]int) - + for i := 0; i < tt.iterations; i++ { account := accounts[i%len(accounts)] scenario := &TxScenario{ @@ -318,7 +318,7 @@ func TestLoadTxShardID(t *testing.T) { Sender: account, Receiver: common.Address{}, } - + // Create a simple transaction tx := types.NewTx(&types.DynamicFeeTx{ ChainID: big.NewInt(1329), // Sei testnet chain ID @@ -331,16 +331,16 @@ func TestLoadTxShardID(t *testing.T) { Data: nil, }) loadTx := CreateTxFromEthTx(tx, scenario) - + shardID := loadTx.ShardID(tt.numShards) - + // Verify shard ID is in valid range assert.GreaterOrEqual(t, shardID, 0, "Shard ID should be non-negative") assert.Less(t, shardID, tt.numShards, "Shard ID should be less than number of shards") - + shardCounts[shardID]++ } - + // For tests with sufficient iterations and accounts, expect reasonable distribution // Note: Hash-based shard distribution can be uneven, so we don't require all shards to be used // Instead, we verify that the distribution is reasonable and all shard IDs are valid @@ -351,16 +351,16 @@ func TestLoadTxShardID(t *testing.T) { assert.GreaterOrEqual(t, shardID, 0, "Shard ID should be non-negative") assert.Less(t, shardID, tt.numShards, "Shard ID should be less than number of shards") } - + // Verify total count matches iterations assert.Equal(t, tt.iterations, totalCount, "Total shard counts should match iterations") - + // For large numbers of shards, verify we're using a reasonable number of them // (at least 50% of available shards for sufficient iterations) if tt.numShards > 4 && tt.iterations >= tt.numShards*8 { usedShards := len(shardCounts) minExpectedShards := tt.numShards / 2 - assert.GreaterOrEqual(t, usedShards, minExpectedShards, + assert.GreaterOrEqual(t, usedShards, minExpectedShards, "Expected at least %d shards to be used, got %d", minExpectedShards, usedShards) } }) @@ -371,14 +371,14 @@ func TestLoadTxShardIDConsistency(t *testing.T) { // Test that the same sender always maps to the same shard account, err := NewAccount() require.NoError(t, err) - + scenario := &TxScenario{ Name: "TestScenario", Nonce: 0, Sender: account, Receiver: common.Address{}, } - + tx := types.NewTx(&types.DynamicFeeTx{ ChainID: big.NewInt(1329), // 
Sei testnet chain ID Nonce: scenario.Nonce, @@ -390,14 +390,14 @@ func TestLoadTxShardIDConsistency(t *testing.T) { Data: nil, }) loadTx := CreateTxFromEthTx(tx, scenario) - + const numShards = 8 expectedShardID := loadTx.ShardID(numShards) - + // Test multiple times with the same sender for i := 0; i < 10; i++ { shardID := loadTx.ShardID(numShards) - assert.Equal(t, expectedShardID, shardID, + assert.Equal(t, expectedShardID, shardID, "Shard ID should be consistent for the same sender (iteration %d)", i) } } @@ -405,16 +405,16 @@ func TestLoadTxShardIDConsistency(t *testing.T) { func TestTxScenario(t *testing.T) { account, err := NewAccount() require.NoError(t, err) - + receiver := common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd") - + scenario := &TxScenario{ Name: "TestScenario", Nonce: 123, Sender: account, Receiver: receiver, } - + // Verify all fields are set correctly assert.Equal(t, "TestScenario", scenario.Name) assert.Equal(t, uint64(123), scenario.Nonce) @@ -425,10 +425,10 @@ func TestTxScenario(t *testing.T) { func TestJSONRPCPayloadFormat(t *testing.T) { // Test the internal JSON-RPC payload generation testData := []byte{0x01, 0x02, 0x03, 0x04} - + payload, err := toJSONRequestBytes(testData) require.NoError(t, err) - + expectedContent := `{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["0x01020304"],"id":0}` // Numeric ID, not string assert.JSONEq(t, expectedContent, string(payload)) } @@ -450,7 +450,7 @@ func BenchmarkAccountPoolNextAccount(b *testing.B) { NewAccountRate: 0.0, } pool := NewAccountPool(config) - + b.ResetTimer() for i := 0; i < b.N; i++ { pool.NextAccount() @@ -462,7 +462,7 @@ func BenchmarkNonceIncrement(b *testing.B) { if err != nil { b.Fatal(err) } - + b.ResetTimer() for i := 0; i < b.N; i++ { account.GetAndIncrementNonce() @@ -474,14 +474,14 @@ func BenchmarkCreateTxFromEthTx(b *testing.B) { if err != nil { b.Fatal(err) } - + scenario := &TxScenario{ Name: "BenchmarkScenario", Nonce: 0, Sender: account, Receiver: common.Address{}, } - + tx := types.NewTx(&types.DynamicFeeTx{ ChainID: big.NewInt(1329), // Sei testnet chain ID Nonce: scenario.Nonce, @@ -492,7 +492,7 @@ func BenchmarkCreateTxFromEthTx(b *testing.B) { Value: big.NewInt(0), // 0 ETH Data: nil, }) - + b.ResetTimer() for i := 0; i < b.N; i++ { CreateTxFromEthTx(tx, scenario) From 3c29ad60b54cf849ac1102eeaadbee59456740f4 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 12:58:34 +0200 Subject: [PATCH 6/7] missing files --- utils/channels.go | 74 +++++++++++ utils/mutex.go | 234 +++++++++++++++++++++++++++++++++ utils/option.go | 73 ++++++++++ utils/proto.go | 143 ++++++++++++++++++++ utils/semaphore.go | 24 ++++ utils/service/parallel.go | 41 ++++++ utils/service/parallel_test.go | 54 ++++++++ utils/service/start.go | 143 ++++++++++++++++++++ utils/testonly.go | 152 +++++++++++++++++++++ utils/wait.go | 119 +++++++++++++++++ utils/wait_test.go | 23 ++++ 11 files changed, 1080 insertions(+) create mode 100644 utils/channels.go create mode 100644 utils/mutex.go create mode 100644 utils/option.go create mode 100644 utils/proto.go create mode 100644 utils/semaphore.go create mode 100644 utils/service/parallel.go create mode 100644 utils/service/parallel_test.go create mode 100644 utils/service/start.go create mode 100644 utils/testonly.go create mode 100644 utils/wait.go create mode 100644 utils/wait_test.go diff --git a/utils/channels.go b/utils/channels.go new file mode 100644 index 0000000..9eed500 --- /dev/null +++ b/utils/channels.go @@ -0,0 +1,74 
@@
+package utils
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+)
+
+// Recv receives a value from a channel or returns an error if the context is canceled.
+func Recv[T any](ctx context.Context, ch <-chan T) (zero T, err error) {
+	select {
+	case v, ok := <-ch:
+		if ok {
+			return v, nil
+		}
+		// We are not interested in channel closing,
+		// patiently wait for the context to be done instead.
+		<-ctx.Done()
+		return zero, ctx.Err()
+	case <-ctx.Done():
+		return zero, ctx.Err()
+	}
+}
+
+// RecvOrClosed receives a value from a channel, returns false if the channel got closed,
+// or returns an error if the context is canceled.
+func RecvOrClosed[T any](ctx context.Context, ch <-chan T) (T, bool, error) {
+	select {
+	case v, ok := <-ch:
+		return v, ok, nil
+	case <-ctx.Done():
+		var zero T
+		return zero, false, ctx.Err()
+	}
+}
+
+// Send sends a value to the channel or returns an error if the context is canceled.
+func Send[T any](ctx context.Context, ch chan<- T, v T) error {
+	select {
+	case ch <- v:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// SendOrDrop sends a value to the channel if it is not full, or silently drops the item otherwise.
+func SendOrDrop[T any](ch chan<- T, v T) error {
+	select {
+	case ch <- v:
+		return nil
+	default:
+		// drop the item
+		return nil
+	}
+}
+
+// ForEach is a helper function that reads from a channel and calls a handler for each item.
+// This avoids needing a lot of for/select boilerplate everywhere.
+func ForEach[T any](ctx context.Context, ch <-chan T, handler func(T) error) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.WithStack(ctx.Err())
+		case item, ok := <-ch:
+			if !ok {
+				return nil // Channel closed
+			}
+			if err := handler(item); err != nil {
+				return err // Stop on error
+			}
+		}
+	}
+}
diff --git a/utils/mutex.go b/utils/mutex.go
new file mode 100644
index 0000000..88378b4
--- /dev/null
+++ b/utils/mutex.go
@@ -0,0 +1,234 @@
+package utils
+
+import (
+	"context"
+	"iter"
+	"sync"
+	"sync/atomic"
+
+	"golang.org/x/sync/errgroup"
+)
+
+// Mutex guards access to an object of type T.
+type Mutex[T any] struct {
+	mu    sync.Mutex
+	value T
+}
+
+// NewMutex creates a new Mutex guarding the given object.
+func NewMutex[T any](value T) (m Mutex[T]) {
+	m.value = value
+	// nolint:nakedret
+	return
+}
+
+// Lock returns an iterator which locks the mutex and yields the guarded object.
+// The mutex is unlocked when the iterator is done.
+// If the mutex is nil, the iterator is a no-op.
+func (m *Mutex[T]) Lock() iter.Seq[T] {
+	return func(yield func(val T) bool) {
+		m.mu.Lock()
+		defer m.mu.Unlock()
+		_ = yield(m.value)
+	}
+}
+
+// version of the value stored in an atomic watch.
+type version[T any] struct {
+	updated chan struct{}
+	value   T
+}
+
+// newVersion constructs a new active version.
+func newVersion[T any](value T) *version[T] {
+	return &version[T]{make(chan struct{}), value}
+}
+
+type atomicWatch[T any] struct {
+	ptr atomic.Pointer[version[T]]
+}
+
+type AtomicSend[T any] struct {
+	atomicWatch[T]
+}
+
+// Send stores a new value in the atomic watch and notifies subscribers.
+func (w *AtomicSend[T]) Send(value T) {
+	close(w.ptr.Swap(newVersion(value)).updated)
+}
+
+// Update conditionally updates the value of the atomic watch.
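+// A minimal usage sketch (the values are illustrative): f receives the
+// current value and returns the replacement plus whether to store it;
+// subscribers are notified only when the store actually happens.
+//
+//	counter := NewAtomicSend(0)
+//	counter.Update(func(v int) (int, bool) {
+//		if v >= 10 {
+//			return v, false // keep the old value, no notification
+//		}
+//		return v + 1, true // store v+1 and wake up waiters
+//	})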
+func (w *AtomicSend[T]) Update(f func(T) (T, bool)) {
+	old := w.ptr.Load()
+	if value, ok := f(old.value); ok {
+		w.ptr.Store(newVersion(value))
+		close(old.updated)
+	}
+}
+
+func NewAtomicSend[T any](value T) (w AtomicSend[T]) {
+	w.atomicWatch.ptr.Store(newVersion(value))
+	// nolint:nakedret
+	return
+}
+
+func (w *AtomicSend[T]) Subscribe() AtomicRecv[T] {
+	return AtomicRecv[T]{&w.atomicWatch}
+}
+
+// AtomicWatch stores a pointer to an IMMUTABLE value.
+// Loading and waiting for updates do NOT require locking.
+// TODO(gprusak): remove mutex and rename to AtomicSend;
+// this will allow sharing a mutex across multiple AtomicSenders.
+type AtomicWatch[T any] struct {
+	atomicWatch[T]
+	mu sync.Mutex
+}
+
+// AtomicRecv is a read-only reference to AtomicWatch.
+type AtomicRecv[T any] struct{ *atomicWatch[T] }
+
+// NewAtomicWatch creates a new AtomicWatch with the given initial value.
+func NewAtomicWatch[T any](value T) (w AtomicWatch[T]) {
+	w.ptr.Store(newVersion(value))
+	// nolint:nakedret
+	return
+}
+
+// Subscribe returns a view-only API of the atomic watch.
+func (w *AtomicWatch[T]) Subscribe() AtomicRecv[T] {
+	return AtomicRecv[T]{&w.atomicWatch}
+}
+
+// Load returns the current value of the atomic watch.
+// Does not do any locking.
+func (w *atomicWatch[T]) Load() T { return w.ptr.Load().value }
+
+// Store updates the value of the atomic watch.
+func (w *AtomicWatch[T]) Store(value T) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	close(w.ptr.Swap(newVersion(value)).updated)
+}
+
+// Update conditionally updates the value of the atomic watch.
+func (w *AtomicWatch[T]) Update(f func(T) (T, bool)) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	old := w.ptr.Load()
+	if value, ok := f(old.value); ok {
+		w.ptr.Store(newVersion(value))
+		close(old.updated)
+	}
+}
+
+// Wait waits for the value of the atomic watch to satisfy the predicate.
+// Does not do any locking.
+func (w *atomicWatch[T]) Wait(ctx context.Context, pred func(T) bool) (T, error) {
+	for {
+		v := w.ptr.Load()
+		if pred(v.value) {
+			return v.value, nil
+		}
+		select {
+		case <-ctx.Done():
+			return Zero[T](), ctx.Err()
+		case <-v.updated:
+		}
+	}
+}
+
+// Iter sequentially executes the function f on each value of the atomic watch.
+// Context passed to f is canceled when the next value is available.
+// Exits when the returned error is anything other than nil or context.Canceled,
+// or when the context passed to Iter is canceled (after f exits).
+func (w *atomicWatch[T]) Iter(ctx context.Context, f func(ctx context.Context, v T) error) error {
+	for ctx.Err() == nil {
+		v := w.ptr.Load()
+		g, ctx := errgroup.WithContext(ctx)
+		g.Go(func() error { return f(ctx, v.value) })
+		g.Go(func() error {
+			select {
+			case <-ctx.Done():
+			case <-v.updated:
+			}
+			return context.Canceled
+		})
+		if err := IgnoreCancel(g.Wait()); err != nil {
+			return err
+		}
+	}
+	return ctx.Err()
+}
+
+// WatchCtrl controls the locked object in a Watch.
+// It is provided only in the iterator returned by Lock().
+// Should NOT be stored anywhere.
+type WatchCtrl struct {
+	mu      sync.Mutex
+	updated chan struct{}
+}
+
+// Watch stores a value of type T.
+// Essentially a mutex that can be awaited for updates.
+type Watch[T any] struct {
+	ctrl WatchCtrl
+	val  T
+}
+
+// NewWatch constructs a new watch with the given value.
+// Note that the value in the watch cannot be changed, so T
+// should be a pointer type if updates are required.
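+//
+// A hedged usage sketch (myState is a hypothetical pointer type): lock the
+// watch with a range loop, mutate the guarded object, and signal waiters.
+//
+//	w := NewWatch(&myState{})
+//	for s, ctrl := range w.Lock() {
+//		s.count++
+//		ctrl.Updated() // wakes goroutines blocked in ctrl.Wait/WaitUntil
+//	}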
+func NewWatch[T any](val T) Watch[T] {
+	return Watch[T]{
+		WatchCtrl{updated: make(chan struct{})},
+		val,
+	}
+}
+
+// Wait waits for the value in the watch to be updated.
+// Should be called only after locking the watch, i.e. within the Lock() iterator.
+// It unlocks -> waits for the update -> locks again.
+func (c *WatchCtrl) Wait(ctx context.Context) error {
+	updated := c.updated
+	c.mu.Unlock()
+	defer c.mu.Lock()
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-updated:
+		return nil
+	}
+}
+
+// WaitUntil waits for the value in the watch to satisfy the predicate.
+// Should be called only after locking the watch, i.e. within the Lock() iterator.
+// The predicate is evaluated under the lock, so it can access the guarded object.
+func (c *WatchCtrl) WaitUntil(ctx context.Context, pred func() bool) error {
+	for !pred() {
+		if err := c.Wait(ctx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Updated signals waiters that the value in the watch has been updated.
+func (c *WatchCtrl) Updated() {
+	close(c.updated)
+	c.updated = make(chan struct{})
+}
+
+// Lock returns an iterator which locks the watch and yields the guarded object.
+// The watch is unlocked when the iterator is done.
+// If the watch is nil, the iterator is a no-op.
+// Additionally, the WatchCtrl object is provided to the yield function:
+// * to unlock -> wait for the update -> lock again, call ctrl.Wait(ctx)
+// * to signal an update, call ctrl.Updated().
+func (w *Watch[T]) Lock() iter.Seq2[T, *WatchCtrl] {
+	return func(yield func(val T, ctrl *WatchCtrl) bool) {
+		w.ctrl.mu.Lock()
+		defer w.ctrl.mu.Unlock()
+		_ = yield(w.val, &w.ctrl)
+	}
+}
diff --git a/utils/option.go b/utils/option.go
new file mode 100644
index 0000000..85fd6a4
--- /dev/null
+++ b/utils/option.go
@@ -0,0 +1,73 @@
+package utils
+
+import (
+	"encoding/json"
+)
+
+// Option type inspired by https://pkg.go.dev/github.com/samber/mo.
+type Option[T any] struct {
+	ReadOnly
+	isPresent bool
+	value     T
+}
+
+// Some creates an Option with a value.
+func Some[T any](value T) Option[T] {
+	return Option[T]{isPresent: true, value: value}
+}
+
+// None creates an Option without a value.
+func None[T any]() (zero Option[T]) { return }
+
+// Get unpacks the value from the Option, returning true if it was present.
+func (o Option[T]) Get() (T, bool) {
+	if o.isPresent {
+		return o.value, true
+	}
+	return Zero[T](), false
+}
+
+// IsPresent checks if the Option contains a value.
+func (o Option[T]) IsPresent() bool {
+	return o.isPresent
+}
+
+// Or returns the value if present, otherwise returns the default value.
+func (o *Option[T]) Or(def T) T {
+	if o.isPresent {
+		return o.value
+	}
+	return def
+}
+
+// MapOpt applies a function to the value if present, returning a new Option.
+func MapOpt[T, R any](o Option[T], f func(T) R) Option[R] {
+	if o.isPresent {
+		return Some(f(o.value))
+	}
+	return None[R]()
+}
+
+// MarshalJSON implements the json.Marshaler interface.
+// Note that it is defined on value, not pointer, because
+// json.Marshal cannot call pointer methods on fields
+// (i.e. it is broken by design).
+func (o Option[T]) MarshalJSON() ([]byte, error) {
+	if o.isPresent {
+		return json.Marshal(o.value)
+	}
+	return []byte("null"), nil
+}
+
+// UnmarshalJSON implements the json.Unmarshaler interface.
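+// Round-trip sketch: JSON null decodes to None, any other value to Some.
+//
+//	var o Option[int]
+//	_ = json.Unmarshal([]byte("null"), &o) // o.IsPresent() == false
+//	_ = json.Unmarshal([]byte("42"), &o)   // o.Get() returns (42, true)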
+func (o *Option[T]) UnmarshalJSON(data []byte) error {
+	if string(data) == "null" {
+		o.isPresent = false
+		return nil
+	}
+	if err := json.Unmarshal(data, &o.value); err != nil {
+		return err
+	}
+	o.isPresent = true
+	return nil
+}
diff --git a/utils/proto.go b/utils/proto.go
new file mode 100644
index 0000000..5f5ad7a
--- /dev/null
+++ b/utils/proto.go
@@ -0,0 +1,143 @@
+package utils
+
+import (
+	"crypto/sha256"
+	"errors"
+	"fmt"
+	"sync"
+
+	"google.golang.org/protobuf/proto"
+)
+
+// Hash is a SHA-256 hash.
+type Hash [sha256.Size]byte
+
+// GetHash computes a hash of the given data.
+func GetHash(data []byte) Hash {
+	return sha256.Sum256(data)
+}
+
+// ParseHash parses a Hash from bytes.
+func ParseHash(raw []byte) (Hash, error) {
+	if got, want := len(raw), sha256.Size; got != want {
+		return Hash{}, fmt.Errorf("hash size = %v, want %v", got, want)
+	}
+	return Hash(raw), nil
+}
+
+// ProtoClone clones a proto.Message object.
+func ProtoClone[T proto.Message](item T) T {
+	return proto.Clone(item).(T)
+}
+
+// ProtoEqual compares two proto.Message objects.
+func ProtoEqual[T proto.Message](a, b T) bool {
+	return proto.Equal(a, b)
+}
+
+// ProtoHash hashes a proto.Message object.
+// TODO(gprusak): make it deterministic.
+func ProtoHash(a proto.Message) Hash {
+	raw, err := proto.Marshal(a)
+	if err != nil {
+		panic(err)
+	}
+	return sha256.Sum256(raw)
+}
+
+// ProtoMessage is a comparable proto.Message.
+type ProtoMessage interface {
+	comparable
+	proto.Message
+}
+
+// ProtoConv is a pair of functions to encode and decode between a type and a ProtoMessage.
+type ProtoConv[T any, P ProtoMessage] struct {
+	Encode func(T) P
+	Decode func(P) (T, error)
+}
+
+// EncodeSlice encodes a slice of T into a slice of P.
+func (c ProtoConv[T, P]) EncodeSlice(t []T) []P {
+	p := make([]P, len(t))
+	for i := range t {
+		p[i] = c.Encode(t[i])
+	}
+	return p
+}
+
+// DecodeSlice decodes a slice of P into a slice of T.
+func (c ProtoConv[T, P]) DecodeSlice(p []P) ([]T, error) {
+	t := make([]T, len(p))
+	var err error
+	for i := range p {
+		if t[i], err = c.Decode(p[i]); err != nil {
+			return nil, fmt.Errorf("[%d]: %w", i, err)
+		}
+	}
+	return t, nil
+}
+
+// Slice constructs a slice.
+// It is syntactic sugar for `[]T{v...}`, which avoids
+// spelling out T. Not very useful if you need to spell
+// out T to construct the elements: in that case
+// you might prefer the []T{{...},{...}} syntax instead.
+func Slice[T any](v ...T) []T { return v }
+
+// Alloc moves a value to the heap.
+func Alloc[T any](v T) *T { return &v }
+
+// Zero returns a zero value of type T.
+func Zero[T any]() (zero T) { return }
+
+// NoCopy may be added to structs which must not be copied
+// after the first use.
+//
+// See https://golang.org/issues/8005#issuecomment-190753527
+// for details.
+//
+// Note that it must not be embedded, otherwise Lock and Unlock methods
+// will be exported.
+type NoCopy struct{}
+
+// Lock implements sync.Locker.
+func (*NoCopy) Lock() {}
+
+// Unlock implements sync.Locker.
+func (*NoCopy) Unlock() {}
+
+var _ sync.Locker = (*NoCopy)(nil)
+
+// NoCompare may be added to structs which must not be used as
+// map keys.
+type NoCompare [0]func()
+
+// EncodeOpt encodes Option[T], mapping None to Zero[P]().
+func (c ProtoConv[T, P]) EncodeOpt(mv Option[T]) P {
+	v, ok := mv.Get()
+	if !ok {
+		return Zero[P]()
+	}
+	return c.Encode(v)
+}
+
+// DecodeReq decodes a ProtoMessage into a T, returning an error if p is nil.
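+// Sketch, assuming a hypothetical generated message pb.Block and a local Block type:
+//
+//	conv := ProtoConv[Block, *pb.Block]{
+//		Encode: func(b Block) *pb.Block { return &pb.Block{Num: b.Num} },
+//		Decode: func(p *pb.Block) (Block, error) { return Block{Num: p.Num}, nil },
+//	}
+//	_, err := conv.DecodeReq(nil) // fails with "missing": nil is Zero[*pb.Block]()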
+func (c ProtoConv[T, P]) DecodeReq(p P) (T, error) {
+	if p == Zero[P]() {
+		return Zero[T](), errors.New("missing")
+	}
+	return c.Decode(p)
+}
+
+// DecodeOpt decodes a ProtoMessage into a T, returning None if p is nil.
+func (c ProtoConv[T, P]) DecodeOpt(p P) (Option[T], error) {
+	if p == Zero[P]() {
+		return None[T](), nil
+	}
+	t, err := c.DecodeReq(p)
+	if err != nil {
+		return None[T](), err
+	}
+	return Some(t), nil
+}
diff --git a/utils/semaphore.go b/utils/semaphore.go
new file mode 100644
index 0000000..728c12a
--- /dev/null
+++ b/utils/semaphore.go
@@ -0,0 +1,24 @@
+package utils
+
+import (
+	"context"
+)
+
+// Semaphore provides a way to bound concurrent access to a resource.
+type Semaphore struct {
+	ch chan struct{}
+}
+
+// NewSemaphore constructs a new semaphore with n permits.
+func NewSemaphore(n int) *Semaphore {
+	return &Semaphore{ch: make(chan struct{}, n)}
+}
+
+// Acquire acquires a permit from the semaphore.
+// Blocks until a permit is available.
+func (s *Semaphore) Acquire(ctx context.Context) (release func(), err error) {
+	if err := Send(ctx, s.ch, struct{}{}); err != nil {
+		return nil, err
+	}
+	return func() { <-s.ch }, nil
+}
diff --git a/utils/service/parallel.go b/utils/service/parallel.go
new file mode 100644
index 0000000..f2bcea9
--- /dev/null
+++ b/utils/service/parallel.go
@@ -0,0 +1,41 @@
+package service
+
+import (
+	"sync"
+	"sync/atomic"
+)
+
+type parallelScope struct {
+	wg  sync.WaitGroup
+	err atomic.Pointer[error]
+}
+
+// ParallelScope is a scope which doesn't require a cancellation token,
+// just parallelization.
+type ParallelScope struct{ *parallelScope }
+
+// Spawn spawns a new task in the scope.
+func (s *parallelScope) Spawn(t func() error) {
+	s.wg.Add(1)
+	go func() {
+		if err := t(); err != nil {
+			s.err.CompareAndSwap(nil, &err)
+		}
+		s.wg.Done()
+	}()
+}
+
+// Parallel executes a function in a parallel scope.
+// Compared to Run, it does not allow for early cancellation,
+// and is therefore suitable for non-blocking computations.
+// Returns the first error returned by any of the spawned tasks.
+// Waits until all the tasks complete, before returning.
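+//
+// A minimal usage sketch (illustration only; chunks and sum are hypothetical):
+//
+//	sums := make([]int, len(chunks))
+//	err := Parallel(func(s ParallelScope) error {
+//		for i := range chunks {
+//			s.Spawn(func() error {
+//				sums[i] = sum(chunks[i])
+//				return nil
+//			})
+//		}
+//		return nil
+//	})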
+func Parallel(main func(ParallelScope) error) error {
+	var s parallelScope
+	s.Spawn(func() error { return main(ParallelScope{&s}) })
+	s.wg.Wait()
+	if perr := s.err.Load(); perr != nil {
+		return *perr
+	}
+	return nil
+}
diff --git a/utils/service/parallel_test.go b/utils/service/parallel_test.go
new file mode 100644
index 0000000..2079e29
--- /dev/null
+++ b/utils/service/parallel_test.go
@@ -0,0 +1,54 @@
+package service
+
+import (
+	"errors"
+	"testing"
+)
+
+func TestParallelOk(t *testing.T) {
+	x := [10]int{}
+	if err := Parallel(func(s ParallelScope) error {
+		for i := range x {
+			s.Spawn(func() error {
+				x[i] = i
+				return nil
+			})
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+	for want, got := range x {
+		if want != got {
+			t.Fatalf("x[%d] = %d, want %d", want, got, want)
+		}
+	}
+}
+
+func TestParallelFail(t *testing.T) {
+	var wantErr = errors.New("custom err")
+	x := [10]int{}
+	err := Parallel(func(s ParallelScope) error {
+		for i := range x {
+			s.Spawn(func() error {
+				if i%2 == 0 {
+					return wantErr
+				}
+				x[i] = i
+				return nil
+			})
+		}
+		return nil
+	})
+	if !errors.Is(err, wantErr) {
+		t.Fatalf("err = %v, want %v", err, wantErr)
+	}
+	for want, got := range x {
+		if want%2 == 0 {
+			want = 0
+		}
+		if want != got {
+			t.Fatalf("x[%d] = %d, want %d", want, got, want)
+		}
+	}
+}
diff --git a/utils/service/start.go b/utils/service/start.go
new file mode 100644
index 0000000..a077f95
--- /dev/null
+++ b/utils/service/start.go
@@ -0,0 +1,143 @@
+package service
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+
+	"github.com/sei-protocol/sei-load/utils"
+)
+
+// Scope of concurrent tasks.
+type Scope struct {
+	// scope is a concurrency primitive, so the no-ctx-in-struct rule does not apply.
+	// nolint:containedctx
+	ctx  context.Context
+	all  *errgroup.Group
+	main *sync.WaitGroup
+}
+
+// Spawn spawns a main task.
+// The scope gets automatically canceled when all the main tasks return.
+func (s Scope) Spawn(t func() error) {
+	s.main.Add(1)
+	s.all.Go(func() error {
+		defer s.main.Done()
+		return t()
+	})
+}
+
+// JoinHandle is a handle to an awaitable task.
+type JoinHandle[R any] struct {
+	result utils.AtomicRecv[*R]
+}
+
+// Spawn1 is the same as Scope.Spawn, but allows awaiting completion of a task and getting its result.
+func Spawn1[R any](s Scope, t func() (R, error)) JoinHandle[R] {
+	send := utils.NewAtomicSend[*R](nil)
+	s.Spawn(func() error {
+		v, err := t()
+		if err != nil {
+			return err
+		}
+		send.Send(&v)
+		return nil
+	})
+	return JoinHandle[R]{send.Subscribe()}
+}
+
+// Join awaits completion of a task and returns its result.
+// WARNING: it does NOT return the error of the task - that error is returned from the Run() call.
+// Join() can only fail when the context is canceled.
+func (h JoinHandle[R]) Join(ctx context.Context) (R, error) {
+	res, err := h.result.Wait(ctx, func(v *R) bool { return v != nil })
+	if err != nil {
+		return utils.Zero[R](), err
+	}
+	return *res, nil
+}
+
+// If true, tasks that do not respect context cancellation will be logged.
+// This is useful for debugging, but causes unnecessary overhead.
+// Since this is a constant, the debug guard should be optimized out by the compiler.
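+//
+// For illustration (hypothetical, not part of this patch), a task that
+// respects cancellation and therefore keeps the guard quiet looks like:
+//
+//	s.SpawnNamed("poller", func() error {
+//		for {
+//			if err := utils.Sleep(ctx, time.Second); err != nil {
+//				return err // returns promptly once ctx is canceled
+//			}
+//			poll()
+//		}
+//	})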
+const enableDebugGuard = false
+
+func (s Scope) debugGuard(name string, done chan struct{}) {
+	select {
+	case <-done:
+		return
+	case <-s.ctx.Done():
+	}
+	for {
+		select {
+		case <-done:
+			return
+		case <-time.After(10 * time.Second):
+		}
+		log.Printf("task %q still running", name)
+	}
+}
+
+// SpawnNamed spawns a named main task.
+func (s Scope) SpawnNamed(name string, t func() error) {
+	done := make(chan struct{})
+	s.Spawn(func() error {
+		defer close(done)
+		if err := t(); err != nil {
+			return fmt.Errorf("%s: %w", name, err)
+		}
+		return nil
+	})
+	if enableDebugGuard {
+		go s.debugGuard(name, done)
+	}
+}
+
+// SpawnBgNamed spawns a named background task.
+func (s Scope) SpawnBgNamed(name string, t func() error) {
+	done := make(chan struct{})
+	s.SpawnBg(func() error {
+		defer close(done)
+		if err := t(); err != nil {
+			return fmt.Errorf("%s: %w", name, err)
+		}
+		return nil
+	})
+	if enableDebugGuard {
+		go s.debugGuard(name, done)
+	}
+}
+
+// SpawnBg spawns a background task.
+// Background tasks get canceled when all the main tasks return.
+func (s Scope) SpawnBg(t func() error) { s.all.Go(t) }
+
+// Run runs a scope capable of spawning tasks.
+// It is guaranteed that all the spawned tasks will be executed (even if spawned after the context is canceled),
+// and that Run will return only after all the tasks have completed.
+// The context of the tasks is automatically canceled as soon as ANY task returns an error.
+// Returns the first error returned by any task (main or background).
+func Run(ctx context.Context, main func(context.Context, Scope) error) error {
+	ctx, cancel := context.WithCancel(ctx)
+	all, ctx := errgroup.WithContext(ctx)
+	s := Scope{ctx, all, &sync.WaitGroup{}}
+	s.Spawn(func() error { return main(ctx, s) })
+	s.main.Wait()
+	cancel()
+	return s.all.Wait()
+}
+
+// Run1 is the same as Run, but returns the result of the main task.
+func Run1[R any](ctx context.Context, main func(context.Context, Scope) (R, error)) (res R, err error) {
+	err = Run(ctx, func(ctx context.Context, s Scope) error {
+		var err error
+		res, err = main(ctx, s)
+		return err
+	})
+	//nolint:nakedret
+	return
+}
diff --git a/utils/testonly.go b/utils/testonly.go
new file mode 100644
index 0000000..7c0ddf0
--- /dev/null
+++ b/utils/testonly.go
@@ -0,0 +1,152 @@
+package utils
+
+import (
+	"fmt"
+	"math/big"
+	"math/rand"
+	"reflect"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+// ReadOnly - if a struct embeds ReadOnly,
+// its private fields will be compared by TestEqual.
+type ReadOnly struct{}
+
+// isReadOnly returns true if t embeds ReadOnly,
+// i.e. has an anonymous field of type ReadOnly.
+func isReadOnly(t reflect.Type) bool {
+	want := reflect.TypeOf(ReadOnly{})
+	if t.Kind() != reflect.Struct {
+		return false
+	}
+	for i := range t.NumField() {
+		if f := t.Field(i); f.Anonymous && f.Type == want {
+			return true
+		}
+	}
+	return false
+}
+
+func cmpComparer[T any, PT interface {
+	Cmp(b *T) int
+	*T
+}](a PT, b PT) bool {
+	if a == nil || b == nil {
+		return a == b
+	}
+	return a.Cmp(b) == 0
+}
+
+var cmpOpts = []cmp.Option{
+	protocmp.Transform(),
+	cmp.Exporter(isReadOnly),
+	cmpopts.EquateEmpty(),
+	cmp.Comparer(cmpComparer[big.Int]),
+}
+
+// TestDiff generates a human-readable diff between two objects.
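+// Typical usage in a test (sketch; mirrors TestJSON in utils/wait_test.go):
+//
+//	if err := TestDiff(want, got); err != nil {
+//		t.Fatal(err)
+//	}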
+func TestDiff[T any](want, got T) error {
+	if diff := cmp.Diff(want, got, cmpOpts...); diff != "" {
+		return fmt.Errorf("want (-) got (+):\n%s", diff)
+	}
+	return nil
+}
+
+// TestEqual is a more robust replacement for reflect.DeepEqual for tests.
+func TestEqual[T any](a, b T) bool {
+	return cmp.Equal(a, b, cmpOpts...)
+}
+
+// TestRngSplit returns a new random number generator split off the given one.
+// This is a very primitive splitting, known to result in dependent randomness.
+// If that ever causes a problem, we can switch to SplitMix.
+func TestRngSplit(rng *rand.Rand) *rand.Rand {
+	return rand.New(rand.NewSource(rng.Int63()))
+}
+
+// TestRng returns a deterministic random number generator.
+func TestRng() *rand.Rand {
+	return rand.New(rand.NewSource(789345342))
+}
+
+var alphanum = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
+
+// GenString generates a random string of length n.
+func GenString(rng *rand.Rand, n int) string {
+	s := make([]rune, n)
+	for i := range n {
+		s[i] = alphanum[rng.Intn(len(alphanum))]
+	}
+	return string(s)
+}
+
+// GenBytes generates a random byte slice of length n.
+func GenBytes(rng *rand.Rand, n int) []byte {
+	s := make([]byte, n)
+	_, _ = rng.Read(s)
+	return s
+}
+
+// GenF is a function which generates a T.
+type GenF[T any] = func(rng *rand.Rand) T
+
+// GenSlice generates a slice of small random length.
+func GenSlice[T any](rng *rand.Rand, gen GenF[T]) []T {
+	return GenSliceN(rng, 2+rng.Intn(3), gen)
+}
+
+// GenSliceN generates a slice of n elements.
+func GenSliceN[T any](rng *rand.Rand, n int, gen GenF[T]) []T {
+	s := make([]T, n)
+	for i := range s {
+		s[i] = gen(rng)
+	}
+	return s
+}
+
+// GenMap generates a map of small random length.
+func GenMap[K comparable, V any](rng *rand.Rand, genK GenF[K], genV GenF[V]) map[K]V {
+	return GenMapN(rng, 2+rng.Intn(3), genK, genV)
+}
+
+// GenMapN generates a map of n elements.
+func GenMapN[K comparable, V any](rng *rand.Rand, n int, genK GenF[K], genV GenF[V]) map[K]V {
+	m := make(map[K]V, n)
+	for len(m) < n {
+		m[genK(rng)] = genV(rng)
+	}
+	return m
+}
+
+// GenTimestamp generates a random timestamp.
+func GenTimestamp(rng *rand.Rand) time.Time {
+	return time.Unix(0, rng.Int63())
+}
+
+// GenHash generates a random Hash.
+func GenHash(rng *rand.Rand) Hash {
+	var h Hash
+	_, _ = rng.Read(h[:])
+	return h
+}
+
+// Test tests whether reencoding a value is an identity operation.
+func (c ProtoConv[T, P]) Test(want T) error {
+	p := c.Encode(want)
+	raw, err := proto.Marshal(p)
+	if err != nil {
+		return fmt.Errorf("Marshal(): %w", err)
+	}
+	if err := proto.Unmarshal(raw, p); err != nil {
+		return fmt.Errorf("Unmarshal(): %w", err)
+	}
+	got, err := c.Decode(p)
+	if err != nil {
+		return fmt.Errorf("Decode(Encode()): %w", err)
+	}
+	return TestDiff(want, got)
+}
diff --git a/utils/wait.go b/utils/wait.go
new file mode 100644
index 0000000..4c8c663
--- /dev/null
+++ b/utils/wait.go
@@ -0,0 +1,119 @@
+package utils
+
+import (
+	"context"
+	"encoding"
+	"errors"
+	"time"
+)
+
+// IgnoreCancel returns nil if the error is context.Canceled, err otherwise.
+func IgnoreCancel(err error) error {
+	if errors.Is(err, context.Canceled) {
+		return nil
+	}
+	return err
+}
+
+// WithTimeout executes a function with a timeout.
+func WithTimeout(ctx context.Context, d time.Duration, f func(ctx context.Context) error) error {
+	ctx, cancel := context.WithTimeout(ctx, d)
+	defer cancel()
+	return f(ctx)
+}
+
+// WithTimeout1 executes a function with a timeout and returns its result.
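+// For illustration only (fetch is a hypothetical helper):
+//
+//	res, err := WithTimeout1(ctx, 5*time.Second, func(ctx context.Context) (string, error) {
+//		return fetch(ctx)
+//	})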
+func WithTimeout1[R any](ctx context.Context, d time.Duration, f func(ctx context.Context) (R, error)) (R, error) {
+	ctx, cancel := context.WithTimeout(ctx, d)
+	defer cancel()
+	return f(ctx)
+}
+
+// Sleep sleeps for a duration or until the context is canceled.
+func Sleep(ctx context.Context, d time.Duration) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(d):
+		return nil
+	}
+}
+
+// SleepUntil sleeps until deadline t or until the context is canceled.
+func SleepUntil(ctx context.Context, t time.Time) error {
+	return Sleep(ctx, time.Until(t))
+}
+
+// WaitFor polls a check function until it returns true or the context is canceled.
+func WaitFor(ctx context.Context, interval time.Duration, check func() bool) error {
+	if check() {
+		return nil
+	}
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		if check() {
+			return nil
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+}
+
+// WaitForWithTimeout polls a check function until it returns true, the context is canceled, or the timeout is reached.
+func WaitForWithTimeout(ctx context.Context, interval, timeout time.Duration, check func() bool) error {
+	if check() {
+		return nil
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		if check() {
+			return nil
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+}
+
+// Duration is a wrapper type around time.Duration that supports JSON marshaling/unmarshaling.
+// nolint:recvcheck
+type Duration time.Duration
+
+// MarshalText implements the encoding.TextMarshaler interface, converting the Duration to a string.
+func (d Duration) MarshalText() ([]byte, error) {
+	return []byte(time.Duration(d).String()), nil
+}
+
+// UnmarshalText implements the encoding.TextUnmarshaler interface.
+func (d *Duration) UnmarshalText(b []byte) error {
+	tmp, err := time.ParseDuration(string(b))
+	if err != nil {
+		return err
+	}
+	*d = Duration(tmp)
+	return nil
+}
+
+var _ encoding.TextMarshaler = Zero[Duration]()
+var _ encoding.TextUnmarshaler = (*Duration)(nil)
+
+// Duration returns the underlying time.Duration value.
+func (d Duration) Duration() time.Duration {
+	return time.Duration(d)
+}
+
+// Seconds returns the underlying time.Duration value in seconds.
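+// For illustration only (this example is not part of the original patch),
+// a config field of type Duration round-trips through JSON as a string:
+//
+//	var cfg struct{ Timeout Duration }
+//	_ = json.Unmarshal([]byte(`{"Timeout":"1.5s"}`), &cfg)
+//	secs := cfg.Timeout.Seconds() // 1.5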
+func (d Duration) Seconds() float64 { + return time.Duration(d).Seconds() +} diff --git a/utils/wait_test.go b/utils/wait_test.go new file mode 100644 index 0000000..91edc12 --- /dev/null +++ b/utils/wait_test.go @@ -0,0 +1,23 @@ +package utils + +import ( + "encoding/json" + "testing" + "time" +) + +func TestJSON(t *testing.T) { + var got, want struct{ X Duration } + want.X = Duration(100 * time.Millisecond) + j, err := json.Marshal(want) + if err != nil { + t.Fatal(err) + } + t.Logf("%s", j) + if err := json.Unmarshal(j, &got); err != nil { + t.Fatal(err) + } + if err := TestDiff(want, got); err != nil { + t.Fatal(err) + } +} From 75f22ab009138f01449c3a7f8b1cb43734fcc054 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Mon, 4 Aug 2025 13:11:40 +0200 Subject: [PATCH 7/7] lint --- sender/worker.go | 1 + stats/logger.go | 5 ----- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sender/worker.go b/sender/worker.go index 970989e..6e06e36 100644 --- a/sender/worker.go +++ b/sender/worker.go @@ -193,6 +193,7 @@ func (w *Worker) sendTransaction(ctx context.Context, client *http.Client, tx *t // Limit read to prevent memory issues with large responses _, err = io.CopyN(io.Discard, resp.Body, 64*1024) // Read up to 64KB if err != nil && err != io.EOF { + log.Printf("Worker %d: Failed to read response body: %v", w.id, err) // Log but don't fail - this is just for connection reuse } diff --git a/stats/logger.go b/stats/logger.go index c512d87..5582c91 100644 --- a/stats/logger.go +++ b/stats/logger.go @@ -4,7 +4,6 @@ import ( "context" "github.com/sei-protocol/sei-load/utils" "log" - "sync" "time" ) @@ -13,10 +12,6 @@ type Logger struct { collector *Collector interval time.Duration debug bool - - // Dry-run transaction logging - txCounter uint64 - txCounterMu sync.Mutex } // NewLogger creates a new statistics logger