From a1f0c0193a7da6f0badd2bd163a6df7ba17e9540 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Sun, 17 Sep 2017 22:14:07 +0000 Subject: [PATCH 01/10] lib/executor: generated Sender API --- ...generated.go => calls_caller_generated.go} | 2 +- .../executor/calls/calls_sender_generated.go | 129 ++++++++++++++++++ .../calls/calls_sender_generated_test.go | 99 ++++++++++++++ api/v1/lib/executor/calls/gen.go | 5 +- 4 files changed, 233 insertions(+), 2 deletions(-) rename api/v1/lib/executor/calls/{calls_generated.go => calls_caller_generated.go} (94%) create mode 100644 api/v1/lib/executor/calls/calls_sender_generated.go create mode 100644 api/v1/lib/executor/calls/calls_sender_generated_test.go diff --git a/api/v1/lib/executor/calls/calls_generated.go b/api/v1/lib/executor/calls/calls_caller_generated.go similarity index 94% rename from api/v1/lib/executor/calls/calls_generated.go rename to api/v1/lib/executor/calls/calls_caller_generated.go index e4e1b01d..dfa25fdf 100644 --- a/api/v1/lib/executor/calls/calls_generated.go +++ b/api/v1/lib/executor/calls/calls_caller_generated.go @@ -1,6 +1,6 @@ package calls -// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call -output calls_caller_generated.go // GENERATED CODE FOLLOWS; DO NOT EDIT. import ( diff --git a/api/v1/lib/executor/calls/calls_sender_generated.go b/api/v1/lib/executor/calls/calls_sender_generated.go new file mode 100644 index 00000000..763256d6 --- /dev/null +++ b/api/v1/lib/executor/calls/calls_sender_generated.go @@ -0,0 +1,129 @@ +package calls + +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -output calls_sender_generated.go +// GENERATED CODE FOLLOWS; DO NOT EDIT. + +import ( + "context" + + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/encoding" + + "github.com/mesos/mesos-go/api/v1/lib/executor" +) + +type ( + // Request generates a Call that's sent to a Mesos agent. Subsequent invocations are expected to + // yield equivalent calls. Intended for use w/ non-streaming requests to an agent. + Request interface { + Call() *executor.Call + } + + // RequestFunc is the functional adaptation of Request. + RequestFunc func() *executor.Call + + // RequestStreaming generates a Call that's send to a Mesos agent. Subsequent invocations MAY generate + // different Call objects. No more Call objects are expected once a nil is returned to signal the end of + // of the request stream. + RequestStreaming interface { + Request + IsStreaming() + } + + // RequestStreamingFunc is the functional adaptation of RequestStreaming. + RequestStreamingFunc func() *executor.Call + + // Send issues a Request to a Mesos agent and properly manages Call-specific mechanics. + Sender interface { + Send(context.Context, Request) (mesos.Response, error) + } + + // SenderFunc is the functional adaptation of the Sender interface + SenderFunc func(context.Context, Request) (mesos.Response, error) +) + +func (f RequestFunc) Call() *executor.Call { return f() } + +func (f RequestFunc) Marshaler() encoding.Marshaler { + // avoid returning (*executor.Call)(nil) for interface type + if call := f(); call != nil { + return call + } + return nil +} + +func (f RequestStreamingFunc) Push(c ...*executor.Call) RequestStreamingFunc { return Push(f, c...) } + +func (f RequestStreamingFunc) Marshaler() encoding.Marshaler { + // avoid returning (*executor.Call)(nil) for interface type + if call := f(); call != nil { + return call + } + return nil +} + +func (f RequestStreamingFunc) IsStreaming() {} + +func (f RequestStreamingFunc) Call() *executor.Call { return f() } + +// Push prepends one or more calls onto a request stream. If no calls are given then the original stream is returned. +func Push(r RequestStreaming, c ...*executor.Call) RequestStreamingFunc { + return func() *executor.Call { + if len(c) == 0 { + return r.Call() + } + head := c[0] + c = c[1:] + return head + } +} + +// Empty generates a stream that always returns nil. +func Empty() RequestStreamingFunc { return func() *executor.Call { return nil } } + +var ( + _ = Request(RequestFunc(nil)) + _ = RequestStreaming(RequestStreamingFunc(nil)) + _ = Sender(SenderFunc(nil)) +) + +// NonStreaming returns a RequestFunc that always generates the same Call. +func NonStreaming(c *executor.Call) RequestFunc { return func() *executor.Call { return c } } + +// FromChan returns a streaming request that fetches calls from the given channel until it closes. +// If a nil chan is specified then the returned func will always generate nil. +func FromChan(ch <-chan *executor.Call) RequestStreamingFunc { + if ch == nil { + // avoid blocking forever if we're handed a nil chan + return func() *executor.Call { return nil } + } + return func() *executor.Call { + if m, ok := <-ch; ok { + return m + } + return nil + } +} + +// Send implements the Sender interface for SenderFunc +func (f SenderFunc) Send(ctx context.Context, r Request) (mesos.Response, error) { + return f(ctx, r) +} + +// IgnoreResponse generates a sender that closes any non-nil response received by Mesos. +func IgnoreResponse(s Sender) SenderFunc { + return func(ctx context.Context, r Request) (mesos.Response, error) { + resp, err := s.Send(ctx, r) + if resp != nil { + resp.Close() + } + return nil, err + } +} + +// SendNoData is a convenience func that executes the given Call using the provided Sender +// and always drops the response data. +func SendNoData(ctx context.Context, sender Sender, r Request) (err error) { + _, err = IgnoreResponse(sender).Send(ctx, r) + return +} diff --git a/api/v1/lib/executor/calls/calls_sender_generated_test.go b/api/v1/lib/executor/calls/calls_sender_generated_test.go new file mode 100644 index 00000000..0625821e --- /dev/null +++ b/api/v1/lib/executor/calls/calls_sender_generated_test.go @@ -0,0 +1,99 @@ +package calls + +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -output calls_sender_generated.go +// GENERATED CODE FOLLOWS; DO NOT EDIT. + +import ( + "context" + "testing" + + "github.com/mesos/mesos-go/api/v1/lib" + + "github.com/mesos/mesos-go/api/v1/lib/executor" +) + +func TestNonStreaming(t *testing.T) { + c := new(executor.Call) + f := NonStreaming(c) + if x := f.Call(); x != c { + t.Fatalf("expected %#v instead of %#v", c, x) + } + if x := f.Marshaler(); x == nil { + t.Fatal("expected non-nil Marshaler") + } + f = NonStreaming(nil) + if x := f.Marshaler(); x != nil { + t.Fatalf("expected nil Marshaler instead of %#v", x) + } +} + +func TestStreaming(t *testing.T) { + f := Empty() + + f.IsStreaming() + + if x := f.Call(); x != nil { + t.Fatalf("expected nil Call instead of %#v", x) + } + if x := f.Marshaler(); x != nil { + t.Fatalf("expected nil Call instead of %#v", x) + } + + c := new(executor.Call) + + f = f.Push(c) + if x := f.Marshaler(); x == nil { + t.Fatal("expected non-nil Marshaler") + } + if x := f.Marshaler(); x != nil { + t.Fatalf("expected nil Marshaler instead of %#v", x) + } + + c2 := new(executor.Call) + + f = Empty().Push(c, c2) + if x := f.Call(); x != c { + t.Fatalf("expected %#v instead of %#v", c, x) + } + if x := f.Call(); x != c2 { + t.Fatalf("expected %#v instead of %#v", c2, x) + } + if x := f.Call(); x != nil { + t.Fatalf("expected nil Call instead of %#v", x) + } + + ch := make(chan *executor.Call, 2) + ch <- c + ch <- c2 + close(ch) + f = FromChan(ch) + if x := f.Call(); x != c { + t.Fatalf("expected %#v instead of %#v", c, x) + } + if x := f.Call(); x != c2 { + t.Fatalf("expected %#v instead of %#v", c2, x) + } + if x := f.Call(); x != nil { + t.Fatalf("expected nil Call instead of %#v", x) + } + + f = FromChan(nil) + if x := f.Call(); x != nil { + t.Fatalf("expected nil Call instead of %#v", x) + } +} + +func TestIgnoreResponse(t *testing.T) { + var closed bool + + IgnoreResponse(SenderFunc(func(_ context.Context, _ Request) (mesos.Response, error) { + return &mesos.ResponseWrapper{Closer: mesos.CloseFunc(func() error { + closed = true + return nil + })}, nil + })).Send(nil, nil) + + if !closed { + t.Fatal("expected response to be closed") + } +} diff --git a/api/v1/lib/executor/calls/gen.go b/api/v1/lib/executor/calls/gen.go index b78ed2ea..a5f3f3e5 100644 --- a/api/v1/lib/executor/calls/gen.go +++ b/api/v1/lib/executor/calls/gen.go @@ -1,3 +1,6 @@ package calls -//go:generate go run ../../extras/gen/callers.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call +// callers.go APIs are deprecated in favor of sender.go APIs +//go:generate go run ../../extras/gen/callers.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call -output calls_caller_generated.go + +//go:generate go run ../../extras/gen/sender.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -output calls_sender_generated.go From 6477b476ef556f28a319ffa7eb68a4dc345de12b Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 11:57:54 +0000 Subject: [PATCH 02/10] gen: define SenderWith if call option type O is defined --- api/v1/lib/extras/gen/sender.go | 52 +++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/api/v1/lib/extras/gen/sender.go b/api/v1/lib/extras/gen/sender.go index 1183d205..8e3145ca 100644 --- a/api/v1/lib/extras/gen/sender.go +++ b/api/v1/lib/extras/gen/sender.go @@ -142,6 +142,28 @@ func SendNoData(ctx context.Context, sender Sender, r Request) (err error) { _, err = IgnoreResponse(sender).Send(ctx, r) return } +{{if .Type "O"}}{{/* O is a functional call option, C.With(...O) must be defined elsewhere */}} +// SendWith injects the given options for all calls. +func SenderWith(s Sender, opts ...{{.Type "O"}}) SenderFunc { + if len(opts) == 0 { + return s.Send + } + return func(ctx context.Context, r Request) (mesos.Response, error) { + f := func() (c *{{.Type "C"}}) { + if c = r.Call(); c != nil { + c = c.With(opts...) + } + return + } + switch r.(type) { + case RequestStreaming: + return s.Send(ctx, RequestStreamingFunc(f)) + default: + return s.Send(ctx, RequestFunc(f)) + } + } +} +{{end -}} `)) var testTemplate = template.Must(template.New("").Parse(`package {{.Package}} @@ -244,4 +266,34 @@ func TestIgnoreResponse(t *testing.T) { t.Fatal("expected response to be closed") } } +{{if .Type "O"}}{{/* O is a functional call option, C.With(...O) must be defined elsewhere */}} +func TestSenderWith(t *testing.T) { + var ( + s = SenderFunc(func(_ context.Context, r Request) (mesos.Response, error) { + _ = r.Call() // need to invoke this to invoke SenderWith call decoration + return nil, nil + }) + ignore = func(_ mesos.Response, _ error) {} + c = new({{.Type "C"}}) + ) + + for ti, tc := range []Request{NonStreaming(c), Empty().Push(c, c)} { + var ( + invoked bool + opt = func(c *{{.Type "C"}}) { invoked = true } + ) + + // sanity check (w/o any options) + ignore(SenderWith(s).Send(context.Background(), tc)) + if invoked { + t.Fatalf("test case %d failed: unexpected option invocation", ti) + } + + ignore(SenderWith(s, opt).Send(context.Background(), tc)) + if !invoked { + t.Fatalf("test case %d failed: expected option invocation", ti) + } + } +} +{{end -}} `)) From 8af94afbf65ee0ea5624861ae72c217519327893 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 11:59:06 +0000 Subject: [PATCH 03/10] lib/executor/calls: generator defines type O for SenderWith support --- .../executor/calls/calls_sender_generated.go | 23 +++++++++++++- .../calls/calls_sender_generated_test.go | 31 ++++++++++++++++++- api/v1/lib/executor/calls/gen.go | 2 +- 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/api/v1/lib/executor/calls/calls_sender_generated.go b/api/v1/lib/executor/calls/calls_sender_generated.go index 763256d6..d93ed725 100644 --- a/api/v1/lib/executor/calls/calls_sender_generated.go +++ b/api/v1/lib/executor/calls/calls_sender_generated.go @@ -1,6 +1,6 @@ package calls -// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -output calls_sender_generated.go +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -type O:executor.CallOpt -output calls_sender_generated.go // GENERATED CODE FOLLOWS; DO NOT EDIT. import ( @@ -127,3 +127,24 @@ func SendNoData(ctx context.Context, sender Sender, r Request) (err error) { _, err = IgnoreResponse(sender).Send(ctx, r) return } + +// SendWith injects the given options for all calls. +func SenderWith(s Sender, opts ...executor.CallOpt) SenderFunc { + if len(opts) == 0 { + return s.Send + } + return func(ctx context.Context, r Request) (mesos.Response, error) { + f := func() (c *executor.Call) { + if c = r.Call(); c != nil { + c = c.With(opts...) + } + return + } + switch r.(type) { + case RequestStreaming: + return s.Send(ctx, RequestStreamingFunc(f)) + default: + return s.Send(ctx, RequestFunc(f)) + } + } +} diff --git a/api/v1/lib/executor/calls/calls_sender_generated_test.go b/api/v1/lib/executor/calls/calls_sender_generated_test.go index 0625821e..fb5b226c 100644 --- a/api/v1/lib/executor/calls/calls_sender_generated_test.go +++ b/api/v1/lib/executor/calls/calls_sender_generated_test.go @@ -1,6 +1,6 @@ package calls -// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -output calls_sender_generated.go +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -type O:executor.CallOpt -output calls_sender_generated.go // GENERATED CODE FOLLOWS; DO NOT EDIT. import ( @@ -97,3 +97,32 @@ func TestIgnoreResponse(t *testing.T) { t.Fatal("expected response to be closed") } } + +func TestSenderWith(t *testing.T) { + var ( + s = SenderFunc(func(_ context.Context, r Request) (mesos.Response, error) { + _ = r.Call() // need to invoke this to invoke SenderWith call decoration + return nil, nil + }) + ignore = func(_ mesos.Response, _ error) {} + c = new(executor.Call) + ) + + for ti, tc := range []Request{NonStreaming(c), Empty().Push(c, c)} { + var ( + invoked bool + opt = func(c *executor.Call) { invoked = true } + ) + + // sanity check (w/o any options) + ignore(SenderWith(s).Send(context.Background(), tc)) + if invoked { + t.Fatalf("test case %d failed: unexpected option invocation", ti) + } + + ignore(SenderWith(s, opt).Send(context.Background(), tc)) + if !invoked { + t.Fatalf("test case %d failed: expected option invocation", ti) + } + } +} diff --git a/api/v1/lib/executor/calls/gen.go b/api/v1/lib/executor/calls/gen.go index a5f3f3e5..27d9de41 100644 --- a/api/v1/lib/executor/calls/gen.go +++ b/api/v1/lib/executor/calls/gen.go @@ -3,4 +3,4 @@ package calls // callers.go APIs are deprecated in favor of sender.go APIs //go:generate go run ../../extras/gen/callers.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call -output calls_caller_generated.go -//go:generate go run ../../extras/gen/sender.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -output calls_sender_generated.go +//go:generate go run ../../extras/gen/sender.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -type O:executor.CallOpt -output calls_sender_generated.go From e664bcae70a101b8be0797745165c33303469304 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:00:12 +0000 Subject: [PATCH 04/10] gen/httpsender: NewSender accepts RequestOpt varargs --- api/v1/lib/extras/gen/httpsender.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/v1/lib/extras/gen/httpsender.go b/api/v1/lib/extras/gen/httpsender.go index 6cb03604..2b05c214 100644 --- a/api/v1/lib/extras/gen/httpsender.go +++ b/api/v1/lib/extras/gen/httpsender.go @@ -41,7 +41,7 @@ var DefaultResponseClassifier = ResponseClassifier(classifyResponse) // NewSender generates a sender that uses the Mesos v1 HTTP API for encoding/decoding requests/responses. // The ResponseClass is inferred from the first object generated by the given Request. -func NewSender(cf ClientFunc) calls.Sender { +func NewSender(cf ClientFunc, ro ...httpcli.RequestOpt) calls.Sender { return calls.SenderFunc(func(ctx context.Context, r calls.Request) (mesos.Response, error) { var ( obj = r.Call() @@ -60,7 +60,7 @@ func NewSender(cf ClientFunc) calls.Sender { req = calls.NonStreaming(obj) } - return cf(req, rc, httpcli.Context(ctx)) + return cf(req, rc, append(ro, httpcli.Context(ctx))...) }) } `)) From d1fc995458e4fffb37f3294fa16db35c9f7b1a7c Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:00:43 +0000 Subject: [PATCH 05/10] http{agent,master}: regenerate for NewSender code gen changes --- api/v1/lib/httpcli/httpagent/httpagent_generated.go | 4 ++-- api/v1/lib/httpcli/httpmaster/httpmaster_generated.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/api/v1/lib/httpcli/httpagent/httpagent_generated.go b/api/v1/lib/httpcli/httpagent/httpagent_generated.go index b9db6d3c..a83664a8 100644 --- a/api/v1/lib/httpcli/httpagent/httpagent_generated.go +++ b/api/v1/lib/httpcli/httpagent/httpagent_generated.go @@ -25,7 +25,7 @@ var DefaultResponseClassifier = ResponseClassifier(classifyResponse) // NewSender generates a sender that uses the Mesos v1 HTTP API for encoding/decoding requests/responses. // The ResponseClass is inferred from the first object generated by the given Request. -func NewSender(cf ClientFunc) calls.Sender { +func NewSender(cf ClientFunc, ro ...httpcli.RequestOpt) calls.Sender { return calls.SenderFunc(func(ctx context.Context, r calls.Request) (mesos.Response, error) { var ( obj = r.Call() @@ -44,6 +44,6 @@ func NewSender(cf ClientFunc) calls.Sender { req = calls.NonStreaming(obj) } - return cf(req, rc, httpcli.Context(ctx)) + return cf(req, rc, append(ro, httpcli.Context(ctx))...) }) } diff --git a/api/v1/lib/httpcli/httpmaster/httpmaster_generated.go b/api/v1/lib/httpcli/httpmaster/httpmaster_generated.go index 3e32cea2..e4c4c84c 100644 --- a/api/v1/lib/httpcli/httpmaster/httpmaster_generated.go +++ b/api/v1/lib/httpcli/httpmaster/httpmaster_generated.go @@ -25,7 +25,7 @@ var DefaultResponseClassifier = ResponseClassifier(classifyResponse) // NewSender generates a sender that uses the Mesos v1 HTTP API for encoding/decoding requests/responses. // The ResponseClass is inferred from the first object generated by the given Request. -func NewSender(cf ClientFunc) calls.Sender { +func NewSender(cf ClientFunc, ro ...httpcli.RequestOpt) calls.Sender { return calls.SenderFunc(func(ctx context.Context, r calls.Request) (mesos.Response, error) { var ( obj = r.Call() @@ -44,6 +44,6 @@ func NewSender(cf ClientFunc) calls.Sender { req = calls.NonStreaming(obj) } - return cf(req, rc, httpcli.Context(ctx)) + return cf(req, rc, append(ro, httpcli.Context(ctx))...) }) } From 56d9c0e6aec7be72218b2136d98a2e4625316c27 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:02:27 +0000 Subject: [PATCH 06/10] httpexec: initial revision --- api/v1/lib/httpcli/httpexec/gen.go | 3 + .../httpcli/httpexec/httpexec_generated.go | 49 +++++++++++++++ .../httpexec/httpexec_generated_test.go | 59 +++++++++++++++++++ api/v1/lib/httpcli/httpexec/util.go | 10 ++++ 4 files changed, 121 insertions(+) create mode 100644 api/v1/lib/httpcli/httpexec/gen.go create mode 100644 api/v1/lib/httpcli/httpexec/httpexec_generated.go create mode 100644 api/v1/lib/httpcli/httpexec/httpexec_generated_test.go create mode 100644 api/v1/lib/httpcli/httpexec/util.go diff --git a/api/v1/lib/httpcli/httpexec/gen.go b/api/v1/lib/httpcli/httpexec/gen.go new file mode 100644 index 00000000..4411ffba --- /dev/null +++ b/api/v1/lib/httpcli/httpexec/gen.go @@ -0,0 +1,3 @@ +package httpexec + +//go:generate go run ../../extras/gen/httpsender.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:agent.Call_MESSAGE} diff --git a/api/v1/lib/httpcli/httpexec/httpexec_generated.go b/api/v1/lib/httpcli/httpexec/httpexec_generated.go new file mode 100644 index 00000000..845fb24a --- /dev/null +++ b/api/v1/lib/httpcli/httpexec/httpexec_generated.go @@ -0,0 +1,49 @@ +package httpexec + +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:agent.Call_MESSAGE} +// GENERATED CODE FOLLOWS; DO NOT EDIT. + +import ( + "context" + + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/client" + "github.com/mesos/mesos-go/api/v1/lib/httpcli" + + "github.com/mesos/mesos-go/api/v1/lib/executor" + "github.com/mesos/mesos-go/api/v1/lib/executor/calls" +) + +// ResponseClassifier determines the appropriate response class for the given call. +type ResponseClassifier func(*executor.Call) (client.ResponseClass, error) + +// ClientFunc sends a Request to Mesos and returns the generated Response. +type ClientFunc func(client.Request, client.ResponseClass, ...httpcli.RequestOpt) (mesos.Response, error) + +// DefaultResponseClassifier is a pluggable classifier. +var DefaultResponseClassifier = ResponseClassifier(classifyResponse) + +// NewSender generates a sender that uses the Mesos v1 HTTP API for encoding/decoding requests/responses. +// The ResponseClass is inferred from the first object generated by the given Request. +func NewSender(cf ClientFunc, ro ...httpcli.RequestOpt) calls.Sender { + return calls.SenderFunc(func(ctx context.Context, r calls.Request) (mesos.Response, error) { + var ( + obj = r.Call() + rc, err = DefaultResponseClassifier(obj) + ) + if err != nil { + return nil, err + } + + var req client.Request + + switch r := r.(type) { + case calls.RequestStreaming: + req = calls.Push(r, obj) + default: + req = calls.NonStreaming(obj) + } + + return cf(req, rc, append(ro, httpcli.Context(ctx))...) + }) +} diff --git a/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go b/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go new file mode 100644 index 00000000..28d94fdb --- /dev/null +++ b/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go @@ -0,0 +1,59 @@ +package httpexec + +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:agent.Call_MESSAGE} +// GENERATED CODE FOLLOWS; DO NOT EDIT. + +import ( + "context" + "testing" + + "github.com/mesos/mesos-go/api/v1/lib" + "github.com/mesos/mesos-go/api/v1/lib/client" + "github.com/mesos/mesos-go/api/v1/lib/httpcli" + + "github.com/mesos/mesos-go/api/v1/lib/executor" + "github.com/mesos/mesos-go/api/v1/lib/executor/calls" +) + +func TestNewSender(t *testing.T) { + ch := make(chan client.Request, 1) + cf := ClientFunc(func(r client.Request, _ client.ResponseClass, _ ...httpcli.RequestOpt) (_ mesos.Response, _ error) { + ch <- r + return + }) + check := func(_ mesos.Response, err error) { + if err != nil { + t.Fatal(err) + } + } + sent := func() client.Request { + select { + case r := <-ch: + return r + default: + t.Fatal("no request was sent") + } + return nil + } + sender := NewSender(cf) + c := &executor.Call{Type:agent.Call_MESSAGE} + + check(sender.Send(context.Background(), calls.NonStreaming(c))) + r := sent() + if _, ok := r.(client.RequestStreaming); ok { + t.Fatalf("expected non-streaming request instead of %v", r) + } + + check(sender.Send(context.Background(), calls.Empty().Push(c))) + r = sent() + if _, ok := r.(client.RequestStreaming); !ok { + t.Fatalf("expected streaming request instead of %v", r) + } + + // expect this to fail because newly created call structs don't have a type + // that can be used for classifying an expected response type. + _, err := sender.Send(context.Background(), calls.Empty().Push(new(executor.Call))) + if err == nil { + t.Fatal("expected send to fail w/ malformed call") + } +} diff --git a/api/v1/lib/httpcli/httpexec/util.go b/api/v1/lib/httpcli/httpexec/util.go new file mode 100644 index 00000000..038689a7 --- /dev/null +++ b/api/v1/lib/httpcli/httpexec/util.go @@ -0,0 +1,10 @@ +package httpexec + +import ( + "github.com/mesos/mesos-go/api/v1/lib/client" + "github.com/mesos/mesos-go/api/v1/lib/executor" +) + +func classifyResponse(c *executor.Call) (client.ResponseClass, error) { + return client.ResponseClassAuto, nil // TODO(jdef) fix this, ResponseClassAuto is deprecated +} From d59419c01c27398a08d6ddf3ce1c410604214a6a Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:02:52 +0000 Subject: [PATCH 07/10] build: generate for lib/executor and lib/httpcli/httpexec packages --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index a00cd76b..d5a919a8 100644 --- a/Makefile +++ b/Makefile @@ -94,7 +94,7 @@ sync: (cd ${CMD_VENDOR}; govendor sync) .PHONY: generate -generate: GENERATE_PACKAGES = ./api/v1/lib/extras/executor/eventrules ./api/v1/lib/extras/executor/callrules ./api/v1/lib/extras/scheduler/eventrules ./api/v1/lib/extras/scheduler/callrules ./api/v1/lib/executor/events ./api/v1/lib/executor/calls ./api/v1/lib/scheduler/events ./api/v1/lib/scheduler/calls ./api/v1/lib/agent/calls ./api/v1/lib/master/calls ./api/v1/lib/httpcli/httpagent ./api/v1/lib/httpcli/httpmaster +generate: GENERATE_PACKAGES = ./api/v1/lib/extras/executor/eventrules ./api/v1/lib/extras/executor/callrules ./api/v1/lib/extras/scheduler/eventrules ./api/v1/lib/extras/scheduler/callrules ./api/v1/lib/executor/events ./api/v1/lib/executor/calls ./api/v1/lib/scheduler/events ./api/v1/lib/scheduler/calls ./api/v1/lib/agent/calls ./api/v1/lib/master/calls ./api/v1/lib/httpcli/httpagent ./api/v1/lib/httpcli/httpmaster ./api/v1/lib/httpcli/httpexec generate: go generate -x ${GENERATE_PACKAGES} From a26912873f3aefff26f2e1c2c9e1b523c88882b6 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:04:54 +0000 Subject: [PATCH 08/10] example-executor: migrate from calls.Caller to calls.Sender API --- api/v1/cmd/example-executor/main.go | 82 ++++++++++++++--------------- 1 file changed, 40 insertions(+), 42 deletions(-) diff --git a/api/v1/cmd/example-executor/main.go b/api/v1/cmd/example-executor/main.go index 46025ca9..606719ed 100644 --- a/api/v1/cmd/example-executor/main.go +++ b/api/v1/cmd/example-executor/main.go @@ -18,6 +18,7 @@ import ( "github.com/mesos/mesos-go/api/v1/lib/executor/config" "github.com/mesos/mesos-go/api/v1/lib/executor/events" "github.com/mesos/mesos-go/api/v1/lib/httpcli" + "github.com/mesos/mesos-go/api/v1/lib/httpcli/httpexec" "github.com/pborman/uuid" ) @@ -52,32 +53,38 @@ func run(cfg config.Config) { Host: cfg.AgentEndpoint, Path: apiPath, } + http = httpcli.New( + httpcli.Endpoint(apiURL.String()), + httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]), + httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))), + ) + callOptions = executor.CallOptions{ + calls.Framework(cfg.FrameworkID), + calls.Executor(cfg.ExecutorID), + } state = &internalState{ - cli: httpcli.New( - httpcli.Endpoint(apiURL.String()), - httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]), - httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))), + cli: calls.SenderWith( + httpexec.NewSender(http.Send), + callOptions..., ), - callOptions: executor.CallOptions{ - calls.Framework(cfg.FrameworkID), - calls.Executor(cfg.ExecutorID), - }, unackedTasks: make(map[mesos.TaskID]mesos.TaskInfo), unackedUpdates: make(map[string]executor.Call_Update), failedTasks: make(map[mesos.TaskID]mesos.TaskStatus), } - subscribe = calls.Subscribe(nil, nil).With(state.callOptions...) + subscriber = calls.SenderWith( + httpexec.NewSender(http.Send, httpcli.Close(true)), + callOptions..., + ) shouldReconnect = maybeReconnect(cfg) disconnected = time.Now() handler = buildEventHandler(state) ) for { - subscribe = subscribe.With( - unacknowledgedTasks(state), - unacknowledgedUpdates(state), - ) func() { - resp, err := state.cli.Do(subscribe, httpcli.Close(true)) + subscribe := calls.Subscribe(unacknowledgedTasks(state), unacknowledgedUpdates(state)) + + log.Println("subscribing to agent for events..") + resp, err := subscriber.Send(context.TODO(), calls.NonStreaming(subscribe)) if resp != nil { defer resp.Close() } @@ -104,43 +111,35 @@ func run(cfg config.Config) { log.Printf("failed to re-establish subscription with agent within %v, aborting", cfg.RecoveryTimeout) return } + log.Println("waiting for reconnect timeout") <-shouldReconnect // wait for some amount of time before retrying subscription } } -// unacknowledgedTasks is a functional option that sets the value of the UnacknowledgedTasks -// field of a Subscribe call. -func unacknowledgedTasks(state *internalState) executor.CallOpt { - return func(call *executor.Call) { - if n := len(state.unackedTasks); n > 0 { - unackedTasks := make([]mesos.TaskInfo, 0, n) - for k := range state.unackedTasks { - unackedTasks = append(unackedTasks, state.unackedTasks[k]) - } - call.Subscribe.UnacknowledgedTasks = unackedTasks - } else { - call.Subscribe.UnacknowledgedTasks = nil +// unacknowledgedTasks generates the value of the UnacknowledgedTasks field of a Subscribe call. +func unacknowledgedTasks(state *internalState) (result []mesos.TaskInfo) { + if n := len(state.unackedTasks); n > 0 { + result = make([]mesos.TaskInfo, 0, n) + for k := range state.unackedTasks { + result = append(result, state.unackedTasks[k]) } } + return } -// unacknowledgedUpdates is a functional option that sets the value of the UnacknowledgedUpdates -// field of a Subscribe call. -func unacknowledgedUpdates(state *internalState) executor.CallOpt { - return func(call *executor.Call) { - if n := len(state.unackedUpdates); n > 0 { - unackedUpdates := make([]executor.Call_Update, 0, n) - for k := range state.unackedUpdates { - unackedUpdates = append(unackedUpdates, state.unackedUpdates[k]) - } - call.Subscribe.UnacknowledgedUpdates = unackedUpdates - } else { - call.Subscribe.UnacknowledgedUpdates = nil +// unacknowledgedUpdates generates the value of the UnacknowledgedUpdates field of a Subscribe call. +func unacknowledgedUpdates(state *internalState) (result []executor.Call_Update) { + if n := len(state.unackedUpdates); n > 0 { + result = make([]executor.Call_Update, 0, n) + for k := range state.unackedUpdates { + result = append(result, state.unackedUpdates[k]) } } + return } func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler) (err error) { + log.Println("listening for events from agent...") ctx := context.TODO() for err == nil && !state.shouldQuit { // housekeeping @@ -239,8 +238,8 @@ func launch(state *internalState, task mesos.TaskInfo) { func protoString(s string) *string { return &s } func update(state *internalState, status mesos.TaskStatus) error { - upd := calls.Update(status).With(state.callOptions...) - resp, err := state.cli.Do(upd) + upd := calls.Update(status) + resp, err := state.cli.Send(context.TODO(), calls.NonStreaming(upd)) if resp != nil { resp.Close() } @@ -263,8 +262,7 @@ func newStatus(state *internalState, id mesos.TaskID) mesos.TaskStatus { } type internalState struct { - callOptions executor.CallOptions - cli *httpcli.Client + cli calls.Sender cfg config.Config framework mesos.FrameworkInfo executor mesos.ExecutorInfo From 5798ac90f230c67019ae163be2b5d3bdda640e4e Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:16:42 +0000 Subject: [PATCH 09/10] httpexec: fix broken unit test --- api/v1/lib/httpcli/httpexec/gen.go | 2 +- api/v1/lib/httpcli/httpexec/httpexec_generated.go | 2 +- .../lib/httpcli/httpexec/httpexec_generated_test.go | 4 ++-- api/v1/lib/httpcli/httpexec/util.go | 11 +++++++++-- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/api/v1/lib/httpcli/httpexec/gen.go b/api/v1/lib/httpcli/httpexec/gen.go index 4411ffba..40fd2355 100644 --- a/api/v1/lib/httpcli/httpexec/gen.go +++ b/api/v1/lib/httpcli/httpexec/gen.go @@ -1,3 +1,3 @@ package httpexec -//go:generate go run ../../extras/gen/httpsender.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:agent.Call_MESSAGE} +//go:generate go run ../../extras/gen/httpsender.go ../../extras/gen/gen.go -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:executor.Call_MESSAGE} diff --git a/api/v1/lib/httpcli/httpexec/httpexec_generated.go b/api/v1/lib/httpcli/httpexec/httpexec_generated.go index 845fb24a..5a516eaf 100644 --- a/api/v1/lib/httpcli/httpexec/httpexec_generated.go +++ b/api/v1/lib/httpcli/httpexec/httpexec_generated.go @@ -1,6 +1,6 @@ package httpexec -// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:agent.Call_MESSAGE} +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:executor.Call_MESSAGE} // GENERATED CODE FOLLOWS; DO NOT EDIT. import ( diff --git a/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go b/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go index 28d94fdb..6feedd86 100644 --- a/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go +++ b/api/v1/lib/httpcli/httpexec/httpexec_generated_test.go @@ -1,6 +1,6 @@ package httpexec -// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:agent.Call_MESSAGE} +// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -import github.com/mesos/mesos-go/api/v1/lib/executor/calls -type C:executor.Call:executor.Call{Type:executor.Call_MESSAGE} // GENERATED CODE FOLLOWS; DO NOT EDIT. import ( @@ -36,7 +36,7 @@ func TestNewSender(t *testing.T) { return nil } sender := NewSender(cf) - c := &executor.Call{Type:agent.Call_MESSAGE} + c := &executor.Call{Type:executor.Call_MESSAGE} check(sender.Send(context.Background(), calls.NonStreaming(c))) r := sent() diff --git a/api/v1/lib/httpcli/httpexec/util.go b/api/v1/lib/httpcli/httpexec/util.go index 038689a7..521c105f 100644 --- a/api/v1/lib/httpcli/httpexec/util.go +++ b/api/v1/lib/httpcli/httpexec/util.go @@ -3,8 +3,15 @@ package httpexec import ( "github.com/mesos/mesos-go/api/v1/lib/client" "github.com/mesos/mesos-go/api/v1/lib/executor" + "github.com/mesos/mesos-go/api/v1/lib/httpcli" ) -func classifyResponse(c *executor.Call) (client.ResponseClass, error) { - return client.ResponseClassAuto, nil // TODO(jdef) fix this, ResponseClassAuto is deprecated +func classifyResponse(c *executor.Call) (rc client.ResponseClass, err error) { + switch name := executor.Call_Type_name[int32(c.GetType())]; name { + case "", "UNKNOWN": + err = httpcli.ProtocolError("unsupported call type") + default: + rc = client.ResponseClassAuto // TODO(jdef) fix this, ResponseClassAuto is deprecated + } + return } From 2b79cc3ceab79fac2e776e09a592e5f4659201b1 Mon Sep 17 00:00:00 2001 From: James DeFelice Date: Mon, 18 Sep 2017 12:42:47 +0000 Subject: [PATCH 10/10] gen/sender: fix whitespace issues --- api/v1/lib/agent/calls/calls_generated_test.go | 4 ++-- api/v1/lib/executor/calls/calls_sender_generated_test.go | 4 ++-- api/v1/lib/extras/gen/sender.go | 4 ++-- api/v1/lib/master/calls/calls_generated_test.go | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/api/v1/lib/agent/calls/calls_generated_test.go b/api/v1/lib/agent/calls/calls_generated_test.go index cdba8b97..65d9901b 100644 --- a/api/v1/lib/agent/calls/calls_generated_test.go +++ b/api/v1/lib/agent/calls/calls_generated_test.go @@ -5,11 +5,11 @@ package calls import ( "context" - "testing" + "testing" "github.com/mesos/mesos-go/api/v1/lib" - "github.com/mesos/mesos-go/api/v1/lib/agent" + "github.com/mesos/mesos-go/api/v1/lib/agent" ) func TestNonStreaming(t *testing.T) { diff --git a/api/v1/lib/executor/calls/calls_sender_generated_test.go b/api/v1/lib/executor/calls/calls_sender_generated_test.go index fb5b226c..b67c5332 100644 --- a/api/v1/lib/executor/calls/calls_sender_generated_test.go +++ b/api/v1/lib/executor/calls/calls_sender_generated_test.go @@ -5,11 +5,11 @@ package calls import ( "context" - "testing" + "testing" "github.com/mesos/mesos-go/api/v1/lib" - "github.com/mesos/mesos-go/api/v1/lib/executor" + "github.com/mesos/mesos-go/api/v1/lib/executor" ) func TestNonStreaming(t *testing.T) { diff --git a/api/v1/lib/extras/gen/sender.go b/api/v1/lib/extras/gen/sender.go index 8e3145ca..b9e612fc 100644 --- a/api/v1/lib/extras/gen/sender.go +++ b/api/v1/lib/extras/gen/sender.go @@ -173,11 +173,11 @@ var testTemplate = template.Must(template.New("").Parse(`package {{.Package}} import ( "context" - "testing" + "testing" "github.com/mesos/mesos-go/api/v1/lib" {{range .Imports}} - {{ printf "%q" . -}} + {{ printf "%q" . -}} {{end}} ) diff --git a/api/v1/lib/master/calls/calls_generated_test.go b/api/v1/lib/master/calls/calls_generated_test.go index 113b6012..728548da 100644 --- a/api/v1/lib/master/calls/calls_generated_test.go +++ b/api/v1/lib/master/calls/calls_generated_test.go @@ -5,11 +5,11 @@ package calls import ( "context" - "testing" + "testing" "github.com/mesos/mesos-go/api/v1/lib" - "github.com/mesos/mesos-go/api/v1/lib/master" + "github.com/mesos/mesos-go/api/v1/lib/master" ) func TestNonStreaming(t *testing.T) {