Skip to content

Commit

Permalink
Merge pull request #325 from mesos/jdef_executor_sender
Browse files Browse the repository at this point in the history
executor: Sender API support, updated example-executor
  • Loading branch information
jdef committed Sep 18, 2017
2 parents 7a9076f + 2b79cc3 commit 562dd0e
Show file tree
Hide file tree
Showing 16 changed files with 516 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ sync:
(cd ${CMD_VENDOR}; govendor sync)

.PHONY: generate
generate: GENERATE_PACKAGES = ./api/v1/lib/extras/executor/eventrules ./api/v1/lib/extras/executor/callrules ./api/v1/lib/extras/scheduler/eventrules ./api/v1/lib/extras/scheduler/callrules ./api/v1/lib/executor/events ./api/v1/lib/executor/calls ./api/v1/lib/scheduler/events ./api/v1/lib/scheduler/calls ./api/v1/lib/agent/calls ./api/v1/lib/master/calls ./api/v1/lib/httpcli/httpagent ./api/v1/lib/httpcli/httpmaster
generate: GENERATE_PACKAGES = ./api/v1/lib/extras/executor/eventrules ./api/v1/lib/extras/executor/callrules ./api/v1/lib/extras/scheduler/eventrules ./api/v1/lib/extras/scheduler/callrules ./api/v1/lib/executor/events ./api/v1/lib/executor/calls ./api/v1/lib/scheduler/events ./api/v1/lib/scheduler/calls ./api/v1/lib/agent/calls ./api/v1/lib/master/calls ./api/v1/lib/httpcli/httpagent ./api/v1/lib/httpcli/httpmaster ./api/v1/lib/httpcli/httpexec
generate:
go generate -x ${GENERATE_PACKAGES}

Expand Down
82 changes: 40 additions & 42 deletions api/v1/cmd/example-executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/executor/config"
"github.com/mesos/mesos-go/api/v1/lib/executor/events"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpexec"
"github.com/pborman/uuid"
)

Expand Down Expand Up @@ -52,32 +53,38 @@ func run(cfg config.Config) {
Host: cfg.AgentEndpoint,
Path: apiPath,
}
http = httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))),
)
callOptions = executor.CallOptions{
calls.Framework(cfg.FrameworkID),
calls.Executor(cfg.ExecutorID),
}
state = &internalState{
cli: httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))),
cli: calls.SenderWith(
httpexec.NewSender(http.Send),
callOptions...,
),
callOptions: executor.CallOptions{
calls.Framework(cfg.FrameworkID),
calls.Executor(cfg.ExecutorID),
},
unackedTasks: make(map[mesos.TaskID]mesos.TaskInfo),
unackedUpdates: make(map[string]executor.Call_Update),
failedTasks: make(map[mesos.TaskID]mesos.TaskStatus),
}
subscribe = calls.Subscribe(nil, nil).With(state.callOptions...)
subscriber = calls.SenderWith(
httpexec.NewSender(http.Send, httpcli.Close(true)),
callOptions...,
)
shouldReconnect = maybeReconnect(cfg)
disconnected = time.Now()
handler = buildEventHandler(state)
)
for {
subscribe = subscribe.With(
unacknowledgedTasks(state),
unacknowledgedUpdates(state),
)
func() {
resp, err := state.cli.Do(subscribe, httpcli.Close(true))
subscribe := calls.Subscribe(unacknowledgedTasks(state), unacknowledgedUpdates(state))

log.Println("subscribing to agent for events..")
resp, err := subscriber.Send(context.TODO(), calls.NonStreaming(subscribe))
if resp != nil {
defer resp.Close()
}
Expand All @@ -104,43 +111,35 @@ func run(cfg config.Config) {
log.Printf("failed to re-establish subscription with agent within %v, aborting", cfg.RecoveryTimeout)
return
}
log.Println("waiting for reconnect timeout")
<-shouldReconnect // wait for some amount of time before retrying subscription
}
}

// unacknowledgedTasks is a functional option that sets the value of the UnacknowledgedTasks
// field of a Subscribe call.
func unacknowledgedTasks(state *internalState) executor.CallOpt {
return func(call *executor.Call) {
if n := len(state.unackedTasks); n > 0 {
unackedTasks := make([]mesos.TaskInfo, 0, n)
for k := range state.unackedTasks {
unackedTasks = append(unackedTasks, state.unackedTasks[k])
}
call.Subscribe.UnacknowledgedTasks = unackedTasks
} else {
call.Subscribe.UnacknowledgedTasks = nil
// unacknowledgedTasks generates the value of the UnacknowledgedTasks field of a Subscribe call.
func unacknowledgedTasks(state *internalState) (result []mesos.TaskInfo) {
if n := len(state.unackedTasks); n > 0 {
result = make([]mesos.TaskInfo, 0, n)
for k := range state.unackedTasks {
result = append(result, state.unackedTasks[k])
}
}
return
}

// unacknowledgedUpdates is a functional option that sets the value of the UnacknowledgedUpdates
// field of a Subscribe call.
func unacknowledgedUpdates(state *internalState) executor.CallOpt {
return func(call *executor.Call) {
if n := len(state.unackedUpdates); n > 0 {
unackedUpdates := make([]executor.Call_Update, 0, n)
for k := range state.unackedUpdates {
unackedUpdates = append(unackedUpdates, state.unackedUpdates[k])
}
call.Subscribe.UnacknowledgedUpdates = unackedUpdates
} else {
call.Subscribe.UnacknowledgedUpdates = nil
// unacknowledgedUpdates generates the value of the UnacknowledgedUpdates field of a Subscribe call.
func unacknowledgedUpdates(state *internalState) (result []executor.Call_Update) {
if n := len(state.unackedUpdates); n > 0 {
result = make([]executor.Call_Update, 0, n)
for k := range state.unackedUpdates {
result = append(result, state.unackedUpdates[k])
}
}
return
}

func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler) (err error) {
log.Println("listening for events from agent...")
ctx := context.TODO()
for err == nil && !state.shouldQuit {
// housekeeping
Expand Down Expand Up @@ -239,8 +238,8 @@ func launch(state *internalState, task mesos.TaskInfo) {
func protoString(s string) *string { return &s }

func update(state *internalState, status mesos.TaskStatus) error {
upd := calls.Update(status).With(state.callOptions...)
resp, err := state.cli.Do(upd)
upd := calls.Update(status)
resp, err := state.cli.Send(context.TODO(), calls.NonStreaming(upd))
if resp != nil {
resp.Close()
}
Expand All @@ -263,8 +262,7 @@ func newStatus(state *internalState, id mesos.TaskID) mesos.TaskStatus {
}

type internalState struct {
callOptions executor.CallOptions
cli *httpcli.Client
cli calls.Sender
cfg config.Config
framework mesos.FrameworkInfo
executor mesos.ExecutorInfo
Expand Down
4 changes: 2 additions & 2 deletions api/v1/lib/agent/calls/calls_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package calls

import (
"context"
"testing"
"testing"

"github.com/mesos/mesos-go/api/v1/lib"

"github.com/mesos/mesos-go/api/v1/lib/agent"
"github.com/mesos/mesos-go/api/v1/lib/agent"
)

func TestNonStreaming(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package calls

// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call
// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:*executor.Call -output calls_caller_generated.go
// GENERATED CODE FOLLOWS; DO NOT EDIT.

import (
Expand Down
150 changes: 150 additions & 0 deletions api/v1/lib/executor/calls/calls_sender_generated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package calls

// go generate -import github.com/mesos/mesos-go/api/v1/lib/executor -type C:executor.Call -type O:executor.CallOpt -output calls_sender_generated.go
// GENERATED CODE FOLLOWS; DO NOT EDIT.

import (
"context"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"

"github.com/mesos/mesos-go/api/v1/lib/executor"
)

type (
// Request generates a Call that's sent to a Mesos agent. Subsequent invocations are expected to
// yield equivalent calls. Intended for use w/ non-streaming requests to an agent.
Request interface {
Call() *executor.Call
}

// RequestFunc is the functional adaptation of Request.
RequestFunc func() *executor.Call

// RequestStreaming generates a Call that's send to a Mesos agent. Subsequent invocations MAY generate
// different Call objects. No more Call objects are expected once a nil is returned to signal the end of
// of the request stream.
RequestStreaming interface {
Request
IsStreaming()
}

// RequestStreamingFunc is the functional adaptation of RequestStreaming.
RequestStreamingFunc func() *executor.Call

// Send issues a Request to a Mesos agent and properly manages Call-specific mechanics.
Sender interface {
Send(context.Context, Request) (mesos.Response, error)
}

// SenderFunc is the functional adaptation of the Sender interface
SenderFunc func(context.Context, Request) (mesos.Response, error)
)

func (f RequestFunc) Call() *executor.Call { return f() }

func (f RequestFunc) Marshaler() encoding.Marshaler {
// avoid returning (*executor.Call)(nil) for interface type
if call := f(); call != nil {
return call
}
return nil
}

func (f RequestStreamingFunc) Push(c ...*executor.Call) RequestStreamingFunc { return Push(f, c...) }

func (f RequestStreamingFunc) Marshaler() encoding.Marshaler {
// avoid returning (*executor.Call)(nil) for interface type
if call := f(); call != nil {
return call
}
return nil
}

func (f RequestStreamingFunc) IsStreaming() {}

func (f RequestStreamingFunc) Call() *executor.Call { return f() }

// Push prepends one or more calls onto a request stream. If no calls are given then the original stream is returned.
func Push(r RequestStreaming, c ...*executor.Call) RequestStreamingFunc {
return func() *executor.Call {
if len(c) == 0 {
return r.Call()
}
head := c[0]
c = c[1:]
return head
}
}

// Empty generates a stream that always returns nil.
func Empty() RequestStreamingFunc { return func() *executor.Call { return nil } }

var (
_ = Request(RequestFunc(nil))
_ = RequestStreaming(RequestStreamingFunc(nil))
_ = Sender(SenderFunc(nil))
)

// NonStreaming returns a RequestFunc that always generates the same Call.
func NonStreaming(c *executor.Call) RequestFunc { return func() *executor.Call { return c } }

// FromChan returns a streaming request that fetches calls from the given channel until it closes.
// If a nil chan is specified then the returned func will always generate nil.
func FromChan(ch <-chan *executor.Call) RequestStreamingFunc {
if ch == nil {
// avoid blocking forever if we're handed a nil chan
return func() *executor.Call { return nil }
}
return func() *executor.Call {
if m, ok := <-ch; ok {
return m
}
return nil
}
}

// Send implements the Sender interface for SenderFunc
func (f SenderFunc) Send(ctx context.Context, r Request) (mesos.Response, error) {
return f(ctx, r)
}

// IgnoreResponse generates a sender that closes any non-nil response received by Mesos.
func IgnoreResponse(s Sender) SenderFunc {
return func(ctx context.Context, r Request) (mesos.Response, error) {
resp, err := s.Send(ctx, r)
if resp != nil {
resp.Close()
}
return nil, err
}
}

// SendNoData is a convenience func that executes the given Call using the provided Sender
// and always drops the response data.
func SendNoData(ctx context.Context, sender Sender, r Request) (err error) {
_, err = IgnoreResponse(sender).Send(ctx, r)
return
}

// SendWith injects the given options for all calls.
func SenderWith(s Sender, opts ...executor.CallOpt) SenderFunc {
if len(opts) == 0 {
return s.Send
}
return func(ctx context.Context, r Request) (mesos.Response, error) {
f := func() (c *executor.Call) {
if c = r.Call(); c != nil {
c = c.With(opts...)
}
return
}
switch r.(type) {
case RequestStreaming:
return s.Send(ctx, RequestStreamingFunc(f))
default:
return s.Send(ctx, RequestFunc(f))
}
}
}
Loading

0 comments on commit 562dd0e

Please sign in to comment.