Skip to content

Commit

Permalink
rules: unit test and fix for cancelled context r/ RateLimit
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Jun 11, 2017
1 parent 15c8550 commit d7478ca
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 90 deletions.
34 changes: 18 additions & 16 deletions api/v1/lib/extras/executor/callrules/callrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,8 @@ const (
// according to the specified Overflow policy. May be useful, for example, when rate-limiting logged events.
// Returns nil (noop) if the receiver is nil, otherwise a nil chan will normally trigger an overflow.
// Panics when OverflowWait is specified with a nil chan, in order to prevent deadlock.
// A cancelled context will trigger the "otherwise" rule.
func (r Rule) RateLimit(p <-chan struct{}, over Overflow, otherwise Rule) Rule {
if r != nil && p == nil && over == OverflowWait {
panic("deadlock detected: reads from token chan will permanently block rule processing")
}
return limit(r, acquireChan(p), over, otherwise)
}

Expand All @@ -248,10 +246,15 @@ func acquireChan(tokenCh <-chan struct{}) func(context.Context, bool) bool {
if block {
select {
case <-tokenCh:
return true
// tie breaker prefers Done
select {
case <-ctx.Done():
default:
return true
}
case <-ctx.Done():
return false
}
return false
}
select {
case <-tokenCh:
Expand All @@ -273,29 +276,28 @@ func limit(r Rule, acquire func(_ context.Context, block bool) bool, over Overfl
if acquire == nil {
panic("acquire func is not allowed to be nil")
}
blocking := false
switch over {
case OverflowWait:
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
_ = acquire(ctx, true) // block until there's a signal
return r(ctx, e, z, err, ch)
}
case OverflowOtherwise:
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
if !acquire(ctx, false) {
return otherwise.Eval(ctx, e, z, err, ch)
}
return r(ctx, e, z, err, ch)
}
case OverflowWait:
blocking = true
default:
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
}
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
if !acquire(ctx, blocking) {
return otherwise.Eval(ctx, e, z, err, ch)
}
return r(ctx, e, z, err, ch)
}
}

/* TODO(jdef) not sure that this is very useful, leaving out for now...
// EveryN invokes the receiving rule beginning with the first event seen and then every n'th
// time after that. If nthTime is less then 2 then the receiver is returned, undecorated.
// The "otherwise" Rule (may be null) is invoked for every event in between the n'th invocations.
// A cancelled context will trigger the "otherwise" rule.
func (r Rule) EveryN(nthTime int, otherwise Rule) Rule {
if nthTime < 2 || r == nil {
return r
Expand Down
35 changes: 33 additions & 2 deletions api/v1/lib/extras/executor/callrules/callrules_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,15 @@ func TestRateLimit(t *testing.T) {
var (
ch1 <-chan struct{} // always nil, blocking
ch2 = make(chan struct{}) // non-nil, blocking
// ch3 is o()
ch4 = make(chan struct{}) // non-nil, closed
ctx = context.Background()
p = prototype()
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()

errOverflow = errors.New("overflow")
otherwiseSkip = Rule(nil)
Expand All @@ -581,6 +587,7 @@ func TestRateLimit(t *testing.T) {
var zp = &mesos.ResponseWrapper{}
close(ch4)
for ti, tc := range []struct {
// each set of inputs is executed 4 times: twice for r1, twice for r2
ctx context.Context
ch <-chan struct{}
over Overflow
Expand All @@ -594,21 +601,45 @@ func TestRateLimit(t *testing.T) {
{ctx, o(), OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch2, OverflowOtherwise, otherwiseSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, o(), OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{fin, ch4, OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, ch2, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowOtherwise, otherwiseSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowOtherwise, otherwiseSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch2, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, o(), OverflowOtherwise, otherwiseSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{fin, ch4, OverflowOtherwise, otherwiseSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch2, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, o(), OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{fin, ch4, OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowOtherwise, otherwiseDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowOtherwise, otherwiseDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch2, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, o(), OverflowOtherwise, otherwiseDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{fin, ch4, OverflowOtherwise, otherwiseDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch2, OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, o(), OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch4, OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowWait, nil, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},
} {
var (
Expand Down Expand Up @@ -660,7 +691,7 @@ func TestRateLimit(t *testing.T) {
didPanic := false
func() {
defer func() { didPanic = recover() != nil }()
Rule(Rule(nil).Eval).RateLimit(nil, OverflowWait, nil)
Rule(Rule(nil).Eval).RateLimit(nil, OverflowWait, nil).Eval(ctx, p, zp, nil, ChainIdentity)
}()
if !didPanic {
t.Error("expected panic because we configured a rule to deadlock")
Expand Down
34 changes: 18 additions & 16 deletions api/v1/lib/extras/executor/eventrules/eventrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,8 @@ const (
// according to the specified Overflow policy. May be useful, for example, when rate-limiting logged events.
// Returns nil (noop) if the receiver is nil, otherwise a nil chan will normally trigger an overflow.
// Panics when OverflowWait is specified with a nil chan, in order to prevent deadlock.
// A cancelled context will trigger the "otherwise" rule.
func (r Rule) RateLimit(p <-chan struct{}, over Overflow, otherwise Rule) Rule {
if r != nil && p == nil && over == OverflowWait {
panic("deadlock detected: reads from token chan will permanently block rule processing")
}
return limit(r, acquireChan(p), over, otherwise)
}

Expand All @@ -247,10 +245,15 @@ func acquireChan(tokenCh <-chan struct{}) func(context.Context, bool) bool {
if block {
select {
case <-tokenCh:
return true
// tie breaker prefers Done
select {
case <-ctx.Done():
default:
return true
}
case <-ctx.Done():
return false
}
return false
}
select {
case <-tokenCh:
Expand All @@ -272,29 +275,28 @@ func limit(r Rule, acquire func(_ context.Context, block bool) bool, over Overfl
if acquire == nil {
panic("acquire func is not allowed to be nil")
}
blocking := false
switch over {
case OverflowWait:
return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) {
_ = acquire(ctx, true) // block until there's a signal
return r(ctx, e, err, ch)
}
case OverflowOtherwise:
return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) {
if !acquire(ctx, false) {
return otherwise.Eval(ctx, e, err, ch)
}
return r(ctx, e, err, ch)
}
case OverflowWait:
blocking = true
default:
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
}
return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) {
if !acquire(ctx, blocking) {
return otherwise.Eval(ctx, e, err, ch)
}
return r(ctx, e, err, ch)
}
}

/* TODO(jdef) not sure that this is very useful, leaving out for now...
// EveryN invokes the receiving rule beginning with the first event seen and then every n'th
// time after that. If nthTime is less then 2 then the receiver is returned, undecorated.
// The "otherwise" Rule (may be null) is invoked for every event in between the n'th invocations.
// A cancelled context will trigger the "otherwise" rule.
func (r Rule) EveryN(nthTime int, otherwise Rule) Rule {
if nthTime < 2 || r == nil {
return r
Expand Down
35 changes: 33 additions & 2 deletions api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,15 @@ func TestRateLimit(t *testing.T) {
var (
ch1 <-chan struct{} // always nil, blocking
ch2 = make(chan struct{}) // non-nil, blocking
// ch3 is o()
ch4 = make(chan struct{}) // non-nil, closed
ctx = context.Background()
p = prototype()
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()

errOverflow = errors.New("overflow")
otherwiseSkip = Rule(nil)
Expand All @@ -514,6 +520,7 @@ func TestRateLimit(t *testing.T) {
)
close(ch4)
for ti, tc := range []struct {
// each set of inputs is executed 4 times: twice for r1, twice for r2
ctx context.Context
ch <-chan struct{}
over Overflow
Expand All @@ -527,21 +534,45 @@ func TestRateLimit(t *testing.T) {
{ctx, o(), OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch2, OverflowOtherwise, otherwiseSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, o(), OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{fin, ch4, OverflowOtherwise, otherwiseSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, ch2, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowOtherwise, otherwiseSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowOtherwise, otherwiseSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch2, OverflowOtherwise, otherwiseSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, o(), OverflowOtherwise, otherwiseSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{fin, ch4, OverflowOtherwise, otherwiseSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch2, OverflowOtherwise, otherwiseDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, o(), OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{fin, ch4, OverflowOtherwise, otherwiseDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowOtherwise, otherwiseDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowOtherwise, otherwiseDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch2, OverflowOtherwise, otherwiseDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, o(), OverflowOtherwise, otherwiseDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{fin, ch4, OverflowOtherwise, otherwiseDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{fin, ch1, OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch2, OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, o(), OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch4, OverflowWait, nil, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowWait, nil, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},
} {
var (
Expand Down Expand Up @@ -587,7 +618,7 @@ func TestRateLimit(t *testing.T) {
didPanic := false
func() {
defer func() { didPanic = recover() != nil }()
Rule(Rule(nil).Eval).RateLimit(nil, OverflowWait, nil)
Rule(Rule(nil).Eval).RateLimit(nil, OverflowWait, nil).Eval(ctx, p, nil, ChainIdentity)
}()
if !didPanic {
t.Error("expected panic because we configured a rule to deadlock")
Expand Down
Loading

0 comments on commit d7478ca

Please sign in to comment.