Skip to content

Commit

Permalink
Rollbacks: add support for min/max rollbacks rate.
Browse files Browse the repository at this point in the history
  • Loading branch information
lesovsky committed Aug 9, 2020
1 parent 9e9acb8 commit 78233ad
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
6 changes: 4 additions & 2 deletions cmd/app.go
Expand Up @@ -26,7 +26,8 @@ type config struct {
idleXactsNaptimeMin int
idleXactsNaptimeMax int
rollbacks bool
rollbacksRate int
rollbacksMinRate int
rollbacksMaxRate int
waitXacts bool
waitXactsLocktimeMin int
waitXactsLocktimeMax int
Expand Down Expand Up @@ -123,7 +124,8 @@ func startRollbacksWorkload(ctx context.Context, wg *sync.WaitGroup, c *config)
workload := rollbacks.NewWorkload(&rollbacks.Config{
PostgresConninfo: c.postgresConninfo,
Jobs: c.jobs,
RollbacksRate: c.rollbacksRate,
MinRate: c.rollbacksMinRate,
MaxRate: c.rollbacksMaxRate,
})

err := workload.Run(ctx)
Expand Down
6 changes: 4 additions & 2 deletions cmd/main.go
Expand Up @@ -26,7 +26,8 @@ func main() {
idleXactsNaptimeMin = kingpin.Flag("idle-xacts.naptime-min", "Min transactions naptime, in seconds").Default("5").Envar("NOISIA_IDLE_XACTS_NAPTIME_MIN").Int()
idleXactsNaptimeMax = kingpin.Flag("idle-xacts.naptime-max", "Max transactions naptime, in seconds").Default("20").Envar("NOISIA_IDLE_XACTS_NAPTIME_MAX").Int()
rollbacks = kingpin.Flag("rollbacks", "Run rollbacks workload").Default("false").Envar("NOISIA_ROLLBACKS").Bool()
rollbacksRate = kingpin.Flag("rollbacks.rate", "Number of transactions per second (per worker)").Default("10").Envar("NOISIA_ROLLBACKS_RATE").Int()
rollbacksMinRate = kingpin.Flag("rollbacks.min-rate", "Approximate minimum number of rollbacks per second (per worker)").Default("10").Envar("NOISIA_ROLLBACKS_MIN_RATE").Int()
rollbacksMaxRate = kingpin.Flag("rollbacks.max-rate", "Approximate maximum number of rollbacks per second (per worker)").Default("10").Envar("NOISIA_ROLLBACKS_MAX_RATE").Int()
waitXacts = kingpin.Flag("wait-xacts", "Run idle transactions workload").Default("false").Envar("NOISIA_IDLE_XACTS").Bool()
waitXactsLocktimeMin = kingpin.Flag("wait-xacts.locktime-min", "Min transactions locking time, in seconds").Default("5").Envar("NOISIA_WAIT_XACTS_LOCKTIME_MIN").Int()
waitXactsLocktimeMax = kingpin.Flag("wait-xacts.locktime-max", "Max transactions locking time, in seconds").Default("20").Envar("NOISIA_WAIT_XACTS_LOCKTIME_MAX").Int()
Expand Down Expand Up @@ -60,7 +61,8 @@ func main() {
idleXactsNaptimeMin: *idleXactsNaptimeMin,
idleXactsNaptimeMax: *idleXactsNaptimeMax,
rollbacks: *rollbacks,
rollbacksRate: *rollbacksRate,
rollbacksMinRate: *rollbacksMinRate,
rollbacksMaxRate: *rollbacksMaxRate,
waitXacts: *waitXacts,
waitXactsLocktimeMin: *waitXactsLocktimeMin,
waitXactsLocktimeMax: *waitXactsLocktimeMax,
Expand Down
28 changes: 20 additions & 8 deletions rollbacks/rollbacks.go
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/lesovsky/noisia"
"math/rand"
"time"
)

const (
// defaultRollbacksRate defines default rate of produced rollbacks per second.
defaultRollbacksRate = 10
// defaultRate defines default rate of produced rollbacks per second.
defaultRate = 10
)

// Config defines configuration settings for rollbacks workload.
Expand All @@ -18,13 +19,20 @@ type Config struct {
PostgresConninfo string
// Jobs defines how many workers should be created for producing rollbacks.
Jobs uint16
// RollbacksRate defines target rate for produced rollbacks per second (per single worker).
RollbacksRate int
// MinRate defines minimum approximate target rate for produced rollbacks per second (per single worker).
MinRate int
// MaxRate defines maximum approximate target rate for produced rollbacks per second (per single worker).
MaxRate int
}

func (c *Config) defaults() {
if c.RollbacksRate == 0 {
c.RollbacksRate = defaultRollbacksRate
if c.MinRate == 0 && c.MaxRate == 0 {
c.MinRate, c.MaxRate = defaultRate, defaultRate
}

// Min rate cannot be higher than max rate.
if c.MinRate > c.MaxRate {
c.MinRate = c.MaxRate
}
}

Expand All @@ -46,12 +54,16 @@ func (w *workload) Run(ctx context.Context) error {
}
defer pool.Close()

// calculate inter-query interval for rate throttling
interval := 1000000000 / int64(w.config.RollbacksRate)
// Calculate interval for rate throttling. Use 900ms as working time and take 100ms as calculation overhead.
interval := 900000000 / int64(w.config.MinRate)

// keep specified number of workers using channel - run new workers until there is any free slot
guard := make(chan struct{}, w.config.Jobs)
for {
if w.config.MinRate != w.config.MaxRate {
interval = 900000000 / int64(rand.Intn(w.config.MaxRate-w.config.MinRate)+w.config.MinRate)
}

select {
// run workers only when it's possible to write into channel (channel is limited by number of jobs)
case guard <- struct{}{}:
Expand Down
6 changes: 1 addition & 5 deletions rollbacks/rollbacks_test.go
Expand Up @@ -9,11 +9,7 @@ import (
)

func TestWorkload_Run(t *testing.T) {
config := &Config{
PostgresConninfo: "host=127.0.0.1",
Jobs: 2,
RollbacksRate: 2,
}
config := &Config{PostgresConninfo: "host=127.0.0.1", Jobs: 2, MinRate: 2, MaxRate: 2}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down

0 comments on commit 78233ad

Please sign in to comment.