Skip to content

Commit

Permalink
Merge 70084cf into 43f0fa5
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed Apr 16, 2017
2 parents 43f0fa5 + 70084cf commit bf80b40
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 54 deletions.
9 changes: 8 additions & 1 deletion api/v1/cmd/example-executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func main() {
os.Exit(0)
}

// maybeReconnect returns a reconnect-notification channel only when
// checkpointing is enabled in cfg. The channel comes from backoff.Notifier,
// which is given 1 second and 4/3 of cfg.SubscriptionBackoffMax. When
// checkpointing is disabled the result is a nil channel.
func maybeReconnect(cfg config.Config) <-chan struct{} {
	if !cfg.Checkpoint {
		return nil
	}
	upper := cfg.SubscriptionBackoffMax * 4 / 3
	return backoff.Notifier(1*time.Second, upper, nil)
}

func run(cfg config.Config) {
var (
apiURL = url.URL{
Expand All @@ -58,7 +65,7 @@ func run(cfg config.Config) {
failedTasks: make(map[mesos.TaskID]mesos.TaskStatus),
}
subscribe = calls.Subscribe(nil, nil).With(state.callOptions...)
shouldReconnect = backoff.Notifier(1*time.Second, cfg.SubscriptionBackoffMax*4/3, nil)
shouldReconnect = maybeReconnect(cfg)
disconnected = time.Now()
handler = buildEventHandler(state)
)
Expand Down
54 changes: 33 additions & 21 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,28 @@ func Run(cfg Config) error {
}

func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config {
controlContext := &controller.ContextAdapter{
DoneFunc: func() bool { return state.done },
FrameworkIDFunc: func() string { return state.frameworkID },
ErrorFunc: func(err error) {
if err != nil && err != io.EOF {
log.Println(err)
} else {
log.Println("disconnected")
}
},
}
var (
frameworkIDStore = NewInMemoryIDStore()
controlContext = &controller.ContextAdapter{
DoneFunc: func() bool { return state.done },
// not called concurrently w/ subscription events, don't worry about using the atomic
FrameworkIDFunc: func() string { return frameworkIDStore.Get() },
ErrorFunc: func(err error) {
if err != nil && err != io.EOF {
log.Println(err)
} else {
log.Println("disconnected")
}
},
}
)

state.cli = calls.Decorators{
callMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}),
// automatically set the frameworkID for all outgoing calls
calls.When(func() bool { return frameworkIDStore.Get() != "" },
calls.FrameworkCaller(func() string { return frameworkIDStore.Get() })),
}.Apply(state.cli)

return controller.Config{
Expand All @@ -68,12 +75,12 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr
Handler: events.Decorators{
eventMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
events.Decorator(logAllEvents).If(state.config.verbose),
}.Apply(buildEventHandler(state)),
}.Apply(buildEventHandler(state, frameworkIDStore)),
}
}

// buildEventHandler generates and returns a handler to process events received from the subscription.
func buildEventHandler(state *internalState) events.Handler {
func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Handler {
// TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to
// figure out appropriate error propagation among chained handlers.
ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli })
Expand Down Expand Up @@ -104,19 +111,24 @@ func buildEventHandler(state *internalState) events.Handler {
statusUpdate(state, e.GetUpdate().GetStatus())
return nil
},
scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) (err error) {
scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error {
log.Println("received a SUBSCRIBED event")
if state.frameworkID == "" {
state.frameworkID = e.GetSubscribed().GetFrameworkID().GetValue()
frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()
if state.frameworkID == "" || state.frameworkID != frameworkID {
if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint {
state.done = true // TODO(jdef) not goroutine safe
return errors.New("frameworkID changed unexpectedly; failover may be broken")
}
state.frameworkID = frameworkID
if state.frameworkID == "" {
// sanity check
err = errors.New("mesos gave us an empty frameworkID")
} else {
// automatically set the frameworkID for all outgoing calls
state.cli = calls.FrameworkCaller(state.frameworkID).Apply(state.cli)
state.done = true // TODO(jdef) not goroutine safe
return errors.New("mesos gave us an empty frameworkID")
}
log.Println("FrameworkID", frameworkID)
frameworkIDStore.Set(frameworkID)
}
return
return nil
},
}),
)
Expand Down
3 changes: 3 additions & 0 deletions api/v1/cmd/example-scheduler/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
url string
codec codec
timeout time.Duration
failoverTimeout time.Duration
checkpoint bool
principal string
hostname string
Expand Down Expand Up @@ -47,6 +48,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.Var(&cfg.codec, "codec", "Codec to encode/decode scheduler API communications [protobuf, json]")
fs.StringVar(&cfg.url, "url", cfg.url, "Mesos scheduler API URL")
fs.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "Mesos scheduler API connection timeout")
fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout")
fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable framework checkpointing")
fs.StringVar(&cfg.principal, "principal", cfg.principal, "Framework principal with which to authenticate")
fs.StringVar(&cfg.hostname, "hostname", cfg.hostname, "Framework hostname that is advertised to the master")
Expand Down Expand Up @@ -84,6 +86,7 @@ func NewConfig() Config {
url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"),
codec: codec{Codec: &encoding.ProtobufCodec},
timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"),
failoverTimeout: envDuration("FRAMEWORK_FAILOVER_TIMEOUT", "1000h"),
checkpoint: true,
server: server{address: env("LIBPROCESS_IP", "127.0.0.1")},
tasks: envInt("NUM_TASKS", "5"),
Expand Down
4 changes: 4 additions & 0 deletions api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,15 @@ func buildHTTPSched(cfg Config, creds credentials) calls.Caller {
}

func buildFrameworkInfo(cfg Config) *mesos.FrameworkInfo {
failoverTimeout := cfg.failoverTimeout.Seconds()
frameworkInfo := &mesos.FrameworkInfo{
User: cfg.user,
Name: cfg.name,
Checkpoint: &cfg.checkpoint,
}
if cfg.failoverTimeout > 0 {
frameworkInfo.FailoverTimeout = &failoverTimeout
}
if cfg.role != "" {
frameworkInfo.Role = &cfg.role
}
Expand Down
43 changes: 43 additions & 0 deletions api/v1/cmd/example-scheduler/app/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package app

import "sync/atomic"

// IDStore is a thread-safe abstraction to load and store a stringified ID.
type IDStore interface {
// Get returns the ID currently held by the store. The implementations in
// this file return the empty string when no ID is available.
Get() string
// Set replaces the ID held by the store with the given value.
Set(string)
}

// IDStoreAdapter assembles an IDStore from plain functions. Either func may be
// left nil, in which case the corresponding method does nothing useful:
// Get yields "" and Set discards its argument.
type IDStoreAdapter struct {
	GetFunc func() string // optional; backs Get
	SetFunc func(string)  // optional; backs Set
}

// Get delegates to GetFunc, or returns "" when no GetFunc was supplied.
func (a IDStoreAdapter) Get() string {
	if a.GetFunc == nil {
		return ""
	}
	return a.GetFunc()
}

// Set delegates to SetFunc; the value is dropped when no SetFunc was supplied.
func (a IDStoreAdapter) Set(s string) {
	if set := a.SetFunc; set != nil {
		set(s)
	}
}

// NewInMemoryIDStore returns an IDStore that keeps its ID in process memory
// inside an atomic.Value. Get yields "" until Set has been called.
func NewInMemoryIDStore() IDStore {
	var current atomic.Value

	load := func() string {
		// Load returns nil before the first Store; the comma-ok assertion
		// turns that into the empty string. Only strings are ever stored.
		id, _ := current.Load().(string)
		return id
	}
	store := func(id string) { current.Store(id) }

	return &IDStoreAdapter{GetFunc: load, SetFunc: store}
}
24 changes: 24 additions & 0 deletions api/v1/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,27 @@ type Response interface {
io.Closer
Decoder() encoding.Decoder
}

// ResponseWrapper delegates to optional handler funcs for invocations of Response methods.
type ResponseWrapper struct {
// Response is the wrapped value; its methods are used when the matching
// func below is nil.
Response Response
// CloseFunc, when non-nil, is called by Close in place of Response.Close.
CloseFunc func() error
// DecoderFunc, when non-nil, is called by Decoder in place of Response.Decoder.
DecoderFunc func() encoding.Decoder
}

// Close prefers CloseFunc when one is set, falls back to closing the wrapped
// Response, and reports success (nil) when neither is present.
func (wrapper *ResponseWrapper) Close() error {
	switch {
	case wrapper.CloseFunc != nil:
		return wrapper.CloseFunc()
	case wrapper.Response != nil:
		return wrapper.Response.Close()
	default:
		return nil
	}
}

// Decoder returns the result of DecoderFunc when one is set; otherwise it
// delegates to the wrapped Response. When neither is available it returns nil
// rather than panicking on a nil Response, matching the nil guard that Close
// applies to the same field.
func (wrapper *ResponseWrapper) Decoder() encoding.Decoder {
	if wrapper.DecoderFunc != nil {
		return wrapper.DecoderFunc()
	}
	if wrapper.Response != nil {
		return wrapper.Response.Decoder()
	}
	return nil
}
14 changes: 10 additions & 4 deletions api/v1/lib/extras/scheduler/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package controller

import (
"fmt"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
Expand Down Expand Up @@ -78,7 +76,9 @@ func (_ *controllerImpl) Run(config Config) (lastErr error) {
for !config.Context.Done() {
frameworkID := config.Context.FrameworkID()
if config.Framework.GetFailoverTimeout() > 0 && frameworkID != "" {
subscribe.Subscribe.FrameworkInfo.ID = &mesos.FrameworkID{Value: frameworkID}
frameworkProto := &mesos.FrameworkID{Value: frameworkID}
subscribe.Subscribe.FrameworkInfo.ID = frameworkProto
subscribe.FrameworkID = frameworkProto
}
<-config.RegistrationTokens
resp, err := config.Caller.Call(subscribe)
Expand Down Expand Up @@ -131,13 +131,19 @@ func (ca *ContextAdapter) Error(err error) {
}
}

// SchedulerError is a string-backed error type; its error text is the string
// it was converted from.
type SchedulerError string

// Error implements the error interface by returning the underlying string.
func (msg SchedulerError) Error() string {
	return string(msg)
}

// DefaultHandler provides the minimum implementation required for correct controller behavior.
// It returns a non-nil error only for events of type scheduler.Event_ERROR; the
// error carries the message reported by the event. All other events yield nil.
func DefaultHandler(e *scheduler.Event) (err error) {
if e.GetType() == scheduler.Event_ERROR {
// it's recommended that we abort and re-try subscribing; returning an
// error here will cause the event loop to terminate and the connection
// will be reset.
// NOTE(review): the fmt.Errorf assignment below is overwritten immediately by
// the SchedulerError assignment; it looks like the removed side of a diff
// hunk. Confirm against the actual file before relying on it.
err = fmt.Errorf("ERROR: %q", e.GetError().GetMessage())
err = SchedulerError(e.GetError().GetMessage())
}
return
}

0 comments on commit bf80b40

Please sign in to comment.