diff --git a/api/v1/lib/extras/executor/callrules/callrules_generated.go b/api/v1/lib/extras/executor/callrules/callrules_generated.go index 14ad0eb4..3b6e9d2c 100644 --- a/api/v1/lib/extras/executor/callrules/callrules_generated.go +++ b/api/v1/lib/extras/executor/callrules/callrules_generated.go @@ -192,6 +192,23 @@ func (r Rule) Once() Rule { } } +// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain +// is aborted and the context.Err is merged with the current error state. +// Returns nil (noop) if the receiving Rule is nil. +func (r Rule) UnlessDone() Rule { + if r == nil { + return nil + } + return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) { + select { + case <-ctx.Done(): + return ctx, e, z, Error2(err, ctx.Err()) + default: + return r(ctx, e, z, err, ch) + } + } +} + type Overflow int const ( @@ -217,29 +234,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { return nil } return func(ctx context.Context, e *executor.Call, z mesos.Response, err error, ch Chain) (context.Context, *executor.Call, mesos.Response, error) { - checkTieBreaker := func() (context.Context, *executor.Call, mesos.Response, error) { - select { - case <-ctx.Done(): - return ctx, e, z, Error2(err, ctx.Err()) - default: - return r(ctx, e, z, err, ch) - } - } select { case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, z, Error2(err, ctx.Err()) + // continue default: // overflow switch over { case OverflowBackpressure: - select { - case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, z, Error2(err, ctx.Err()) - } + <-p case OverflowDiscardWithError: return ctx, e, z, Error2(err, ErrOverflow) case OverflowDiscard: @@ -252,6 +254,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { panic(fmt.Sprintf("unexpected Overflow type: %#v", over)) } } + return r(ctx, e, z, err, ch) } } diff --git a/api/v1/lib/extras/executor/callrules/callrules_generated_test.go b/api/v1/lib/extras/executor/callrules/callrules_generated_test.go index 0e128c93..b7064496 100644 --- a/api/v1/lib/extras/executor/callrules/callrules_generated_test.go +++ b/api/v1/lib/extras/executor/callrules/callrules_generated_test.go @@ -167,6 +167,56 @@ func TestError2(t *testing.T) { } } +func TestUnlessDone(t *testing.T) { + var ( + p = prototype() + ctx = context.Background() + fin = func() context.Context { + c, cancel := context.WithCancel(context.Background()) + cancel() + return c + }() + ) + var zp = &mesos.ResponseWrapper{} + for ti, tc := range []struct { + ctx context.Context + wantsError []error + wantsRuleCount []int + wantsChainCount []int + }{ + {ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}}, + {fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}}, + } { + var ( + i, j int + r1 = counter(&i) + r2 = r1.UnlessDone() + ) + for k, r := range []Rule{r1, r2} { + _, e, zz, err := r(tc.ctx, p, zp, nil, chainCounter(&j, chainIdentity)) + if e != p { + t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e) + } + if zz != zp { + t.Errorf("test case %d failed: expected return object %q instead of %q", ti, zp, zz) + } + if err != tc.wantsError[k] { + t.Errorf("test case %d failed: unexpected error %v", ti, err) + } + if i != tc.wantsRuleCount[k] { + t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i) + } + if j != tc.wantsChainCount[k] { + t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j) + } + } + } + r := Rule(nil).UnlessDone() + if r != nil { + t.Error("expected nil result from UnlessDone") + } +} + func TestAndThen(t *testing.T) { var ( i, j int @@ -514,14 +564,8 @@ func TestRateLimit(t *testing.T) { ch2 = make(chan struct{}) // non-nil, blocking ch4 = make(chan struct{}) // non-nil, closed ctx = context.Background() - fin = func() context.Context { - c, cancel := context.WithCancel(context.Background()) - cancel() - return c - }() ) close(ch4) - // TODO(jdef): unit test for OverflowBackpressure for ti, tc := range []struct { ctx context.Context ch <-chan struct{} @@ -531,28 +575,27 @@ func TestRateLimit(t *testing.T) { wantsChainCount []int }{ {ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, + + // TODO(jdef): test OverflowBackpressure (blocking) + {ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, } { var ( i, j int diff --git a/api/v1/lib/extras/executor/eventrules/eventrules_generated.go b/api/v1/lib/extras/executor/eventrules/eventrules_generated.go index d60c8bd9..3b1af3f6 100644 --- a/api/v1/lib/extras/executor/eventrules/eventrules_generated.go +++ b/api/v1/lib/extras/executor/eventrules/eventrules_generated.go @@ -191,6 +191,23 @@ func (r Rule) Once() Rule { } } +// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain +// is aborted and the context.Err is merged with the current error state. +// Returns nil (noop) if the receiving Rule is nil. +func (r Rule) UnlessDone() Rule { + if r == nil { + return nil + } + return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) { + select { + case <-ctx.Done(): + return ctx, e, Error2(err, ctx.Err()) + default: + return r(ctx, e, err, ch) + } + } +} + type Overflow int const ( @@ -216,29 +233,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { return nil } return func(ctx context.Context, e *executor.Event, err error, ch Chain) (context.Context, *executor.Event, error) { - checkTieBreaker := func() (context.Context, *executor.Event, error) { - select { - case <-ctx.Done(): - return ctx, e, Error2(err, ctx.Err()) - default: - return r(ctx, e, err, ch) - } - } select { case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, Error2(err, ctx.Err()) + // continue default: // overflow switch over { case OverflowBackpressure: - select { - case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, Error2(err, ctx.Err()) - } + <-p case OverflowDiscardWithError: return ctx, e, Error2(err, ErrOverflow) case OverflowDiscard: @@ -251,6 +253,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { panic(fmt.Sprintf("unexpected Overflow type: %#v", over)) } } + return r(ctx, e, err, ch) } } diff --git a/api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go b/api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go index 7dc5c131..2add12ff 100644 --- a/api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go +++ b/api/v1/lib/extras/executor/eventrules/eventrules_generated_test.go @@ -150,6 +150,52 @@ func TestError2(t *testing.T) { } } +func TestUnlessDone(t *testing.T) { + var ( + p = prototype() + ctx = context.Background() + fin = func() context.Context { + c, cancel := context.WithCancel(context.Background()) + cancel() + return c + }() + ) + for ti, tc := range []struct { + ctx context.Context + wantsError []error + wantsRuleCount []int + wantsChainCount []int + }{ + {ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}}, + {fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}}, + } { + var ( + i, j int + r1 = counter(&i) + r2 = r1.UnlessDone() + ) + for k, r := range []Rule{r1, r2} { + _, e, err := r(tc.ctx, p, nil, chainCounter(&j, chainIdentity)) + if e != p { + t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e) + } + if err != tc.wantsError[k] { + t.Errorf("test case %d failed: unexpected error %v", ti, err) + } + if i != tc.wantsRuleCount[k] { + t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i) + } + if j != tc.wantsChainCount[k] { + t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j) + } + } + } + r := Rule(nil).UnlessDone() + if r != nil { + t.Error("expected nil result from UnlessDone") + } +} + func TestAndThen(t *testing.T) { var ( i, j int @@ -452,14 +498,8 @@ func TestRateLimit(t *testing.T) { ch2 = make(chan struct{}) // non-nil, blocking ch4 = make(chan struct{}) // non-nil, closed ctx = context.Background() - fin = func() context.Context { - c, cancel := context.WithCancel(context.Background()) - cancel() - return c - }() ) close(ch4) - // TODO(jdef): unit test for OverflowBackpressure for ti, tc := range []struct { ctx context.Context ch <-chan struct{} @@ -469,28 +509,27 @@ func TestRateLimit(t *testing.T) { wantsChainCount []int }{ {ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, + + // TODO(jdef): test OverflowBackpressure (blocking) + {ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, } { var ( i, j int diff --git a/api/v1/lib/extras/gen/rules.go b/api/v1/lib/extras/gen/rules.go index 0c4fbb99..0d16327d 100644 --- a/api/v1/lib/extras/gen/rules.go +++ b/api/v1/lib/extras/gen/rules.go @@ -208,6 +208,23 @@ func (r Rule) Once() Rule { } } +// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain +// is aborted and the context.Err is merged with the current error state. +// Returns nil (noop) if the receiving Rule is nil. +func (r Rule) UnlessDone() Rule { + if r == nil { + return nil + } + return func(ctx context.Context, e {{.Type "E"}}, {{.Arg "Z" "z," -}} err error, ch Chain) (context.Context, {{.Type "E"}}, {{.Arg "Z" "," -}} error) { + select { + case <-ctx.Done(): + return ctx, e, {{.Ref "Z" "z," -}} Error2(err, ctx.Err()) + default: + return r(ctx, e, {{.Ref "Z" "z," -}} err, ch) + } + } +} + type Overflow int const ( @@ -233,29 +250,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { return nil } return func(ctx context.Context, e {{.Type "E"}}, {{.Arg "Z" "z," -}} err error, ch Chain) (context.Context, {{.Type "E"}}, {{.Arg "Z" "," -}} error) { - checkTieBreaker := func() (context.Context, {{.Type "E"}}, {{.Arg "Z" "," -}} error) { - select { - case <-ctx.Done(): - return ctx, e, {{.Ref "Z" "z," -}} Error2(err, ctx.Err()) - default: - return r(ctx, e, {{.Ref "Z" "z," -}} err, ch) - } - } select { case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, {{.Ref "Z" "z," -}} Error2(err, ctx.Err()) + // continue default: // overflow switch over { case OverflowBackpressure: - select { - case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, {{.Ref "Z" "z," -}} Error2(err, ctx.Err()) - } + <-p case OverflowDiscardWithError: return ctx, e, {{.Ref "Z" "z," -}} Error2(err, ErrOverflow) case OverflowDiscard: @@ -268,6 +270,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { panic(fmt.Sprintf("unexpected Overflow type: %#v", over)) } } + return r(ctx, e, {{.Ref "Z" "z," -}} err, ch) } } @@ -539,6 +542,60 @@ func TestError2(t *testing.T) { } } +func TestUnlessDone(t *testing.T) { + var ( + p = prototype() + ctx = context.Background() + fin = func() context.Context { + c, cancel := context.WithCancel(context.Background()) + cancel() + return c + }() + ) + {{if .Type "Z" -}} + var zp = {{.Prototype "Z"}} + {{end -}} + for ti, tc := range []struct { + ctx context.Context + wantsError []error + wantsRuleCount []int + wantsChainCount []int + }{ + {ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}}, + {fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}}, + } { + var ( + i, j int + r1 = counter(&i) + r2 = r1.UnlessDone() + ) + for k, r := range []Rule{r1, r2} { + _, e, {{.Ref "Z" "zz," -}} err := r(tc.ctx, p, {{.Ref "Z" "zp," -}} nil, chainCounter(&j, chainIdentity)) + if e != p { + t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e) + } + {{if .Type "Z" -}} + if zz != zp { + t.Errorf("test case %d failed: expected return object %q instead of %q", ti, zp, zz) + } + {{end -}} + if err != tc.wantsError[k] { + t.Errorf("test case %d failed: unexpected error %v", ti, err) + } + if i != tc.wantsRuleCount[k] { + t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i) + } + if j != tc.wantsChainCount[k] { + t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j) + } + } + } + r := Rule(nil).UnlessDone() + if r != nil { + t.Error("expected nil result from UnlessDone") + } +} + func TestAndThen(t *testing.T) { var ( i, j int @@ -928,14 +985,8 @@ func TestRateLimit(t *testing.T) { ch2 = make(chan struct{}) // non-nil, blocking ch4 = make(chan struct{}) // non-nil, closed ctx = context.Background() - fin = func() context.Context { - c, cancel := context.WithCancel(context.Background()) - cancel() - return c - }() ) close(ch4) - // TODO(jdef): unit test for OverflowBackpressure for ti, tc := range []struct { ctx context.Context ch <-chan struct{} @@ -945,28 +996,27 @@ func TestRateLimit(t *testing.T) { wantsChainCount []int }{ {ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, + + // TODO(jdef): test OverflowBackpressure (blocking) + {ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, } { var ( i, j int diff --git a/api/v1/lib/extras/scheduler/callrules/callrules_generated.go b/api/v1/lib/extras/scheduler/callrules/callrules_generated.go index 204273ff..023ac7ae 100644 --- a/api/v1/lib/extras/scheduler/callrules/callrules_generated.go +++ b/api/v1/lib/extras/scheduler/callrules/callrules_generated.go @@ -192,6 +192,23 @@ func (r Rule) Once() Rule { } } +// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain +// is aborted and the context.Err is merged with the current error state. +// Returns nil (noop) if the receiving Rule is nil. +func (r Rule) UnlessDone() Rule { + if r == nil { + return nil + } + return func(ctx context.Context, e *scheduler.Call, z mesos.Response, err error, ch Chain) (context.Context, *scheduler.Call, mesos.Response, error) { + select { + case <-ctx.Done(): + return ctx, e, z, Error2(err, ctx.Err()) + default: + return r(ctx, e, z, err, ch) + } + } +} + type Overflow int const ( @@ -217,29 +234,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { return nil } return func(ctx context.Context, e *scheduler.Call, z mesos.Response, err error, ch Chain) (context.Context, *scheduler.Call, mesos.Response, error) { - checkTieBreaker := func() (context.Context, *scheduler.Call, mesos.Response, error) { - select { - case <-ctx.Done(): - return ctx, e, z, Error2(err, ctx.Err()) - default: - return r(ctx, e, z, err, ch) - } - } select { case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, z, Error2(err, ctx.Err()) + // continue default: // overflow switch over { case OverflowBackpressure: - select { - case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, z, Error2(err, ctx.Err()) - } + <-p case OverflowDiscardWithError: return ctx, e, z, Error2(err, ErrOverflow) case OverflowDiscard: @@ -252,6 +254,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { panic(fmt.Sprintf("unexpected Overflow type: %#v", over)) } } + return r(ctx, e, z, err, ch) } } diff --git a/api/v1/lib/extras/scheduler/callrules/callrules_generated_test.go b/api/v1/lib/extras/scheduler/callrules/callrules_generated_test.go index 5928b2c7..a884de19 100644 --- a/api/v1/lib/extras/scheduler/callrules/callrules_generated_test.go +++ b/api/v1/lib/extras/scheduler/callrules/callrules_generated_test.go @@ -167,6 +167,56 @@ func TestError2(t *testing.T) { } } +func TestUnlessDone(t *testing.T) { + var ( + p = prototype() + ctx = context.Background() + fin = func() context.Context { + c, cancel := context.WithCancel(context.Background()) + cancel() + return c + }() + ) + var zp = &mesos.ResponseWrapper{} + for ti, tc := range []struct { + ctx context.Context + wantsError []error + wantsRuleCount []int + wantsChainCount []int + }{ + {ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}}, + {fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}}, + } { + var ( + i, j int + r1 = counter(&i) + r2 = r1.UnlessDone() + ) + for k, r := range []Rule{r1, r2} { + _, e, zz, err := r(tc.ctx, p, zp, nil, chainCounter(&j, chainIdentity)) + if e != p { + t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e) + } + if zz != zp { + t.Errorf("test case %d failed: expected return object %q instead of %q", ti, zp, zz) + } + if err != tc.wantsError[k] { + t.Errorf("test case %d failed: unexpected error %v", ti, err) + } + if i != tc.wantsRuleCount[k] { + t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i) + } + if j != tc.wantsChainCount[k] { + t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j) + } + } + } + r := Rule(nil).UnlessDone() + if r != nil { + t.Error("expected nil result from UnlessDone") + } +} + func TestAndThen(t *testing.T) { var ( i, j int @@ -514,14 +564,8 @@ func TestRateLimit(t *testing.T) { ch2 = make(chan struct{}) // non-nil, blocking ch4 = make(chan struct{}) // non-nil, closed ctx = context.Background() - fin = func() context.Context { - c, cancel := context.WithCancel(context.Background()) - cancel() - return c - }() ) close(ch4) - // TODO(jdef): unit test for OverflowBackpressure for ti, tc := range []struct { ctx context.Context ch <-chan struct{} @@ -531,28 +575,27 @@ func TestRateLimit(t *testing.T) { wantsChainCount []int }{ {ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, + + // TODO(jdef): test OverflowBackpressure (blocking) + {ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, } { var ( i, j int diff --git a/api/v1/lib/extras/scheduler/eventrules/eventrules_generated.go b/api/v1/lib/extras/scheduler/eventrules/eventrules_generated.go index c713c52f..6b613386 100644 --- a/api/v1/lib/extras/scheduler/eventrules/eventrules_generated.go +++ b/api/v1/lib/extras/scheduler/eventrules/eventrules_generated.go @@ -191,6 +191,23 @@ func (r Rule) Once() Rule { } } +// UnlessDone returns a decorated rule that checks context.Done: if the context has been canceled then the rule chain +// is aborted and the context.Err is merged with the current error state. +// Returns nil (noop) if the receiving Rule is nil. +func (r Rule) UnlessDone() Rule { + if r == nil { + return nil + } + return func(ctx context.Context, e *scheduler.Event, err error, ch Chain) (context.Context, *scheduler.Event, error) { + select { + case <-ctx.Done(): + return ctx, e, Error2(err, ctx.Err()) + default: + return r(ctx, e, err, ch) + } + } +} + type Overflow int const ( @@ -216,29 +233,14 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { return nil } return func(ctx context.Context, e *scheduler.Event, err error, ch Chain) (context.Context, *scheduler.Event, error) { - checkTieBreaker := func() (context.Context, *scheduler.Event, error) { - select { - case <-ctx.Done(): - return ctx, e, Error2(err, ctx.Err()) - default: - return r(ctx, e, err, ch) - } - } select { case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, Error2(err, ctx.Err()) + // continue default: // overflow switch over { case OverflowBackpressure: - select { - case <-p: - return checkTieBreaker() - case <-ctx.Done(): - return ctx, e, Error2(err, ctx.Err()) - } + <-p case OverflowDiscardWithError: return ctx, e, Error2(err, ErrOverflow) case OverflowDiscard: @@ -251,6 +253,7 @@ func (r Rule) RateLimit(p <-chan struct{}, over Overflow) Rule { panic(fmt.Sprintf("unexpected Overflow type: %#v", over)) } } + return r(ctx, e, err, ch) } } diff --git a/api/v1/lib/extras/scheduler/eventrules/eventrules_generated_test.go b/api/v1/lib/extras/scheduler/eventrules/eventrules_generated_test.go index a6643289..d2334e36 100644 --- a/api/v1/lib/extras/scheduler/eventrules/eventrules_generated_test.go +++ b/api/v1/lib/extras/scheduler/eventrules/eventrules_generated_test.go @@ -150,6 +150,52 @@ func TestError2(t *testing.T) { } } +func TestUnlessDone(t *testing.T) { + var ( + p = prototype() + ctx = context.Background() + fin = func() context.Context { + c, cancel := context.WithCancel(context.Background()) + cancel() + return c + }() + ) + for ti, tc := range []struct { + ctx context.Context + wantsError []error + wantsRuleCount []int + wantsChainCount []int + }{ + {ctx, []error{nil, nil}, []int{1, 2}, []int{1, 2}}, + {fin, []error{nil, context.Canceled}, []int{1, 1}, []int{1, 1}}, + } { + var ( + i, j int + r1 = counter(&i) + r2 = r1.UnlessDone() + ) + for k, r := range []Rule{r1, r2} { + _, e, err := r(tc.ctx, p, nil, chainCounter(&j, chainIdentity)) + if e != p { + t.Errorf("test case %d failed: expected event %q instead of %q", ti, p, e) + } + if err != tc.wantsError[k] { + t.Errorf("test case %d failed: unexpected error %v", ti, err) + } + if i != tc.wantsRuleCount[k] { + t.Errorf("test case %d failed: expected count of %d instead of %d", ti, tc.wantsRuleCount[k], i) + } + if j != tc.wantsChainCount[k] { + t.Errorf("test case %d failed: expected chain count of %d instead of %d", ti, tc.wantsRuleCount[k], j) + } + } + } + r := Rule(nil).UnlessDone() + if r != nil { + t.Error("expected nil result from UnlessDone") + } +} + func TestAndThen(t *testing.T) { var ( i, j int @@ -452,14 +498,8 @@ func TestRateLimit(t *testing.T) { ch2 = make(chan struct{}) // non-nil, blocking ch4 = make(chan struct{}) // non-nil, closed ctx = context.Background() - fin = func() context.Context { - c, cancel := context.WithCancel(context.Background()) - cancel() - return c - }() ) close(ch4) - // TODO(jdef): unit test for OverflowBackpressure for ti, tc := range []struct { ctx context.Context ch <-chan struct{} @@ -469,28 +509,27 @@ func TestRateLimit(t *testing.T) { wantsChainCount []int }{ {ctx, ch1, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkip, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkip, 0x0, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkip, 0x0, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkip, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, - {fin, ch1, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowSkipWithError, 0xC, []int{0, 0, 0, 0}, []int{1, 2, 3, 4}}, {ctx, o(), OverflowSkipWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 2, 3, 4}}, {ctx, ch4, OverflowSkipWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscard, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscard, 0x0, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscard, 0x0, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscard, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, {ctx, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, - {fin, ch1, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, ch2, OverflowDiscardWithError, 0xC, []int{0, 0, 0, 0}, []int{0, 0, 1, 2}}, {ctx, o(), OverflowDiscardWithError, 0x4, []int{1, 1, 1, 1}, []int{1, 1, 2, 3}}, {ctx, ch4, OverflowDiscardWithError, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, + + // TODO(jdef): test OverflowBackpressure (blocking) + {ctx, ch4, OverflowBackpressure, 0x0, []int{1, 2, 2, 2}, []int{1, 2, 3, 4}}, } { var ( i, j int