Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Sender API support, updated example-executor #325

Merged
merged 10 commits into from
Sep 18, 2017
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ sync:
(cd ${CMD_VENDOR}; govendor sync)

.PHONY: generate
generate: GENERATE_PACKAGES = ./api/v1/lib/extras/executor/eventrules ./api/v1/lib/extras/executor/callrules ./api/v1/lib/extras/scheduler/eventrules ./api/v1/lib/extras/scheduler/callrules ./api/v1/lib/executor/events ./api/v1/lib/executor/calls ./api/v1/lib/scheduler/events ./api/v1/lib/scheduler/calls ./api/v1/lib/agent/calls ./api/v1/lib/master/calls ./api/v1/lib/httpcli/httpagent ./api/v1/lib/httpcli/httpmaster
generate: GENERATE_PACKAGES = ./api/v1/lib/extras/executor/eventrules ./api/v1/lib/extras/executor/callrules ./api/v1/lib/extras/scheduler/eventrules ./api/v1/lib/extras/scheduler/callrules ./api/v1/lib/executor/events ./api/v1/lib/executor/calls ./api/v1/lib/scheduler/events ./api/v1/lib/scheduler/calls ./api/v1/lib/agent/calls ./api/v1/lib/master/calls ./api/v1/lib/httpcli/httpagent ./api/v1/lib/httpcli/httpmaster ./api/v1/lib/httpcli/httpexec
generate:
go generate -x ${GENERATE_PACKAGES}

Expand Down
82 changes: 40 additions & 42 deletions api/v1/cmd/example-executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/executor/config"
"github.com/mesos/mesos-go/api/v1/lib/executor/events"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpexec"
"github.com/pborman/uuid"
)

Expand Down Expand Up @@ -52,32 +53,38 @@ func run(cfg config.Config) {
Host: cfg.AgentEndpoint,
Path: apiPath,
}
http = httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))),
)
callOptions = executor.CallOptions{
calls.Framework(cfg.FrameworkID),
calls.Executor(cfg.ExecutorID),
}
state = &internalState{
cli: httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))),
cli: calls.SenderWith(
httpexec.NewSender(http.Send),
callOptions...,
),
callOptions: executor.CallOptions{
calls.Framework(cfg.FrameworkID),
calls.Executor(cfg.ExecutorID),
},
unackedTasks: make(map[mesos.TaskID]mesos.TaskInfo),
unackedUpdates: make(map[string]executor.Call_Update),
failedTasks: make(map[mesos.TaskID]mesos.TaskStatus),
}
subscribe = calls.Subscribe(nil, nil).With(state.callOptions...)
subscriber = calls.SenderWith(
httpexec.NewSender(http.Send, httpcli.Close(true)),
callOptions...,
)
shouldReconnect = maybeReconnect(cfg)
disconnected = time.Now()
handler = buildEventHandler(state)
)
for {
subscribe = subscribe.With(
unacknowledgedTasks(state),
unacknowledgedUpdates(state),
)
func() {
resp, err := state.cli.Do(subscribe, httpcli.Close(true))
subscribe := calls.Subscribe(unacknowledgedTasks(state), unacknowledgedUpdates(state))

log.Println("subscribing to agent for events..")
resp, err := subscriber.Send(context.TODO(), calls.NonStreaming(subscribe))
if resp != nil {
defer resp.Close()
}
Expand All @@ -104,43 +111,35 @@ func run(cfg config.Config) {
log.Printf("failed to re-establish subscription with agent within %v, aborting", cfg.RecoveryTimeout)
return
}
log.Println("waiting for reconnect timeout")
<-shouldReconnect // wait for some amount of time before retrying subscription
}
}

// unacknowledgedTasks is a functional option that sets the value of the UnacknowledgedTasks
// field of a Subscribe call.
func unacknowledgedTasks(state *internalState) executor.CallOpt {
return func(call *executor.Call) {
if n := len(state.unackedTasks); n > 0 {
unackedTasks := make([]mesos.TaskInfo, 0, n)
for k := range state.unackedTasks {
unackedTasks = append(unackedTasks, state.unackedTasks[k])
}
call.Subscribe.UnacknowledgedTasks = unackedTasks
} else {
call.Subscribe.UnacknowledgedTasks = nil
// unacknowledgedTasks generates the value of the UnacknowledgedTasks field of a Subscribe call.
func unacknowledgedTasks(state *internalState) (result []mesos.TaskInfo) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think unacknowledgedTasks should be a function on *internalState but it's a minor issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could go either way. Maybe tackle in a follow-up PR

if n := len(state.unackedTasks); n > 0 {
result = make([]mesos.TaskInfo, 0, n)
for k := range state.unackedTasks {
result = append(result, state.unackedTasks[k])
}
}
return
}

// unacknowledgedUpdates is a functional option that sets the value of the UnacknowledgedUpdates
// field of a Subscribe call.
func unacknowledgedUpdates(state *internalState) executor.CallOpt {
return func(call *executor.Call) {
if n := len(state.unackedUpdates); n > 0 {
unackedUpdates := make([]executor.Call_Update, 0, n)
for k := range state.unackedUpdates {
unackedUpdates = append(unackedUpdates, state.unackedUpdates[k])
}
call.Subscribe.UnacknowledgedUpdates = unackedUpdates
} else {
call.Subscribe.UnacknowledgedUpdates = nil
// unacknowledgedUpdates generates the value of the UnacknowledgedUpdates field of a Subscribe call.
func unacknowledgedUpdates(state *internalState) (result []executor.Call_Update) {
if n := len(state.unackedUpdates); n > 0 {
result = make([]executor.Call_Update, 0, n)
for k := range state.unackedUpdates {
result = append(result, state.unackedUpdates[k])
}
}
return
}

func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler) (err error) {
log.Println("listening for events from agent...")
ctx := context.TODO()
for err == nil && !state.shouldQuit {
// housekeeping
Expand Down Expand Up @@ -239,8 +238,8 @@ func launch(state *internalState, task mesos.TaskInfo) {
func protoString(s string) *string { return &s }

func update(state *internalState, status mesos.TaskStatus) error {
upd := calls.Update(status).With(state.callOptions...)
resp, err := state.cli.Do(upd)
upd := calls.Update(status)
resp, err := state.cli.Send(context.TODO(), calls.NonStreaming(upd))
if resp != nil {
resp.Close()
}
Expand All @@ -263,8 +262,7 @@ func newStatus(state *internalState, id mesos.TaskID) mesos.TaskStatus {
}

type internalState struct {
callOptions executor.CallOptions
cli *httpcli.Client
cli calls.Sender
cfg config.Config
framework mesos.FrameworkInfo
executor mesos.ExecutorInfo
Expand Down
4 changes: 2 additions & 2 deletions api/v1/lib/agent/calls/calls_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package calls

import (
"context"
"testing"
"testing"

"github.com/mesos/mesos-go/api/v1/lib"

"github.com/mesos/mesos-go/api/v1/lib/agent"
"github.com/mesos/mesos-go/api/v1/lib/agent"
)

func TestNonStreaming(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package calls

// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call
// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call -output calls_caller_generated.go
// GENERATED CODE FOLLOWS; DO NOT EDIT.

import (
Expand Down
150 changes: 150 additions & 0 deletions api/v1/lib/executor/calls/calls_sender_generated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package calls

// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -type O:executor.CallOpt -output calls_sender_generated.go
// GENERATED CODE FOLLOWS; DO NOT EDIT.

import (
"context"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"

"github.com/mesos/mesos-go/api/v1/lib/executor"
)

type (
// Request generates a Call that's sent to a Mesos agent. Subsequent invocations are expected to
// yield equivalent calls. Intended for use w/ non-streaming requests to an agent.
Request interface {
Call() *executor.Call
}

// RequestFunc is the functional adaptation of Request.
RequestFunc func() *executor.Call

// RequestStreaming generates a Call that's send to a Mesos agent. Subsequent invocations MAY generate
// different Call objects. No more Call objects are expected once a nil is returned to signal the end of
// of the request stream.
RequestStreaming interface {
Request
IsStreaming()
}

// RequestStreamingFunc is the functional adaptation of RequestStreaming.
RequestStreamingFunc func() *executor.Call

// Send issues a Request to a Mesos agent and properly manages Call-specific mechanics.
Sender interface {
Send(context.Context, Request) (mesos.Response, error)
}

// SenderFunc is the functional adaptation of the Sender interface
SenderFunc func(context.Context, Request) (mesos.Response, error)
)

func (f RequestFunc) Call() *executor.Call { return f() }

func (f RequestFunc) Marshaler() encoding.Marshaler {
// avoid returning (*executor.Call)(nil) for interface type
if call := f(); call != nil {
return call
}
return nil
}

func (f RequestStreamingFunc) Push(c ...*executor.Call) RequestStreamingFunc { return Push(f, c...) }

func (f RequestStreamingFunc) Marshaler() encoding.Marshaler {
// avoid returning (*executor.Call)(nil) for interface type
if call := f(); call != nil {
return call
}
return nil
}

func (f RequestStreamingFunc) IsStreaming() {}

func (f RequestStreamingFunc) Call() *executor.Call { return f() }

// Push prepends one or more calls onto a request stream. If no calls are given then the original stream is returned.
func Push(r RequestStreaming, c ...*executor.Call) RequestStreamingFunc {
return func() *executor.Call {
if len(c) == 0 {
return r.Call()
}
head := c[0]
c = c[1:]
return head
}
}

// Empty generates a stream that always returns nil.
func Empty() RequestStreamingFunc { return func() *executor.Call { return nil } }

var (
_ = Request(RequestFunc(nil))
_ = RequestStreaming(RequestStreamingFunc(nil))
_ = Sender(SenderFunc(nil))
)

// NonStreaming returns a RequestFunc that always generates the same Call.
func NonStreaming(c *executor.Call) RequestFunc { return func() *executor.Call { return c } }

// FromChan returns a streaming request that fetches calls from the given channel until it closes.
// If a nil chan is specified then the returned func will always generate nil.
func FromChan(ch <-chan *executor.Call) RequestStreamingFunc {
if ch == nil {
// avoid blocking forever if we're handed a nil chan
return func() *executor.Call { return nil }
}
return func() *executor.Call {
if m, ok := <-ch; ok {
return m
}
return nil
}
}

// Send implements the Sender interface for SenderFunc
func (f SenderFunc) Send(ctx context.Context, r Request) (mesos.Response, error) {
return f(ctx, r)
}

// IgnoreResponse generates a sender that closes any non-nil response received by Mesos.
func IgnoreResponse(s Sender) SenderFunc {
return func(ctx context.Context, r Request) (mesos.Response, error) {
resp, err := s.Send(ctx, r)
if resp != nil {
resp.Close()
}
return nil, err
}
}

// SendNoData is a convenience func that executes the given Call using the provided Sender
// and always drops the response data.
func SendNoData(ctx context.Context, sender Sender, r Request) (err error) {
_, err = IgnoreResponse(sender).Send(ctx, r)
return
}

// SendWith injects the given options for all calls.
func SenderWith(s Sender, opts ...executor.CallOpt) SenderFunc {
if len(opts) == 0 {
return s.Send
}
return func(ctx context.Context, r Request) (mesos.Response, error) {
f := func() (c *executor.Call) {
if c = r.Call(); c != nil {
c = c.With(opts...)
}
return
}
switch r.(type) {
case RequestStreaming:
return s.Send(ctx, RequestStreamingFunc(f))
default:
return s.Send(ctx, RequestFunc(f))
}
}
}
Loading