Skip to content

Commit

Permalink
Add rollbacks.
Browse files Browse the repository at this point in the history
  • Loading branch information
lesovsky committed Jun 23, 2020
1 parent 2d057f6 commit 7496042
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 6 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,13 +1,13 @@
# Noisia
Harmful workload generator for PostgreSQL.
- idle transactions
- rollbacks
- see built-in help for more runtime options.

**ATTENTION: Use only for testing purposes, don't execute against production, reckless usage might cause problems.**

---
#### TODO
- rollbacks
- waiting transactions
- deadlocks
- temporary files
30 changes: 26 additions & 4 deletions app/app.go
Expand Up @@ -3,15 +3,37 @@ package app
import (
"context"
"github.com/lesovsky/noisia/app/internal/log"
"sync"
)

func Start(ctx context.Context, c *Config) error {
var wg sync.WaitGroup

if c.IdleXact {
err := runIdleXactWorkload(ctx, c)
if err != nil {
log.Errorf("idle transactions workload failed: %s", err)
}
wg.Add(1)
go func() {
defer wg.Done()

err := runIdleXactWorkload(ctx, c)
if err != nil {
log.Errorf("idle transactions workload failed: %s", err)
}
}()
}

if c.Rollbacks {
wg.Add(1)
go func() {
defer wg.Done()

err := runRollbacksWorkload(ctx, c)
if err != nil {
log.Errorf("rollbacks workload failed: %s", err)
}
}()
}

wg.Wait()

return nil
}
4 changes: 4 additions & 0 deletions app/cmd/main.go
Expand Up @@ -24,6 +24,8 @@ func main() {
idleXact = kingpin.Flag("idle-xact", "Run idle transactions workload").Default("false").Envar("NOISIA_IDLE_XACT").Bool()
idleXactNaptimeMin = kingpin.Flag("idle-xact.naptime-min", "Min transactions naptime, in seconds").Default("5").Envar("NOISIA_IDLE_XACT_NAPTIME_MIN").Int()
idleXactNaptimeMax = kingpin.Flag("idle-xact.naptime-max", "Max transactions naptime, in seconds").Default("20").Envar("NOISIA_IDLE_XACT_NAPTIME_MAX").Int()
rollbacks = kingpin.Flag("rollbacks", "Run rollbacks workload").Default("false").Envar("NOISIA_ROLLBACKS").Bool()
rollbacksRate = kingpin.Flag("rollbacks.rate", "Number of transactions per second (per worker)").Default("10").Envar("NOISIA_ROLLBACKS_RATE").Int()
)
kingpin.Parse()
log.SetLevel(*logLevel)
Expand All @@ -39,6 +41,8 @@ func main() {
IdleXact: *idleXact,
IdleXactNaptimeMin: *idleXactNaptimeMin,
IdleXactNaptimeMax: *idleXactNaptimeMax,
Rollbacks: *rollbacks,
RollbacksRate: *rollbacksRate,
}

if err := config.Validate(); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions app/config.go
Expand Up @@ -10,6 +10,8 @@ type Config struct {
IdleXact bool
IdleXactNaptimeMin int
IdleXactNaptimeMax int
Rollbacks bool
RollbacksRate int
}

func (c *Config) Validate() error {
Expand Down
2 changes: 1 addition & 1 deletion app/idle_xacts.go
Expand Up @@ -26,7 +26,7 @@ func runIdleXactWorkload(ctx context.Context, config *Config) error {
go func() {
naptime := time.Duration(rand.Intn(config.IdleXactNaptimeMax-config.IdleXactNaptimeMin)+config.IdleXactNaptimeMin) * time.Second

log.Infof("starting xact with naptime %s", naptime)
log.Debugln("starting xact with naptime %s", naptime)
err := startSingleIdleXact(context.Background(), pool, naptime)
if err != nil {
log.Errorln(err)
Expand Down
60 changes: 60 additions & 0 deletions app/rollbacks.go
@@ -0,0 +1,60 @@
package app

import (
"context"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/lesovsky/noisia/app/internal/log"
"time"
)

func runRollbacksWorkload(ctx context.Context, config *Config) error {
log.Infoln("Starting rollbacks workload")

pool, err := pgxpool.Connect(context.Background(), config.PostgresConninfo)
if err != nil {
return err
}
defer pool.Close()

// keep specified number of workers using channel - run new workers until there is any free slot
guard := make(chan struct{}, config.Jobs)
for {
select {
// run workers only when it's possible to write into channel (channel is limited by number of jobs)
case guard <- struct{}{}:
go func() {
log.Debugln("starting xact with rollback")
var interval = 1000000000 / int64(config.RollbacksRate)

err := startXactRollback(context.Background(), pool)
if err != nil {
log.Errorln(err)
}
time.Sleep(time.Duration(interval) * time.Nanosecond)

<-guard
}()
case <-ctx.Done():
log.Info("exit signaled, stop rollbacks workload")
return nil
}
}
}

func startXactRollback(ctx context.Context, pool *pgxpool.Pool) error {
tx, err := pool.Begin(ctx)
if err != nil {
return err
}

rows, err := tx.Query(ctx, "SELECT * FROM pg_stat_replication")
if err != nil {
return err
}
rows.Close()

if err := tx.Rollback(ctx); err != nil {
log.Warnln(err)
}
return nil
}

0 comments on commit 7496042

Please sign in to comment.