From 749604244d589e3dab5818032f014c57cd220814 Mon Sep 17 00:00:00 2001 From: lesovsky Date: Tue, 23 Jun 2020 19:18:13 +0500 Subject: [PATCH] Add rollbacks. --- README.md | 2 +- app/app.go | 30 ++++++++++++++++++++---- app/cmd/main.go | 4 ++++ app/config.go | 2 ++ app/idle_xacts.go | 2 +- app/rollbacks.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 app/rollbacks.go diff --git a/README.md b/README.md index 29e082a..ebcf805 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ # Noisia Harmful workload generator for PostgreSQL. - idle transactions +- rollbacks - see built-in help for more runtime options. **ATTENTION: Use only for testing purposes, don't execute against production, reckless usage might cause problems.** --- #### TODO -- rollbacks - waiting transactions - deadlocks - temporary files diff --git a/app/app.go b/app/app.go index 63e76d8..94326d0 100644 --- a/app/app.go +++ b/app/app.go @@ -3,15 +3,37 @@ package app import ( "context" "github.com/lesovsky/noisia/app/internal/log" + "sync" ) func Start(ctx context.Context, c *Config) error { + var wg sync.WaitGroup + if c.IdleXact { - err := runIdleXactWorkload(ctx, c) - if err != nil { - log.Errorf("idle transactions workload failed: %s", err) - } + wg.Add(1) + go func() { + defer wg.Done() + + err := runIdleXactWorkload(ctx, c) + if err != nil { + log.Errorf("idle transactions workload failed: %s", err) + } + }() + } + + if c.Rollbacks { + wg.Add(1) + go func() { + defer wg.Done() + + err := runRollbacksWorkload(ctx, c) + if err != nil { + log.Errorf("rollbacks workload failed: %s", err) + } + }() } + wg.Wait() + return nil } diff --git a/app/cmd/main.go b/app/cmd/main.go index b98556d..7c25745 100644 --- a/app/cmd/main.go +++ b/app/cmd/main.go @@ -24,6 +24,8 @@ func main() { idleXact = kingpin.Flag("idle-xact", "Run idle transactions workload").Default("false").Envar("NOISIA_IDLE_XACT").Bool() idleXactNaptimeMin = kingpin.Flag("idle-xact.naptime-min", "Min transactions naptime, in seconds").Default("5").Envar("NOISIA_IDLE_XACT_NAPTIME_MIN").Int() idleXactNaptimeMax = kingpin.Flag("idle-xact.naptime-max", "Max transactions naptime, in seconds").Default("20").Envar("NOISIA_IDLE_XACT_NAPTIME_MAX").Int() + rollbacks = kingpin.Flag("rollbacks", "Run rollbacks workload").Default("false").Envar("NOISIA_ROLLBACKS").Bool() + rollbacksRate = kingpin.Flag("rollbacks.rate", "Number of transactions per second (per worker)").Default("10").Envar("NOISIA_ROLLBACKS_RATE").Int() ) kingpin.Parse() log.SetLevel(*logLevel) @@ -39,6 +41,8 @@ func main() { IdleXact: *idleXact, IdleXactNaptimeMin: *idleXactNaptimeMin, IdleXactNaptimeMax: *idleXactNaptimeMax, + Rollbacks: *rollbacks, + RollbacksRate: *rollbacksRate, } if err := config.Validate(); err != nil { diff --git a/app/config.go b/app/config.go index 3813a9c..3d959d2 100644 --- a/app/config.go +++ b/app/config.go @@ -10,6 +10,8 @@ type Config struct { IdleXact bool IdleXactNaptimeMin int IdleXactNaptimeMax int + Rollbacks bool + RollbacksRate int } func (c *Config) Validate() error { diff --git a/app/idle_xacts.go b/app/idle_xacts.go index 60e6ff3..c8b425d 100644 --- a/app/idle_xacts.go +++ b/app/idle_xacts.go @@ -26,7 +26,7 @@ func runIdleXactWorkload(ctx context.Context, config *Config) error { go func() { naptime := time.Duration(rand.Intn(config.IdleXactNaptimeMax-config.IdleXactNaptimeMin)+config.IdleXactNaptimeMin) * time.Second - log.Infof("starting xact with naptime %s", naptime) + log.Debugln("starting xact with naptime %s", naptime) err := startSingleIdleXact(context.Background(), pool, naptime) if err != nil { log.Errorln(err) diff --git a/app/rollbacks.go b/app/rollbacks.go new file mode 100644 index 0000000..dc9521e --- /dev/null +++ b/app/rollbacks.go @@ -0,0 +1,60 @@ +package app + +import ( + "context" + "github.com/jackc/pgx/v4/pgxpool" + "github.com/lesovsky/noisia/app/internal/log" + "time" +) + +func runRollbacksWorkload(ctx context.Context, config *Config) error { + log.Infoln("Starting rollbacks workload") + + pool, err := pgxpool.Connect(context.Background(), config.PostgresConninfo) + if err != nil { + return err + } + defer pool.Close() + + // keep specified number of workers using channel - run new workers until there is any free slot + guard := make(chan struct{}, config.Jobs) + for { + select { + // run workers only when it's possible to write into channel (channel is limited by number of jobs) + case guard <- struct{}{}: + go func() { + log.Debugln("starting xact with rollback") + var interval = 1000000000 / int64(config.RollbacksRate) + + err := startXactRollback(context.Background(), pool) + if err != nil { + log.Errorln(err) + } + time.Sleep(time.Duration(interval) * time.Nanosecond) + + <-guard + }() + case <-ctx.Done(): + log.Info("exit signaled, stop rollbacks workload") + return nil + } + } +} + +func startXactRollback(ctx context.Context, pool *pgxpool.Pool) error { + tx, err := pool.Begin(ctx) + if err != nil { + return err + } + + rows, err := tx.Query(ctx, "SELECT * FROM pg_stat_replication") + if err != nil { + return err + } + rows.Close() + + if err := tx.Rollback(ctx); err != nil { + log.Warnln(err) + } + return nil +}