Skip to content

Commit

Permalink
rules: make ChainIdentity public; simplify Overflow and RateLimit, mo…
Browse files Browse the repository at this point in the history
…re unit testing
  • Loading branch information
James DeFelice committed Jun 10, 2017
1 parent 289fdd0 commit 38fa54f
Show file tree
Hide file tree
Showing 16 changed files with 715 additions and 428 deletions.
2 changes: 1 addition & 1 deletion api/v1/lib/extras/executor/callrules/callers_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (r Rule) Call(ctx context.Context, c *executor.Call) (mesos.Response, error
if r == nil {
return nil, nil
}
_, _, resp, err := r(ctx, c, nil, nil, chainIdentity)
_, _, resp, err := r(ctx, c, nil, nil, ChainIdentity)
return resp, err
}

Expand Down
103 changes: 62 additions & 41 deletions api/v1/lib/extras/executor/callrules/callrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package callrules

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -44,13 +43,13 @@ type (
var (
_ = evaler(Rule(nil))
_ = evaler(Rules{})

// chainIdentity is a Chain that returns the arguments as its results.
chainIdentity = func(ctx context.Context, e *executor.Call, z mesos.Response, err error) (context.Context, *executor.Call, mesos.Response, error) {
return ctx, e, z, err
}
)

// ChainIdentity is a Chain that returns the arguments as its results.
func ChainIdentity(ctx context.Context, e *executor.Call, z mesos.Response, err error) (context.Context, *executor.Call, mesos.Response, error) {
return ctx, e, z, err
}

// Eval is a convenience func that processes a nil Rule as a noop.
func (r Rule) Eval(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
if r != nil {
Expand All @@ -69,7 +68,7 @@ func (rs Rules) Eval(ctx context.Context, e *executor.Call, z mesos.Response, er
// from Rule to Rule. Chain is safe to invoke concurrently.
func (rs Rules) Chain() Chain {
if len(rs) == 0 {
return chainIdentity
return ChainIdentity
}
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error) (context.Context, *executor.Call, mesos.Response, error) {
return rs[0].Eval(ctx, e, z, err, rs[1:].Chain())
Expand Down Expand Up @@ -212,44 +211,65 @@ func (r Rule) UnlessDone() Rule {
type Overflow int

const (
// OverflowDiscard aborts the rule chain and returns the current state
OverflowDiscard Overflow = iota
// OverflowDiscardWithError aborts the rule chain and returns the current state merged with ErrOverflow
OverflowDiscardWithError
// OverflowBackpressure waits until the rule may execute, or the context is canceled.
OverflowBackpressure
// OverflowSkip skips over the decorated rule and continues processing the rule chain
OverflowSkip
// OverflowSkipWithError skips over the decorated rule and merges ErrOverflow upon executing the chain
OverflowSkipWithError
// OverflowWait waits until the rule may execute, or the context is canceled.
OverflowWait Overflow = iota
// OverflowOtherwise skips over the decorated rule and invoke an alternative instead.
OverflowOtherwise
)

var ErrOverflow = errors.New("overflow: rate limit exceeded")

// RateLimit invokes the receiving Rule if the chan is readable (may be closed), otherwise it handles the "overflow"
// RateLimit invokes the receiving Rule if a read of chan "p" succeeds (may be closed), otherwise proceeds
// according to the specified Overflow policy. May be useful, for example, when rate-limiting logged events.
// Returns nil (noop) if the receiver is nil, otherwise a nil chan will always trigger an overflow.
func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
// Returns nil (noop) if the receiver is nil, otherwise a nil chan will trigger an overflow.
// Panics when OverflowWait is specified with a nil chan, in order to prevent deadlock.
func (r Rule) RateLimit(p <-chan struct{}, over Overflow, otherwise Rule) Rule {
if r != nil && p == nil && over == OverflowWait {
panic("deadlock detected: reads from token chan will permanently block rule processing")
}
return rateLimit(r, acquireFunc(p), over, otherwise)
}

// acquireFunc wraps a signal chan with a func that can be used with rateLimit.
// should only be called by RateLimit (because it implements deadlock detection).
func acquireFunc(tokenCh <-chan struct{}) func(bool) bool {
if tokenCh == nil {
// always false/blocked: acquire never succeeds
return func(block bool) bool {
if block {
panic("deadlock detected: block should never be true when the token chan is nil")
}
return false
}
}
return func(block bool) bool {
if block {
<-tokenCh
return true
}
select {
case <-tokenCh:
return true
default:
return false
}
}
}

// rateLimit is more easily unit tested than RateLimit.
func rateLimit(r Rule, acquire func(block bool) bool, over Overflow, otherwise Rule) Rule {
if r == nil {
return nil
}
if acquire == nil {
panic("acquire func is not allowed to be nil")
}
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
select {
case <-p:
// continue
default:
// overflow
if !acquire(false) {
// non-blocking acquire failed, check overflow policy
switch over {
case OverflowBackpressure:
<-p
case OverflowDiscardWithError:
return ctx, e, z, Error2(err, ErrOverflow)
case OverflowDiscard:
return ctx, e, z, err
case OverflowSkipWithError:
return ch(ctx, e, z, Error2(err, ErrOverflow))
case OverflowSkip:
return ch(ctx, e, z, err)
case OverflowWait:
_ = acquire(true) // block until there's a signal
case OverflowOtherwise:
return otherwise.Eval(ctx, e, z, err, ch)
default:
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
}
Expand All @@ -259,8 +279,9 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
}

// EveryN invokes the receiving rule beginning with the first event seen and then every n'th
// time after that. If nthTime is less then 2 then this call is a noop (the receiver is returned).
func (r Rule) EveryN(nthTime int) Rule {
// time after that. If nthTime is less then 2 then the receiver is returned, undecorated.
// The "otherwise" Rule (may be null) is invoked for every event in between the n'th invocations.
func (r Rule) EveryN(nthTime int, otherwise Rule) Rule {
if nthTime < 2 || r == nil {
return r
}
Expand All @@ -283,7 +304,7 @@ func (r Rule) EveryN(nthTime int) Rule {
if forward() {
return r(ctx, e, z, err, ch)
}
return ch(ctx, e, z, err)
return otherwise.Eval(ctx, e, z, err, ch)
}
}

Expand All @@ -295,7 +316,7 @@ func Drop() Rule {
// ThenDrop executes the receiving rule, but aborts the Chain, and returns the (context.Context, *executor.Call, mesos.Response, error) tuple as-is.
func (r Rule) ThenDrop() Rule {
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, _ Chain) (context.Context, *executor.Call, mesos.Response, error) {
return r.Eval(ctx, e, z, err, chainIdentity)
return r.Eval(ctx, e, z, err, ChainIdentity)
}
}

Expand Down
Loading

0 comments on commit 38fa54f

Please sign in to comment.