Skip to content

Commit

Permalink
Add a test runner (#40)
Browse files Browse the repository at this point in the history
Add a test runner

Something that came up in #39 - there's a fair bit of duplication
in the tests. A runner make current setup a bit easier/shorter,
but would make #39 much smaller.

There's no functional changes.
  • Loading branch information
rabbbit committed Oct 26, 2020
1 parent 40c185c commit a7b5fef
Showing 1 changed file with 105 additions and 84 deletions.
189 changes: 105 additions & 84 deletions ratelimit_test.go
Expand Up @@ -13,6 +13,82 @@ import (
"github.com/stretchr/testify/assert"
)

type runner interface {
// startTaking tries to Take() on passed in limiters in a loop/goroutine.
startTaking(rls ...ratelimit.Limiter)
// assertCountAt asserts the limiters have Taken() a number of times at the given time.
// It's a thin wrapper around afterFunc to reduce boilerplate code.
assertCountAt(d time.Duration, count int)
// getClock returns the test clock.
getClock() ratelimit.Clock
// afterFunc executes a func at a given time.
afterFunc(d time.Duration, fn func())
}

type runnerImpl struct {
t *testing.T

clock *clock.Mock
count atomic.Int32
// maxDuration is the time we need to move into the future for a test.
// It's populated automatically based on assertCountAt/afterFunc.
maxDuration time.Duration
doneCh chan struct{}
wg sync.WaitGroup
}

func runTest(t *testing.T, fn func(runner)) {
r := runnerImpl{
t: t,
clock: clock.NewMock(),
doneCh: make(chan struct{}),
}
defer close(r.doneCh)
defer r.wg.Wait()

fn(&r)
r.clock.Add(r.maxDuration)
}

// startTaking tries to Take() on passed in limiters in a loop/goroutine.
func (r *runnerImpl) startTaking(rls ...ratelimit.Limiter) {
go func() {
for {
for _, rl := range rls {
rl.Take()
}
r.count.Inc()
select {
case <-r.doneCh:
return
default:
}
}
}()
}

// assertCountAt asserts the limiters have Taken() a number of times at a given time.
func (r *runnerImpl) assertCountAt(d time.Duration, count int) {
r.wg.Add(1)
r.afterFunc(d, func() {
assert.InDelta(r.t, count, r.count.Load(), 10, "count within rate limit")
r.wg.Done()
})
}

// getClock return the test clock.
func (r *runnerImpl) getClock() ratelimit.Clock {
return r.clock
}

// afterFunc executes a func at a given time.
func (r *runnerImpl) afterFunc(d time.Duration, fn func()) {
if d > r.maxDuration {
r.maxDuration = d
}
r.clock.AfterFunc(d, fn)
}

func Example() {
rl := ratelimit.New(100) // per second

Expand Down Expand Up @@ -47,95 +123,40 @@ func TestUnlimited(t *testing.T) {
}

func TestRateLimiter(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()

clock := clock.NewMock()
rl := ratelimit.New(100, ratelimit.WithClock(clock), ratelimit.WithoutSlack)

count := atomic.NewInt32(0)

// Until we're done...
done := make(chan struct{})
defer close(done)

// Create copious counts concurrently.
go job(rl, count, done)
go job(rl, count, done)
go job(rl, count, done)
go job(rl, count, done)

clock.AfterFunc(1*time.Second, func() {
assert.InDelta(t, 100, count.Load(), 10, "count within rate limit")
})

clock.AfterFunc(2*time.Second, func() {
assert.InDelta(t, 200, count.Load(), 10, "count within rate limit")
})

clock.AfterFunc(3*time.Second, func() {
assert.InDelta(t, 300, count.Load(), 10, "count within rate limit")
wg.Done()
runTest(t, func(r runner) {
rl := ratelimit.New(100, ratelimit.WithClock(r.getClock()), ratelimit.WithoutSlack)

// Create copious counts concurrently.
r.startTaking(rl)
r.startTaking(rl)
r.startTaking(rl)
r.startTaking(rl)

r.assertCountAt(1*time.Second, 100)
r.assertCountAt(2*time.Second, 200)
r.assertCountAt(3*time.Second, 300)
})

clock.Add(4 * time.Second)
}

func TestDelayedRateLimiter(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()

clock := clock.NewMock()
slow := ratelimit.New(10, ratelimit.WithClock(clock))
fast := ratelimit.New(100, ratelimit.WithClock(clock))

count := atomic.NewInt32(0)

// Until we're done...
done := make(chan struct{})
defer close(done)

// Run a slow job
go func() {
for {
slow.Take()
fast.Take()
count.Inc()
select {
case <-done:
return
default:
}
}
}()

// Accumulate slack for 10 seconds,
clock.AfterFunc(20*time.Second, func() {
// Then start working.
go job(fast, count, done)
go job(fast, count, done)
go job(fast, count, done)
go job(fast, count, done)
runTest(t, func(r runner) {
slow := ratelimit.New(10, ratelimit.WithClock(r.getClock()))
fast := ratelimit.New(100, ratelimit.WithClock(r.getClock()))

// Run a slow startTaking
r.startTaking(slow, fast)

// Accumulate slack for 10 seconds,
r.afterFunc(20*time.Second, func() {
// Then start working.
r.startTaking(fast)
r.startTaking(fast)
r.startTaking(fast)
r.startTaking(fast)
})

r.assertCountAt(30*time.Second, 1200)
})

clock.AfterFunc(30*time.Second, func() {
assert.InDelta(t, 1200, count.Load(), 10, "count within rate limit")
wg.Done()
})

clock.Add(40 * time.Second)
}

func job(rl ratelimit.Limiter, count *atomic.Int32, done <-chan struct{}) {
for {
rl.Take()
count.Inc()
select {
case <-done:
return
default:
}
}
}

0 comments on commit a7b5fef

Please sign in to comment.