diff --git a/CHANGELOG.md b/CHANGELOG.md
index 22654d4..0b48148 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## v0.7.0
+* [\#39](https://github.com/interchainio/tm-load-test/issues/39) - Add basic
+  aggregate statistics output to CSV file.
+* Add integration test for the standalone execution happy path.
+
 ## v0.6.2
 * [\#37](https://github.com/interchainio/tm-load-test/pull/37) - Fix average
   transaction throughput rate in reporting/metrics.
diff --git a/README.md b/README.md
index cc0a9b1..514be2f 100644
--- a/README.md
+++ b/README.md
@@ -147,6 +147,37 @@ The following kinds of metrics are made available here:
 * Standard Prometheus-provided metrics about the garbage collector in
   `tm-load-test`
 
+## Aggregate Statistics
+As of `tm-load-test` v0.7.0, you can now write simple aggregate statistics to
+a CSV file once testing completes by specifying the `--stats-output` flag:
+
+```bash
+# In standalone mode
+tm-load-test -c 1 -T 10 -r 1000 -s 250 \
+    --broadcast-tx-method async \
+    --endpoints ws://tm-endpoint1.somewhere.com:26657/websocket,ws://tm-endpoint2.somewhere.com:26657/websocket \
+    --stats-output /path/to/save/stats.csv
+
+# From the master in master/slave mode
+tm-load-test \
+    master \
+    --expect-slaves 2 \
+    --bind localhost:26670 \
+    -c 1 -T 10 -r 1000 -s 250 \
+    --broadcast-tx-method async \
+    --endpoints ws://tm-endpoint1.somewhere.com:26657/websocket,ws://tm-endpoint2.somewhere.com:26657/websocket \
+    --stats-output /path/to/save/stats.csv
+```
+
+At present, the output CSV file has the following format:
+
+```csv
+Parameter,Value,Units
+total_time,10.002,seconds
+total_txs,9000,count
+avg_tx_rate,899.818398,transactions per second
+```
+
 ## Development
 To run the linter and the tests:
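Note on the README section above: because the CSV schema is fixed (`Parameter,Value,Units`), the stats file is easy to gate on in CI. The following is a minimal, hypothetical Go sketch, not part of this patch; the file path and the 500 tx/sec floor are illustrative:

```go
package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"os"
	"strconv"
)

// Hypothetical CI gate: fail the build if the average transaction rate
// recorded via --stats-output drops below a chosen threshold.
func main() {
	f, err := os.Open("/path/to/save/stats.csv") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	records, err := csv.NewReader(f).ReadAll()
	if err != nil {
		log.Fatal(err)
	}

	const minRate = 500.0 // tx/sec floor; illustrative
	for _, rec := range records {
		if len(rec) >= 2 && rec[0] == "avg_tx_rate" {
			rate, err := strconv.ParseFloat(rec[1], 64)
			if err != nil {
				log.Fatal(err)
			}
			if rate < minRate {
				log.Fatalf("throughput regression: %.3f tx/sec < %.3f tx/sec", rate, minRate)
			}
			fmt.Printf("throughput OK: %.3f tx/sec\n", rate)
			return
		}
	}
	log.Fatal("avg_tx_rate not found in stats CSV")
}
```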
diff --git a/pkg/loadtest/cli.go b/pkg/loadtest/cli.go
index 0a2e04d..a9541dc 100644
--- a/pkg/loadtest/cli.go
+++ b/pkg/loadtest/cli.go
@@ -12,7 +12,7 @@ import (
 )
 
 // CLIVersion must be manually updated as new versions are released.
-const CLIVersion = "v0.6.2"
+const CLIVersion = "v0.7.0"
 
 // cliVersionCommitID must be set through linker settings. See
 // https://stackoverflow.com/a/11355611/1156132 for details.
@@ -44,7 +44,7 @@ func buildCLI(cli *CLIConfig, logger logging.Logger) *cobra.Command {
 				os.Exit(1)
 			}
 
-			if err := executeLoadTest(cfg); err != nil {
+			if err := ExecuteStandalone(cfg); err != nil {
 				os.Exit(1)
 			}
 		},
@@ -63,6 +63,7 @@ func buildCLI(cli *CLIConfig, logger logging.Logger) *cobra.Command {
 	rootCmd.PersistentFlags().IntVar(&cfg.MaxEndpoints, "max-endpoints", 0, "The maximum number of endpoints to use for testing, where 0 means unlimited")
 	rootCmd.PersistentFlags().IntVar(&cfg.PeerConnectTimeout, "peer-connect-timeout", 600, "The number of seconds to wait for all required peers to connect if expect-peers > 0")
 	rootCmd.PersistentFlags().IntVar(&cfg.MinConnectivity, "min-peer-connectivity", 0, "The minimum number of peers to which each peer must be connected before starting the load test")
+	rootCmd.PersistentFlags().StringVar(&cfg.StatsOutputFile, "stats-output", "", "Where to store aggregate statistics (in CSV format) for the load test")
 	rootCmd.PersistentFlags().BoolVarP(&flagVerbose, "verbose", "v", false, "Increase output logging verbosity to DEBUG level")
 
 	var masterCfg MasterConfig
diff --git a/pkg/loadtest/config.go b/pkg/loadtest/config.go
index 7c37eb1..54162b5 100644
--- a/pkg/loadtest/config.go
+++ b/pkg/loadtest/config.go
@@ -34,6 +34,8 @@ type Config struct {
 	MaxEndpoints       int    `json:"max_endpoints"`        // The maximum number of endpoints to use for load testing. Set to 0 by default (no maximum).
 	MinConnectivity    int    `json:"min_connectivity"`     // The minimum number of peers to which each peer must be connected before starting the load test. Set to 0 by default (no minimum).
 	PeerConnectTimeout int    `json:"peer_connect_timeout"` // The maximum time to wait (in seconds) for all peers to connect, if ExpectPeers > 0.
+	StatsOutputFile    string `json:"stats_output_file"`    // Where to store the final aggregate statistics file (in CSV format).
+	NoTrapInterrupts   bool   `json:"no_trap_interrupts"`   // Should we avoid trapping Ctrl+Break? Only relevant for standalone execution mode.
 }
 
 // MasterConfig is the configuration options specific to a master node.
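Note on the two new `Config` fields above: together with the newly exported `ExecuteStandalone` (see `loadtest.go` below), they make fully programmatic standalone runs possible. A minimal sketch with illustrative values; the `Time`, `Rate`, `Size` and `Count` field names are recalled from the existing `Config` struct outside this diff and should be checked against `config.go`:

```go
package main

import (
	"log"

	"github.com/interchainio/tm-load-test/pkg/loadtest"
)

func main() {
	cfg := loadtest.Config{
		ClientFactory:        "kvstore",
		Connections:          1,
		Time:                 10,   // seconds; field name assumed from the wider Config struct
		Rate:                 1000, // txs per second per connection; assumed likewise
		Size:                 250,  // bytes per transaction; assumed likewise
		Count:                -1,   // no absolute tx limit; assumed likewise
		BroadcastTxMethod:    "async",
		Endpoints:            []string{"ws://localhost:26657/websocket"},
		EndpointSelectMethod: loadtest.SelectSuppliedEndpoints,
		StatsOutputFile:      "/tmp/stats.csv", // new in this PR
		NoTrapInterrupts:     true,             // new in this PR: leave signal handling to the host process
	}
	if err := loadtest.ExecuteStandalone(cfg); err != nil {
		log.Fatal(err)
	}
}
```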
diff --git a/pkg/loadtest/integration_test.go b/pkg/loadtest/integration_test.go
index f4995c3..f800b35 100644
--- a/pkg/loadtest/integration_test.go
+++ b/pkg/loadtest/integration_test.go
@@ -1,11 +1,15 @@
 package loadtest_test
 
 import (
+	"encoding/csv"
 	"fmt"
 	"io/ioutil"
+	"math"
 	"net"
 	"net/http"
 	"net/url"
+	"os"
+	"path"
 	"strconv"
 	"strings"
 	"testing"
@@ -18,6 +22,21 @@ import (
 
 const totalTxsPerSlave = 50
 
+type aggregateStats struct {
+	totalTime float64
+	totalTxs  int
+	avgTxRate float64
+}
+
+func (s *aggregateStats) String() string {
+	return fmt.Sprintf(
+		"aggregateStats{totalTime: %.3f, totalTxs: %d, avgTxRate: %.3f}",
+		s.totalTime,
+		s.totalTxs,
+		s.avgTxRate,
+	)
+}
+
 func TestMasterSlaveHappyPath(t *testing.T) {
 	app := kvstore.NewKVStoreApplication()
 	node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
@@ -28,7 +47,14 @@ func TestMasterSlaveHappyPath(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	cfg := testConfig()
+	tempDir, err := ioutil.TempDir("", "tmloadtest-masterslavehappypath")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	expectedTotalTxs := totalTxsPerSlave * 2
+	cfg := testConfig(tempDir)
 	masterCfg := loadtest.MasterConfig{
 		BindAddr:     fmt.Sprintf("localhost:%d", freePort),
 		ExpectSlaves: 2,
@@ -116,8 +142,8 @@ func TestMasterSlaveHappyPath(t *testing.T) {
 				if err != nil {
 					t.Fatal(err)
 				}
-				if txCount != (totalTxsPerSlave * 2) {
-					t.Fatalf("Expected %d transactions to have been recorded by the master, but got %d", totalTxsPerSlave, txCount)
+				if txCount != expectedTotalTxs {
+					t.Fatalf("Expected %d transactions to have been recorded by the master, but got %d", expectedTotalTxs, txCount)
 				}
 			}
 		}
@@ -127,6 +153,60 @@ func TestMasterSlaveHappyPath(t *testing.T) {
 	if !metricsTested {
 		t.Fatal("Expected to have tested Prometheus metrics, but did not")
 	}
+
+	// ensure the aggregate stats were generated and computed correctly
+	stats, err := parseStats(cfg.StatsOutputFile)
+	if err != nil {
+		t.Fatal("Failed to parse output stats", err)
+	}
+	t.Logf("Got aggregate statistics from CSV: %v", stats)
+	if stats.totalTxs != expectedTotalTxs {
+		t.Fatalf("Expected %d transactions to have been recorded in aggregate stats, but got %d", expectedTotalTxs, stats.totalTxs)
+	}
+	if !floatsEqualWithTolerance(stats.avgTxRate, float64(stats.totalTxs)/stats.totalTime, 0.1) {
+		t.Fatalf(
+			"Average transaction rate (%.3f) does not compute from total time (%.3f) and total transactions (%d)",
+			stats.avgTxRate,
+			stats.totalTime,
+			stats.totalTxs,
+		)
+	}
+}
+
+func TestStandaloneHappyPath(t *testing.T) {
+	app := kvstore.NewKVStoreApplication()
+	node := rpctest.StartTendermint(app, rpctest.SuppressStdout, rpctest.RecreateConfig)
+	defer rpctest.StopTendermint(node)
+
+	tempDir, err := ioutil.TempDir("", "tmloadtest-standalonehappypath")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tempDir)
+
+	expectedTotalTxs := totalTxsPerSlave
+	cfg := testConfig(tempDir)
+	if err := loadtest.ExecuteStandalone(cfg); err != nil {
+		t.Fatal(err)
+	}
+
+	// ensure the aggregate stats were generated and computed correctly
+	stats, err := parseStats(cfg.StatsOutputFile)
+	if err != nil {
+		t.Fatal("Failed to parse output stats", err)
+	}
+	t.Logf("Got aggregate statistics from CSV: %v", stats)
+	if stats.totalTxs != expectedTotalTxs {
+		t.Fatalf("Expected %d transactions to have been recorded in aggregate stats, but got %d", expectedTotalTxs, stats.totalTxs)
+	}
+	if !floatsEqualWithTolerance(stats.avgTxRate, float64(stats.totalTxs)/stats.totalTime, 0.1) {
+		t.Fatalf(
+			"Average transaction rate (%.3f) does not compute from total time (%.3f) and total transactions (%d)",
+			stats.avgTxRate,
+			stats.totalTime,
+			stats.totalTxs,
+		)
+	}
 }
 
 func getRPCAddress() string {
@@ -137,7 +217,7 @@ func getRPCAddress() string {
 	return fmt.Sprintf("ws://localhost:%s/websocket", listenURL.Port())
 }
 
-func testConfig() loadtest.Config {
+func testConfig(tempDir string) loadtest.Config {
 	return loadtest.Config{
 		ClientFactory: "kvstore",
 		Connections:   1,
@@ -149,6 +229,8 @@ func testConfig(tempDir string) loadtest.Config {
 		BroadcastTxMethod:    "async",
 		Endpoints:            []string{getRPCAddress()},
 		EndpointSelectMethod: loadtest.SelectSuppliedEndpoints,
+		StatsOutputFile:      path.Join(tempDir, "stats.csv"),
+		NoTrapInterrupts:     true,
 	}
 }
 
@@ -165,3 +247,55 @@ func getFreePort() (int, error) {
 	defer l.Close()
 	return l.Addr().(*net.TCPAddr).Port, nil
 }
+
+func parseStats(filename string) (*aggregateStats, error) {
+	f, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	reader := csv.NewReader(f)
+	records, err := reader.ReadAll()
+	if err != nil {
+		return nil, err
+	}
+
+	if len(records) < 3 {
+		return nil, fmt.Errorf("expected at least 3 records in aggregate stats CSV, but got %d", len(records))
+	}
+	stats := &aggregateStats{}
+	for _, record := range records {
+		if len(record) > 0 {
+			if len(record) < 3 {
+				return nil, fmt.Errorf("expected at least 3 columns for each non-empty row in aggregate stats CSV")
+			}
+			switch record[0] {
+			case "total_txs":
+				totalTxs, err := strconv.ParseInt(record[1], 10, 32)
+				if err != nil {
+					return nil, err
+				}
+				stats.totalTxs = int(totalTxs)
+
+			case "total_time":
+				stats.totalTime, err = strconv.ParseFloat(record[1], 64)
+				if err != nil {
+					return nil, err
+				}
+
+			case "avg_tx_rate":
+				stats.avgTxRate, err = strconv.ParseFloat(record[1], 64)
+				if err != nil {
+					return nil, err
+				}
+			}
+		}
+	}
+
+	return stats, nil
+}
+
+func floatsEqualWithTolerance(a, b, tolerance float64) bool {
+	return math.Abs(a-b) < tolerance
+}
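Note on the `0.1` tolerance used in the tests above: `writeAggregateStats` (in `stats.go` below) rounds `total_time` to three decimals but derives `avg_tx_rate` from the unrounded elapsed time, so a rate recomputed from the CSV will differ slightly from the recorded one. A quick self-contained check using the sample values from the README hunk:

```go
package main

import "fmt"

func main() {
	// Sample values from the README hunk's example CSV (not from a live run).
	totalTime := 10.002        // seconds, as rounded to 3 decimals in the CSV
	totalTxs := 9000.0         // count
	recordedRate := 899.818398 // rate the tool computed from the unrounded time

	recomputed := totalTxs / totalTime
	fmt.Printf("recomputed=%.6f recorded=%.6f diff=%.6f\n",
		recomputed, recordedRate, recomputed-recordedRate)
	// Prints: recomputed=899.820036 recorded=899.818398 diff=0.001638
	// Well inside the 0.1 tolerance, but not exactly equal - hence the epsilon.
}
```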
diff --git a/pkg/loadtest/loadtest.go b/pkg/loadtest/loadtest.go
index 1078ad9..6c70f00 100644
--- a/pkg/loadtest/loadtest.go
+++ b/pkg/loadtest/loadtest.go
@@ -6,7 +6,8 @@ import (
 	"github.com/interchainio/tm-load-test/internal/logging"
 )
 
-func executeLoadTest(cfg Config) error {
+// ExecuteStandalone will run a standalone (non-master/slave) load test.
+func ExecuteStandalone(cfg Config) error {
 	logger := logging.NewLogrusLogger("loadtest")
 
 	// if we need to wait for the network to stabilize first
@@ -35,14 +36,29 @@ func ExecuteStandalone(cfg Config) error {
 	logger.Info("Initiating load test")
 	tg.Start()
 
-	// we want to know if the user hits Ctrl+Break
-	cancelTrap := trapInterrupts(func() { tg.Cancel() }, logger)
-	defer close(cancelTrap)
+	var cancelTrap chan struct{}
+	if !cfg.NoTrapInterrupts {
+		// we want to know if the user hits Ctrl+Break
+		cancelTrap = trapInterrupts(func() { tg.Cancel() }, logger)
+		defer close(cancelTrap)
+	} else {
+		logger.Debug("Skipping trapping of interrupts (e.g. Ctrl+Break)")
+	}
 
 	if err := tg.Wait(); err != nil {
 		logger.Error("Failed to execute load test", "err", err)
 		return err
 	}
+
+	// if we need to write the final statistics
+	if len(cfg.StatsOutputFile) > 0 {
+		logger.Info("Writing aggregate statistics", "outputFile", cfg.StatsOutputFile)
+		if err := tg.WriteAggregateStats(cfg.StatsOutputFile); err != nil {
+			logger.Error("Failed to write aggregate statistics", "err", err)
+			return err
+		}
+	}
+
 	logger.Info("Load test complete!")
 	return nil
 }
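Note on `NoTrapInterrupts` above: the escape hatch exists because `trapInterrupts` (unexported, defined elsewhere in this package and not shown in this diff) installs a process-wide signal handler, which in-process callers such as the new integration test should avoid. The sketch below is only a guess at the shape of that pattern, inferred from the call sites visible here, not the actual implementation:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// sketchTrapInterrupts runs onKill when an interrupt arrives, and stops
// listening when the returned channel is closed. This mirrors how the
// unexported trapInterrupts is called in this diff; the real implementation
// may differ.
func sketchTrapInterrupts(onKill func()) chan struct{} {
	cancel := make(chan struct{})
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt, syscall.SIGTERM)
	go func() {
		defer signal.Stop(sigc)
		select {
		case sig := <-sigc:
			fmt.Printf("caught signal %v, cancelling load test\n", sig)
			onKill()
		case <-cancel: // the caller's `defer close(cancelTrap)` lands here
		}
	}()
	return cancel
}

func main() {
	cancelTrap := sketchTrapInterrupts(func() { fmt.Println("load test cancelled") })
	defer close(cancelTrap)
	// ... the load test itself would run here ...
}
```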
Ctrl+Break)") + } if err := tg.Wait(); err != nil { logger.Error("Failed to execute load test", "err", err) return err } + + // if we need to write the final statistics + if len(cfg.StatsOutputFile) > 0 { + logger.Info("Writing aggregate statistics", "outputFile", cfg.StatsOutputFile) + if err := tg.WriteAggregateStats(cfg.StatsOutputFile); err != nil { + logger.Error("Failed to write aggregate statistics", "err", err) + return err + } + } + logger.Info("Load test complete!") return nil } diff --git a/pkg/loadtest/master.go b/pkg/loadtest/master.go index 2bcbab7..695573c 100644 --- a/pkg/loadtest/master.go +++ b/pkg/loadtest/master.go @@ -55,10 +55,11 @@ type Master struct { totalTxsPerSlave map[string]int // The number of transactions sent by each slave. // Prometheus metrics - stateMetric prometheus.Gauge // A code-based status metric for representing the master's current state. - totalTxsMetric prometheus.Gauge // The total number of transactions sent by all slaves. - txRateMetric prometheus.Gauge // The transaction throughput rate (tx/sec) as measured by the master since the last metrics update. - overallTxRateMetric prometheus.Gauge // The overall transaction throughput rate (tx/sec) as measured by the master since the beginning of the load test. + stateMetric prometheus.Gauge // A code-based status metric for representing the master's current state. + totalTxsMetric prometheus.Gauge // The total number of transactions sent by all slaves. + txRateMetric prometheus.Gauge // The transaction throughput rate (tx/sec) as measured by the master since the last metrics update. + overallTxRateMetric prometheus.Gauge // The overall transaction throughput rate (tx/sec) as measured by the master since the beginning of the load test. + slavesCompletedMetric prometheus.Gauge // The total number of slaves that have completed their testing. 
diff --git a/pkg/loadtest/stats.go b/pkg/loadtest/stats.go
new file mode 100644
index 0000000..8aa1618
--- /dev/null
+++ b/pkg/loadtest/stats.go
@@ -0,0 +1,30 @@
+package loadtest
+
+import (
+	"encoding/csv"
+	"fmt"
+	"os"
+)
+
+func writeAggregateStats(filename string, totalTxs int, totalTimeSeconds float64) error {
+	avgTxRate := float64(0)
+	if totalTimeSeconds > 0.0 {
+		avgTxRate = float64(totalTxs) / totalTimeSeconds
+	}
+	f, err := os.Create(filename)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	w := csv.NewWriter(f)
+	defer w.Flush()
+
+	records := [][]string{
+		{"Parameter", "Value", "Units"},
+		{"total_time", fmt.Sprintf("%.3f", totalTimeSeconds), "seconds"},
+		{"total_txs", fmt.Sprintf("%d", totalTxs), "count"},
+		{"avg_tx_rate", fmt.Sprintf("%.6f", avgTxRate), "transactions per second"},
+	}
+	return w.WriteAll(records)
+}
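Note on `stats.go` above: since `writeAggregateStats` is unexported, a cheap guard against format drift would be a unit test in the same package. A hypothetical sketch (not part of this patch) that round-trips the writer's output through `encoding/csv`:

```go
package loadtest

import (
	"encoding/csv"
	"io/ioutil"
	"os"
	"path"
	"testing"
)

func TestWriteAggregateStatsRoundTrip(t *testing.T) {
	tempDir, err := ioutil.TempDir("", "tmloadtest-stats")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tempDir)

	filename := path.Join(tempDir, "stats.csv")
	// Same sample numbers as the README hunk above.
	if err := writeAggregateStats(filename, 9000, 10.002); err != nil {
		t.Fatal(err)
	}

	f, err := os.Open(filename)
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()

	records, err := csv.NewReader(f).ReadAll()
	if err != nil {
		t.Fatal(err)
	}
	// Expect the header row plus the three parameter rows.
	if len(records) != 4 {
		t.Fatalf("expected 4 CSV records, got %d", len(records))
	}
	if records[3][0] != "avg_tx_rate" || records[3][1] != "899.820036" {
		t.Fatalf("unexpected avg_tx_rate row: %v", records[3])
	}
}
```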
fmt.Sprintf("%.3f", totalTimeSeconds), "seconds"}, + {"total_txs", fmt.Sprintf("%d", totalTxs), "count"}, + {"avg_tx_rate", fmt.Sprintf("%.6f", avgTxRate), "transactions per second"}, + } + return w.WriteAll(records) +} diff --git a/pkg/loadtest/transactor_group.go b/pkg/loadtest/transactor_group.go index 414de17..2405f16 100644 --- a/pkg/loadtest/transactor_group.go +++ b/pkg/loadtest/transactor_group.go @@ -10,8 +10,9 @@ import ( type TransactorGroup struct { transactors []*Transactor - statsMtx sync.RWMutex - txCounts map[int]int // The counts of all of the total transactions per transactor. + statsMtx sync.RWMutex + startTime time.Time + txCounts map[int]int // The counts of all of the total transactions per transactor. progressCallbackMtx sync.RWMutex progressCallbackInterval time.Duration @@ -70,6 +71,7 @@ func (g *TransactorGroup) Start() { for _, t := range g.transactors { t.Start() } + g.setStartTime(time.Now()) } // Cancel signals to all transactors to stop their operations. @@ -110,6 +112,12 @@ func (g *TransactorGroup) Wait() error { return err } +func (g *TransactorGroup) WriteAggregateStats(filename string) error { + totalTxs := g.totalTxs() + totalTimeSeconds := time.Since(g.getStartTime()).Seconds() + return writeAggregateStats(filename, totalTxs, totalTimeSeconds) +} + func (g *TransactorGroup) progressReporter() { defer close(g.progressReporterStopped) @@ -127,6 +135,18 @@ func (g *TransactorGroup) progressReporter() { } } +func (g *TransactorGroup) setStartTime(startTime time.Time) { + g.statsMtx.Lock() + g.startTime = startTime + g.statsMtx.Unlock() +} + +func (g *TransactorGroup) getStartTime() time.Time { + g.statsMtx.RLock() + defer g.statsMtx.RUnlock() + return g.startTime +} + func (g *TransactorGroup) trackTransactorProgress(id int, txCount int) { g.statsMtx.Lock() g.txCounts[id] = txCount