Skip to content

Commit

Permalink
rules: separate UnlessDone from RateLimit, refactor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Jun 9, 2017
1 parent a944cc1 commit 289fdd0
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 135 deletions.
37 changes: 20 additions & 17 deletions api/v1/lib/extras/executor/callrules/callrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ func (r Rule) Once() Rule {
}
}

// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain
// is aborted and the context.Err is merged with the current error state.
// Returns nil (noop) if the receiving Rule is nil.
func (r Rule) UnlessDone() Rule {
if r == nil {
return nil
}
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
select {
case <-ctx.Done():
return ctx, e, z, Error2(err, ctx.Err())
default:
return r(ctx, e, z, err, ch)
}
}
}

type Overflow int

const (
Expand All @@ -217,29 +234,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
return nil
}
return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) {
checkTieBreaker := func() (context.Context, *executor.Call, mesos.Response, error) {
select {
case <-ctx.Done():
return ctx, e, z, Error2(err, ctx.Err())
default:
return r(ctx, e, z, err, ch)
}
}
select {
case <-p:
return checkTieBreaker()
case <-ctx.Done():
return ctx, e, z, Error2(err, ctx.Err())
// continue
default:
// overflow
switch over {
case OverflowBackpressure:
select {
case <-p:
return checkTieBreaker()
case <-ctx.Done():
return ctx, e, z, Error2(err, ctx.Err())
}
<-p
case OverflowDiscardWithError:
return ctx, e, z, Error2(err, ErrOverflow)
case OverflowDiscard:
Expand All @@ -252,6 +254,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
}
}
return r(ctx, e, z, err, ch)
}
}

Expand Down
63 changes: 53 additions & 10 deletions api/v1/lib/extras/executor/callrules/callrules_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,56 @@ func TestError2(t *testing.T) {
}
}

func TestUnlessDone(t *testing.T) {
var (
p = prototype()
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()
)
var zp = &mesos.ResponseWrapper{}
for ti, tc := range []struct {
ctx context.Context
wantsError []error
wantsRuleCount []int
wantsChainCount []int
}{
{ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}},
{fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}},
} {
var (
i, j int
r1 = counter(&i)
r2 = r1.UnlessDone()
)
for k, r := range []Rule{r1, r2} {
_, e, zz, err := r(tc.ctx, p, zp, nil, chainCounter(&j, chainIdentity))
if e != p {
t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e)
}
if zz != zp {
t.Errorf("test case %d failed: expected return object %q instead of %q", ti, zp, zz)
}
if err != tc.wantsError[k] {
t.Errorf("test case %d failed: unexpected error %v", ti, err)
}
if i != tc.wantsRuleCount[k] {
t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i)
}
if j != tc.wantsChainCount[k] {
t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j)
}
}
}
r := Rule(nil).UnlessDone()
if r != nil {
t.Error("expected nil result from UnlessDone")
}
}

func TestAndThen(t *testing.T) {
var (
i, j int
Expand Down Expand Up @@ -514,14 +564,8 @@ func TestRateLimit(t *testing.T) {
ch2 = make(chan struct{}) // non-nil, blocking
ch4 = make(chan struct{}) // non-nil, closed
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()
)
close(ch4)
// TODO(jdef): unit test for OverflowBackpressure
for ti, tc := range []struct {
ctx context.Context
ch <-chan struct{}
Expand All @@ -531,28 +575,27 @@ func TestRateLimit(t *testing.T) {
wantsChainCount []int
}{
{ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

// TODO(jdef): test OverflowBackpressure (blocking)
{ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},
} {
var (
i, j int
Expand Down
37 changes: 20 additions & 17 deletions api/v1/lib/extras/executor/eventrules/eventrules_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,23 @@ func (r Rule) Once() Rule {
}
}

// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain
// is aborted and the context.Err is merged with the current error state.
// Returns nil (noop) if the receiving Rule is nil.
func (r Rule) UnlessDone() Rule {
if r == nil {
return nil
}
return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) {
select {
case <-ctx.Done():
return ctx, e, Error2(err, ctx.Err())
default:
return r(ctx, e, err, ch)
}
}
}

type Overflow int

const (
Expand All @@ -216,29 +233,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
return nil
}
return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) {
checkTieBreaker := func() (context.Context, *executor.Event, error) {
select {
case <-ctx.Done():
return ctx, e, Error2(err, ctx.Err())
default:
return r(ctx, e, err, ch)
}
}
select {
case <-p:
return checkTieBreaker()
case <-ctx.Done():
return ctx, e, Error2(err, ctx.Err())
// continue
default:
// overflow
switch over {
case OverflowBackpressure:
select {
case <-p:
return checkTieBreaker()
case <-ctx.Done():
return ctx, e, Error2(err, ctx.Err())
}
<-p
case OverflowDiscardWithError:
return ctx, e, Error2(err, ErrOverflow)
case OverflowDiscard:
Expand All @@ -251,6 +253,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule {
panic(fmt.Sprintf("unexpected Overflow type: %#v", over))
}
}
return r(ctx, e, err, ch)
}
}

Expand Down
59 changes: 49 additions & 10 deletions api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,52 @@ func TestError2(t *testing.T) {
}
}

func TestUnlessDone(t *testing.T) {
var (
p = prototype()
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()
)
for ti, tc := range []struct {
ctx context.Context
wantsError []error
wantsRuleCount []int
wantsChainCount []int
}{
{ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}},
{fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}},
} {
var (
i, j int
r1 = counter(&i)
r2 = r1.UnlessDone()
)
for k, r := range []Rule{r1, r2} {
_, e, err := r(tc.ctx, p, nil, chainCounter(&j, chainIdentity))
if e != p {
t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e)
}
if err != tc.wantsError[k] {
t.Errorf("test case %d failed: unexpected error %v", ti, err)
}
if i != tc.wantsRuleCount[k] {
t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i)
}
if j != tc.wantsChainCount[k] {
t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j)
}
}
}
r := Rule(nil).UnlessDone()
if r != nil {
t.Error("expected nil result from UnlessDone")
}
}

func TestAndThen(t *testing.T) {
var (
i, j int
Expand Down Expand Up @@ -452,14 +498,8 @@ func TestRateLimit(t *testing.T) {
ch2 = make(chan struct{}) // non-nil, blocking
ch4 = make(chan struct{}) // non-nil, closed
ctx = context.Background()
fin = func() context.Context {
c, cancel := context.WithCancel(context.Background())
cancel()
return c
}()
)
close(ch4)
// TODO(jdef): unit test for OverflowBackpressure
for ti, tc := range []struct {
ctx context.Context
ch <-chan struct{}
Expand All @@ -469,28 +509,27 @@ func TestRateLimit(t *testing.T) {
wantsChainCount []int
}{
{ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}},
{ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}},
{ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

{ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}},
{ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}},
{ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},

// TODO(jdef): test OverflowBackpressure (blocking)
{ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}},
} {
var (
i, j int
Expand Down
Loading

0 comments on commit 289fdd0

Please sign in to comment.