Skip to content

Commit

Permalink
rules: additional RateLimit unit tests and bugfix w/ respect to Overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Jun 9, 2017
1 parent 5ddd39e commit a944cc1
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 140 deletions.
16 changes: 8 additions & 8 deletions api/v1/lib/extras/executor/callrules/callrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,19 @@ const (
OverflowDiscardWithError
// OverflowBackpressure waits until the rule may execute, or the context is canceled.
OverflowBackpressure
// OverflowSkipRule skips over the decorated rule and continues processing the rule chain
OverflowSkipRule
// OverflowSkipRuleWithError skips over the decorated rule and merges ErrOverflow upon executing the chain
OverflowSkipRuleWithError
// OverflowSkip skips over the decorated rule and continues processing the rule chain
OverflowSkip
// OverflowSkipWithError skips over the decorated rule and merges ErrOverflow upon executing the chain
OverflowSkipWithError
)

var ErrOverflow = errors.New("overflow: rate limit exceeded")

// RateLimit invokes the receiving Rule if the chan is readable (may be closed), otherwise it handles the "overflow"
// according to the specified Overflow policy. May be useful, for example, when rate-limiting logged events.
// A nil chan will always skip the rule.
// Returns nil (noop) if the receiver is nil, otherwise a nil chan will always trigger an overflow.
func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
if p == nil || r == nil {
if r == nil {
return nil
}
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
Expand Down Expand Up @@ -244,9 +244,9 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
return ctx, e, z, Error2(err, ErrOverflow)
case OverflowDiscard:
return ctx, e, z, err
case OverflowSkipRuleWithError:
case OverflowSkipWithError:
return ch(ctx, e, z, Error2(err, ErrOverflow))
case OverflowSkipRule:
case OverflowSkip:
return ch(ctx, e, z, err)
default:
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
Expand Down
74 changes: 54 additions & 20 deletions api/v1/lib/extras/executor/callrules/callrules_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,50 +503,84 @@ func TestOnce(t *testing.T) {
}

func TestRateLimit(t *testing.T) {
// non-blocking, then blocking
o := func() <-chan struct{} {
x := make(chan struct{}, 1)
x <- struct{}{}
return x
}
var (
ch1 <-chan struct{} // always nil
ch2 = make(chan struct{}) // non-nil, blocking
ch3 = make(chan struct{}, 1) // non-nil, non-blocking then blocking
ch4 = make(chan struct{}) // non-nil, closed
ch1 <-chan struct{} // always nil, blocking
ch2 = make(chan struct{}) // non-nil, blocking
ch4 = make(chan struct{}) // non-nil, closed
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()
)
ch3 <- struct{}{}
close(ch4)
// TODO(jdef): unit test for OverflowBackpressure
for ti, tc := range []struct {
ch <-chan struct{}
wantsRuleCount []int
ctx context.Context
ch <-chan struct{}
over Overflow
wantsError int // bitmask: lower 4 bits, one for each case; first case = highest bit
wantsRuleCount []int
wantsChainCount []int
}{
{ch1, []int{0, 0, 0, 0}},
{ch2, []int{0, 0, 0, 0}},
{ch3, []int{1, 1, 1, 1}},
{ch4, []int{1, 2, 2, 2}},
{ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},
} {
var (
i, j int
p = prototype()
ctx = context.Background()
r1 = counter(&i).RateLimit(tc.ch, OverflowSkipRule).Eval
r2 = Rule(nil).RateLimit(tc.ch, OverflowSkipRule).Eval
r1 = counter(&i).RateLimit(tc.ch, tc.over).Eval
r2 = Rule(nil).RateLimit(tc.ch, tc.over).Eval // a nil rule still invokes the chain
)
var zp = &mesos.ResponseWrapper{}
for k, r := range []Rule{r1, r2} {
// execute each rule twice
for x := 0; x < 2; x++ {
_, e, zz, err := r(ctx, p, zp, nil, chainCounter(&j, chainIdentity))
_, e, zz, err := r(tc.ctx, p, zp, nil, chainCounter(&j, chainIdentity))
if e != p {
t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e)
}
if zz != zp {
t.Errorf("expected return object %q instead of %q", zp, zz)
}
if err != nil {
t.Errorf("test case %d failed: unexpected error %v", ti, err)
if b := 8 >> uint(k*2+x); ((b & tc.wantsError) != 0) != (err != nil) {
t.Errorf("test case (%d,%d,%d) failed: unexpected error %v", ti, k, x, err)
}
if y := tc.wantsRuleCount[k*2+x]; i != y {
t.Errorf("test case (%d,%d,%d) failed: expected count of %d instead of %d",
ti, k, x, y, i)
}
if y := (k * 2) + x + 1; j != y {
t.Errorf("test case %d failed: expected chain count of %d instead of %d",
ti, y, j)
if y := tc.wantsChainCount[k*2+x]; j != y {
t.Errorf("test case (%d,%d,%d) failed: expected chain count of %d instead of %d",
ti, k, x, y, j)
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions api/v1/lib/extras/executor/eventrules/eventrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,19 @@ const (
OverflowDiscardWithError
// OverflowBackpressure waits until the rule may execute, or the context is canceled.
OverflowBackpressure
// OverflowSkipRule skips over the decorated rule and continues processing the rule chain
OverflowSkipRule
// OverflowSkipRuleWithError skips over the decorated rule and merges ErrOverflow upon executing the chain
OverflowSkipRuleWithError
// OverflowSkip skips over the decorated rule and continues processing the rule chain
OverflowSkip
// OverflowSkipWithError skips over the decorated rule and merges ErrOverflow upon executing the chain
OverflowSkipWithError
)

var ErrOverflow = errors.New("overflow: rate limit exceeded")

// RateLimit invokes the receiving Rule if the chan is readable (may be closed), otherwise it handles the "overflow"
// according to the specified Overflow policy. May be useful, for example, when rate-limiting logged events.
// A nil chan will always skip the rule.
// Returns nil (noop) if the receiver is nil, otherwise a nil chan will always trigger an overflow.
func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
if p == nil || r == nil {
if r == nil {
return nil
}
return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) {
Expand Down Expand Up @@ -243,9 +243,9 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
return ctx, e, Error2(err, ErrOverflow)
case OverflowDiscard:
return ctx, e, err
case OverflowSkipRuleWithError:
case OverflowSkipWithError:
return ch(ctx, e, Error2(err, ErrOverflow))
case OverflowSkipRule:
case OverflowSkip:
return ch(ctx, e, err)
default:
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
Expand Down
74 changes: 54 additions & 20 deletions api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,46 +441,80 @@ func TestOnce(t *testing.T) {
}

func TestRateLimit(t *testing.T) {
// non-blocking, then blocking
o := func() <-chan struct{} {
x := make(chan struct{}, 1)
x <- struct{}{}
return x
}
var (
ch1 <-chan struct{} // always nil
ch2 = make(chan struct{}) // non-nil, blocking
ch3 = make(chan struct{}, 1) // non-nil, non-blocking then blocking
ch4 = make(chan struct{}) // non-nil, closed
ch1 <-chan struct{} // always nil, blocking
ch2 = make(chan struct{}) // non-nil, blocking
ch4 = make(chan struct{}) // non-nil, closed
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()
)
ch3 <- struct{}{}
close(ch4)
// TODO(jdef): unit test for OverflowBackpressure
for ti, tc := range []struct {
ch <-chan struct{}
wantsRuleCount []int
ctx context.Context
ch <-chan struct{}
over Overflow
wantsError int // bitmask: lower 4 bits, one for each case; first case = highest bit
wantsRuleCount []int
wantsChainCount []int
}{
{ch1, []int{0, 0, 0, 0}},
{ch2, []int{0, 0, 0, 0}},
{ch3, []int{1, 1, 1, 1}},
{ch4, []int{1, 2, 2, 2}},
{ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},
} {
var (
i, j int
p = prototype()
ctx = context.Background()
r1 = counter(&i).RateLimit(tc.ch, OverflowSkipRule).Eval
r2 = Rule(nil).RateLimit(tc.ch, OverflowSkipRule).Eval
r1 = counter(&i).RateLimit(tc.ch, tc.over).Eval
r2 = Rule(nil).RateLimit(tc.ch, tc.over).Eval // a nil rule still invokes the chain
)
for k, r := range []Rule{r1, r2} {
// execute each rule twice
for x := 0; x < 2; x++ {
_, e, err := r(ctx, p, nil, chainCounter(&j, chainIdentity))
_, e, err := r(tc.ctx, p, nil, chainCounter(&j, chainIdentity))
if e != p {
t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e)
}
if err != nil {
t.Errorf("test case %d failed: unexpected error %v", ti, err)
if b := 8 >> uint(k*2+x); ((b & tc.wantsError) != 0) != (err != nil) {
t.Errorf("test case (%d,%d,%d) failed: unexpected error %v", ti, k, x, err)
}
if y := tc.wantsRuleCount[k*2+x]; i != y {
t.Errorf("test case (%d,%d,%d) failed: expected count of %d instead of %d",
ti, k, x, y, i)
}
if y := (k * 2) + x + 1; j != y {
t.Errorf("test case %d failed: expected chain count of %d instead of %d",
ti, y, j)
if y := tc.wantsChainCount[k*2+x]; j != y {
t.Errorf("test case (%d,%d,%d) failed: expected chain count of %d instead of %d",
ti, k, x, y, j)
}
}
}
Expand Down
Loading

0 comments on commit a944cc1

Please sign in to comment.