Skip to content

Commit

Permalink
Expose the Boomer type and the RateLimiter interface
Browse files Browse the repository at this point in the history
Now users are able to control more options programatically instead of passing
command line parameters. And they can write their own rate limiters.

The cache of current time in second is removed, it may be a premature optimization.

Resolves: #54, #56
  • Loading branch information
myzhan committed Mar 11, 2019
1 parent cb960fc commit 366119e
Show file tree
Hide file tree
Showing 17 changed files with 500 additions and 345 deletions.
215 changes: 106 additions & 109 deletions boomer.go
Expand Up @@ -3,45 +3,102 @@ package boomer
import (
"flag"
"log"
"math"
"os"
"os/signal"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/asaskevich/EventBus"
)

var masterHost string
var masterPort int
// Events is the global event bus instance.
var Events = EventBus.New()

var maxRPS int64
var requestIncreaseRate string
var hatchType string
var runTasks string
var memoryProfile string
var memoryProfileDuration time.Duration
var cpuProfile string
var cpuProfileDuration time.Duration
var defaultBoomer *Boomer

var defaultRunner *runner
// A Boomer is used to run tasks.
type Boomer struct {
masterHost string
masterPort int

var initiated uint32
var initMutex = sync.Mutex{}
hatchType string
rateLimiter RateLimiter
runner *runner
}

// Init boomer
func initBoomer() {
if atomic.LoadUint32(&initiated) == 1 {
panic("Don't call boomer.Run() more than once.")
// NewBoomer returns a new Boomer.
func NewBoomer(masterHost string, masterPort int) *Boomer {
return &Boomer{
masterHost: masterHost,
masterPort: masterPort,
hatchType: "asap",
}
}

// TODO: to be removed
initLegacyEventHandlers()
// SetRateLimiter allows user to use their own rate limiter.
// It must be called before the test is started.
func (b *Boomer) SetRateLimiter(rateLimiter RateLimiter) {
b.rateLimiter = rateLimiter
}

// SetHatchType only accepts "asap" or "smooth".
// "asap" means spawning goroutines as soon as possible when the test is started.
// "smooth" means a constant pace.
func (b *Boomer) SetHatchType(hatchType string) {
if hatchType != "asap" && hatchType != "smooth" {
log.Printf("Wrong hatch-type, expected asap or smooth, was %s\n", hatchType)
return
}
b.hatchType = hatchType
}

func (b *Boomer) setRunner(runner *runner) {
b.runner = runner
}

// Run accepts a slice of Task and connects to the locust master.
func (b *Boomer) Run(tasks ...*Task) {
b.runner = newRunner(tasks, b.rateLimiter, b.hatchType)
b.runner.masterHost = b.masterHost
b.runner.masterPort = b.masterPort
b.runner.getReady()
}

// RecordSuccess reports a success
func (b *Boomer) RecordSuccess(requestType, name string, responseTime int64, responseLength int64) {
b.runner.stats.requestSuccessChannel <- &requestSuccess{
requestType: requestType,
name: name,
responseTime: responseTime,
responseLength: responseLength,
}
}

// done
atomic.StoreUint32(&initiated, 1)
// RecordFailure reports a failure.
func (b *Boomer) RecordFailure(requestType, name string, responseTime int64, exception string) {
b.runner.stats.requestFailureChannel <- &requestFailure{
requestType: requestType,
name: name,
responseTime: responseTime,
error: exception,
}
}

// Quit will send a quit message to the master.
func (b *Boomer) Quit() {
Events.Publish("boomer:quit")
var ticker = time.NewTicker(3 * time.Second)
for {
// wait for quit message is sent to master
select {
case <-b.runner.client.disconnectedChannel():
return
case <-ticker.C:
log.Println("Timeout waiting for sending quit message to master, boomer will quit any way.")
return
}
}
}

// Run tasks without connecting to the master.
Expand All @@ -61,26 +118,8 @@ func runTasksForTest(tasks ...*Task) {
}
}

func createRateLimiter(maxRPS int64, requestIncreaseRate string) (rateLimiter rateLimiter, err error) {
if requestIncreaseRate != "-1" {
if maxRPS > 0 {
log.Println("The max RPS that boomer may generate is limited to", maxRPS, "with a increase rate", requestIncreaseRate)
rateLimiter, err = newRampUpRateLimiter(maxRPS, requestIncreaseRate, time.Second)
} else {
log.Println("The max RPS that boomer may generate is limited by a increase rate", requestIncreaseRate)
rateLimiter, err = newRampUpRateLimiter(math.MaxInt64, requestIncreaseRate, time.Second)
}
} else {
if maxRPS > 0 {
log.Println("The max RPS that boomer may generate is limited to", maxRPS)
rateLimiter = newStableRateLimiter(maxRPS, time.Second)
}
}
return rateLimiter, err
}

// Run accepts a slice of Task and connects
// to a locust master.
// Run accepts a slice of Task and connects to a locust master.
// It's a convenience function to use the defaultBoomer.
func Run(tasks ...*Task) {
if !flag.Parsed() {
flag.Parse()
Expand All @@ -91,85 +130,43 @@ func Run(tasks ...*Task) {
return
}

// init boomer
initMutex.Lock()
initBoomer()
initMutex.Unlock()

rateLimiter, err := createRateLimiter(maxRPS, requestIncreaseRate)
if err != nil {
log.Fatalf("Failed to create rate limiter, %v\n", err)
}

defaultRunner = newRunner(tasks, rateLimiter, hatchType)
defaultRunner.masterHost = masterHost
defaultRunner.masterPort = masterPort
defaultRunner.getReady()
defaultBoomer = NewBoomer(masterHost, masterPort)
initLegacyEventHandlers()

if memoryProfile != "" {
startMemoryProfile(memoryProfile, memoryProfileDuration)
StartMemoryProfile(memoryProfile, memoryProfileDuration)
}

if cpuProfile != "" {
startCPUProfile(cpuProfile, cpuProfileDuration)
StartCPUProfile(cpuProfile, cpuProfileDuration)
}

rateLimiter, err := createRateLimiter(maxRPS, requestIncreaseRate)
if err != nil {
log.Fatalf("%v\n", err)
}
defaultBoomer.SetRateLimiter(rateLimiter)
defaultBoomer.hatchType = hatchType

defaultBoomer.Run(tasks...)

c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

<-c
Events.Publish("boomer:quit")
defaultBoomer.Quit()

// wait for quit message is sent to master
<-defaultRunner.client.disconnectedChannel()
log.Println("shut down")
}

func startMemoryProfile(file string, duration time.Duration) {
f, err := os.Create(file)
if err != nil {
log.Fatal(err)
}
time.AfterFunc(duration, func() {
err = pprof.WriteHeapProfile(f)
if err != nil {
log.Println(err)
return
}
f.Close()
log.Println("Stop memory profiling after", duration)
})
}

func startCPUProfile(file string, duration time.Duration) {
f, err := os.Create(file)
if err != nil {
log.Fatal(err)
}

err = pprof.StartCPUProfile(f)
if err != nil {
log.Println(err)
f.Close()
return
}

time.AfterFunc(duration, func() {
pprof.StopCPUProfile()
f.Close()
log.Println("Stop CPU profiling after", duration)
})
// RecordSuccess reports a success.
// It's a convenience function to use the defaultBoomer.
func RecordSuccess(requestType, name string, responseTime int64, responseLength int64) {
defaultBoomer.RecordSuccess(requestType, name, responseTime, responseLength)
}

func init() {
flag.Int64Var(&maxRPS, "max-rps", 0, "Max RPS that boomer can generate, disabled by default.")
flag.StringVar(&requestIncreaseRate, "request-increase-rate", "-1", "Request increase rate, disabled by default.")
flag.StringVar(&hatchType, "hatch-type", "asap", "How to create goroutines according to hatch rate, 'asap' will do it as soon as possible while 'smooth' means a constant pace.")
flag.StringVar(&runTasks, "run-tasks", "", "Run tasks without connecting to the master, multiply tasks is separated by comma. Usually, it's for debug purpose.")
flag.StringVar(&masterHost, "master-host", "127.0.0.1", "Host or IP address of locust master for distributed load testing.")
flag.IntVar(&masterPort, "master-port", 5557, "The port to connect to that is used by the locust master for distributed load testing.")
flag.StringVar(&memoryProfile, "mem-profile", "", "Enable memory profiling.")
flag.DurationVar(&memoryProfileDuration, "mem-profile-duration", 30*time.Second, "Memory profile duration.")
flag.StringVar(&cpuProfile, "cpu-profile", "", "Enable CPU profiling.")
flag.DurationVar(&cpuProfileDuration, "cpu-profile-duration", 30*time.Second, "CPU profile duration.")
// RecordFailure reports a failure.
// It's a convenience function to use the defaultBoomer.
func RecordFailure(requestType, name string, responseTime int64, exception string) {
defaultBoomer.RecordFailure(requestType, name, responseTime, exception)
}
65 changes: 29 additions & 36 deletions boomer_test.go
Expand Up @@ -2,25 +2,10 @@ package boomer

import (
"math"
"os"
"testing"
"time"
)

func TestInitBoomer(t *testing.T) {
initBoomer()
defer Events.Unsubscribe("request_success", legacySuccessHandler)
defer Events.Unsubscribe("request_failure", legacyFailureHandler)

defer func() {
err := recover()
if err == nil {
t.Error("It should panic if initBoomer is called more than once.")
}
}()
initBoomer()
}

func TestRunTasksForTest(t *testing.T) {
count := 0
taskA := &Task{
Expand All @@ -36,6 +21,7 @@ func TestRunTasksForTest(t *testing.T) {
},
}
runTasks = "increaseCount,foobar"

runTasksForTest(taskA, taskWithoutName)

if count != 1 {
Expand All @@ -45,7 +31,7 @@ func TestRunTasksForTest(t *testing.T) {

func TestCreateRatelimiter(t *testing.T) {
rateLimiter, _ := createRateLimiter(100, "-1")
if stableRateLimiter, ok := rateLimiter.(*stableRateLimiter); !ok {
if stableRateLimiter, ok := rateLimiter.(*StableRateLimiter); !ok {
t.Error("Expected stableRateLimiter")
} else {
if stableRateLimiter.threshold != 100 {
Expand All @@ -54,7 +40,7 @@ func TestCreateRatelimiter(t *testing.T) {
}

rateLimiter, _ = createRateLimiter(0, "1")
if rampUpRateLimiter, ok := rateLimiter.(*rampUpRateLimiter); !ok {
if rampUpRateLimiter, ok := rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter")
} else {
if rampUpRateLimiter.maxThreshold != math.MaxInt64 {
Expand All @@ -66,7 +52,7 @@ func TestCreateRatelimiter(t *testing.T) {
}

rateLimiter, _ = createRateLimiter(10, "2/2s")
if rampUpRateLimiter, ok := rateLimiter.(*rampUpRateLimiter); !ok {
if rampUpRateLimiter, ok := rateLimiter.(*RampUpRateLimiter); !ok {
t.Error("Expected rampUpRateLimiter")
} else {
if rampUpRateLimiter.maxThreshold != 10 {
Expand All @@ -84,28 +70,35 @@ func TestCreateRatelimiter(t *testing.T) {
}
}

func TestStartMemoryProfile(t *testing.T) {
if _, err := os.Stat("mem.pprof"); os.IsExist(err) {
os.Remove("mem.pprof")
func TestRecordSuccess(t *testing.T) {
defaultBoomer = NewBoomer("127.0.0.1", 5557)
defaultBoomer.runner = newRunner(nil, nil, "asap")
RecordSuccess("http", "foo", int64(1), int64(10))

requestSuccessMsg := <-defaultBoomer.runner.stats.requestSuccessChannel
if requestSuccessMsg.requestType != "http" {
t.Error("Expected: http, got:", requestSuccessMsg.requestType)
}
startMemoryProfile("mem.pprof", 2*time.Second)
time.Sleep(2100 * time.Millisecond)
if _, err := os.Stat("mem.pprof"); os.IsNotExist(err) {
t.Error("File mem.pprof is not generated")
} else {
os.Remove("mem.pprof")
if requestSuccessMsg.responseTime != int64(1) {
t.Error("Expected: 1, got:", requestSuccessMsg.responseTime)
}
defaultBoomer = nil
}

func TestStartCPUProfile(t *testing.T) {
if _, err := os.Stat("cpu.pprof"); os.IsExist(err) {
os.Remove("cpu.pprof")
func TestRecordFailure(t *testing.T) {
defaultBoomer = NewBoomer("127.0.0.1", 5557)
defaultBoomer.runner = newRunner(nil, nil, "asap")
RecordFailure("udp", "bar", int64(2), "udp error")

requestFailureMsg := <-defaultBoomer.runner.stats.requestFailureChannel
if requestFailureMsg.requestType != "udp" {
t.Error("Expected: udp, got:", requestFailureMsg.requestType)
}
startCPUProfile("cpu.pprof", 2*time.Second)
time.Sleep(2100 * time.Millisecond)
if _, err := os.Stat("cpu.pprof"); os.IsNotExist(err) {
t.Error("File cpu.pprof is not generated")
} else {
os.Remove("cpu.pprof")
if requestFailureMsg.responseTime != int64(2) {
t.Error("Expected: 2, got:", requestFailureMsg.responseTime)
}
if requestFailureMsg.error != "udp error" {
t.Error("Expected: udp error, got:", requestFailureMsg.error)
}
defaultBoomer = nil
}

0 comments on commit 366119e

Please sign in to comment.