Skip to content

Commit

Permalink
improved failover for non-checkpointing scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Apr 16, 2017
1 parent c4a1026 commit eaeca69
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 32 deletions.
9 changes: 8 additions & 1 deletion api/v1/cmd/example-executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func main() {
os.Exit(0)
}

// maybeReconnect produces the channel that run assigns to shouldReconnect. When cfg.Checkpoint
// is set it returns the result of backoff.Notifier, called with 1s and 4/3 of
// cfg.SubscriptionBackoffMax; otherwise it returns nil.
func maybeReconnect(cfg config.Config) <-chan struct{} {
	if !cfg.Checkpoint {
		// A receive on a nil channel never completes, so nothing is ever signalled.
		return nil
	}
	limit := cfg.SubscriptionBackoffMax * 4 / 3
	return backoff.Notifier(time.Second, limit, nil)
}

func run(cfg config.Config) {
var (
apiURL = url.URL{
Expand All @@ -58,7 +65,7 @@ func run(cfg config.Config) {
failedTasks: make(map[mesos.TaskID]mesos.TaskStatus),
}
subscribe = calls.Subscribe(nil, nil).With(state.callOptions...)
shouldReconnect = backoff.Notifier(1*time.Second, cfg.SubscriptionBackoffMax*4/3, nil)
shouldReconnect = maybeReconnect(cfg)
disconnected = time.Now()
handler = buildEventHandler(state)
)
Expand Down
53 changes: 32 additions & 21 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,28 @@ func Run(cfg Config) error {
}

func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config {
controlContext := &controller.ContextAdapter{
DoneFunc: func() bool { return state.done },
FrameworkIDFunc: func() string { return state.frameworkID },
ErrorFunc: func(err error) {
if err != nil && err != io.EOF {
log.Println(err)
} else {
log.Println("disconnected")
}
},
}
var (
frameworkIDStore = NewInMemoryIDStore()
controlContext = &controller.ContextAdapter{
DoneFunc: func() bool { return state.done },
// not called concurrently w/ subscription events, don't worry about using the atomic
FrameworkIDFunc: func() string { return frameworkIDStore.Get() },
ErrorFunc: func(err error) {
if err != nil && err != io.EOF {
log.Println(err)
} else {
log.Println("disconnected")
}
},
}
)

state.cli = calls.Decorators{
callMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}),
// automatically set the frameworkID for all outgoing calls
calls.When(func() bool { return frameworkIDStore.Get() != "" },
calls.FrameworkCaller(func() string { return frameworkIDStore.Get() })),
}.Apply(state.cli)

return controller.Config{
Expand All @@ -68,12 +75,12 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr
Handler: events.Decorators{
eventMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
events.Decorator(logAllEvents).If(state.config.verbose),
}.Apply(buildEventHandler(state)),
}.Apply(buildEventHandler(state, frameworkIDStore)),
}
}

// buildEventHandler generates and returns a handler to process events received from the subscription.
func buildEventHandler(state *internalState) events.Handler {
func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Handler {
// TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to
// figure out appropriate error propagation among chained handlers.
ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli })
Expand Down Expand Up @@ -104,19 +111,23 @@ func buildEventHandler(state *internalState) events.Handler {
statusUpdate(state, e.GetUpdate().GetStatus())
return nil
},
scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) (err error) {
scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error {
log.Println("received a SUBSCRIBED event")
if state.frameworkID == "" {
state.frameworkID = e.GetSubscribed().GetFrameworkID().GetValue()
frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()
if state.frameworkID == "" || state.frameworkID != frameworkID {
if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint {
state.done = true // TODO(jdef) not goroutine safe
return errors.New("frameworkID changed unexpectedly; failover may be broken")
}
state.frameworkID = frameworkID
if state.frameworkID == "" {
// sanity check
err = errors.New("mesos gave us an empty frameworkID")
} else {
// automatically set the frameworkID for all outgoing calls
state.cli = calls.FrameworkCaller(state.frameworkID).Apply(state.cli)
state.done = true // TODO(jdef) not goroutine safe
return errors.New("mesos gave us an empty frameworkID")
}
frameworkIDStore.Set(frameworkID)
}
return
return nil
},
}),
)
Expand Down
3 changes: 3 additions & 0 deletions api/v1/cmd/example-scheduler/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
url string
codec codec
timeout time.Duration
failoverTimeout time.Duration
checkpoint bool
principal string
hostname string
Expand Down Expand Up @@ -47,6 +48,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.Var(&cfg.codec, "codec", "Codec to encode/decode scheduler API communications [protobuf, json]")
fs.StringVar(&cfg.url, "url", cfg.url, "Mesos scheduler API URL")
fs.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "Mesos scheduler API connection timeout")
fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout")
fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable framework checkpointing")
fs.StringVar(&cfg.principal, "principal", cfg.principal, "Framework principal with which to authenticate")
fs.StringVar(&cfg.hostname, "hostname", cfg.hostname, "Framework hostname that is advertised to the master")
Expand Down Expand Up @@ -84,6 +86,7 @@ func NewConfig() Config {
url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"),
codec: codec{Codec: &encoding.ProtobufCodec},
timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"),
failoverTimeout: envDuration("FRAMEWORK_FAILOVER_TIMEOUT", "1000h"),
checkpoint: true,
server: server{address: env("LIBPROCESS_IP", "127.0.0.1")},
tasks: envInt("NUM_TASKS", "5"),
Expand Down
4 changes: 4 additions & 0 deletions api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,15 @@ func buildHTTPSched(cfg Config, creds credentials) calls.Caller {
}

func buildFrameworkInfo(cfg Config) *mesos.FrameworkInfo {
failoverTimeout := cfg.failoverTimeout.Seconds()
frameworkInfo := &mesos.FrameworkInfo{
User: cfg.user,
Name: cfg.name,
Checkpoint: &cfg.checkpoint,
}
if cfg.failoverTimeout > 0 {
frameworkInfo.FailoverTimeout = &failoverTimeout
}
if cfg.role != "" {
frameworkInfo.Role = &cfg.role
}
Expand Down
43 changes: 43 additions & 0 deletions api/v1/cmd/example-scheduler/app/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package app

import "sync/atomic"

// IDStore abstracts loading and storing a single ID in its string form. Implementations are
// expected to be thread-safe.
type IDStore interface {
	// Get returns the currently stored ID.
	Get() string
	// Set replaces the stored ID with id.
	Set(id string)
}

// IDStoreAdapter provides the Get and Set methods of IDStore by delegating to optional funcs.
// A nil func turns the corresponding method into a no-op: Get then yields "" and Set discards
// its argument.
type IDStoreAdapter struct {
	GetFunc func() string // invoked by Get when non-nil
	SetFunc func(string)  // invoked by Set when non-nil
}

// Get returns the result of GetFunc, or "" when GetFunc is unset.
func (a IDStoreAdapter) Get() string {
	if a.GetFunc == nil {
		return ""
	}
	return a.GetFunc()
}

// Set forwards s to SetFunc; it does nothing when SetFunc is unset.
func (a IDStoreAdapter) Set(s string) {
	if a.SetFunc == nil {
		return
	}
	a.SetFunc(s)
}

// NewInMemoryIDStore returns an IDStore backed by an atomic.Value that is captured by the
// funcs of the returned *IDStoreAdapter. Get reports "" until Set has been called.
func NewInMemoryIDStore() IDStore {
	var current atomic.Value
	load := func() string {
		// Load returns nil before the first Store; the comma-ok assertion maps that to "".
		id, _ := current.Load().(string)
		return id
	}
	store := func(id string) { current.Store(id) }
	return &IDStoreAdapter{GetFunc: load, SetFunc: store}
}
24 changes: 24 additions & 0 deletions api/v1/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,27 @@ type Response interface {
io.Closer
Decoder() encoding.Decoder
}

// ResponseWrapper delegates to optional handler funcs for invocations of Response methods.
// When a handler func is nil the call falls through to the wrapped Response. The zero value
// is usable: Close returns nil and Decoder yields a decoder that reports io.EOF.
type ResponseWrapper struct {
	Response    Response                // wrapped response; may be nil
	CloseFunc   func() error            // if non-nil, handles Close instead of Response
	DecoderFunc func() encoding.Decoder // if non-nil, handles Decoder instead of Response
}

// Close invokes CloseFunc when set, otherwise closes the wrapped Response. It returns nil
// when neither is present.
func (w *ResponseWrapper) Close() error {
	switch {
	case w.CloseFunc != nil:
		return w.CloseFunc()
	case w.Response != nil:
		return w.Response.Close()
	default:
		return nil
	}
}

// Decoder returns the result of DecoderFunc when set, otherwise the wrapped Response's
// decoder. When neither is present it returns a decoder that always fails with io.EOF,
// mirroring Close's tolerance of a nil Response instead of panicking on a nil dereference.
func (w *ResponseWrapper) Decoder() encoding.Decoder {
	switch {
	case w.DecoderFunc != nil:
		return w.DecoderFunc()
	case w.Response != nil:
		return w.Response.Decoder()
	default:
		// There is no stream to read from, so report end-of-stream to the caller.
		return func(encoding.Unmarshaler) error { return io.EOF }
	}
}
5 changes: 4 additions & 1 deletion api/v1/lib/httpcli/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type Client struct {
func New(opts ...Opt) *Client {
c := &Client{
codec: &encoding.ProtobufCodec,
do: With(),
do: With(DefaultConfigOpt...),
header: http.Header{},
errorMapper: defaultErrorMapper,
}
Expand Down Expand Up @@ -352,6 +352,9 @@ type Config struct {

type ConfigOpt func(*Config)

// DefaultConfigOpt represents the default client config options. New passes these options
// to With when it builds the initial DoFunc of a Client. It is declared without an initial
// value, so it contributes no options unless something assigns to it.
var DefaultConfigOpt []ConfigOpt

// With returns a DoFunc that executes HTTP round-trips.
// The default implementation provides reasonable defaults for timeouts:
// keep-alive, connection, request/response read/write, and TLS handshake.
Expand Down
2 changes: 1 addition & 1 deletion api/v1/lib/httpcli/httpsched/httpsched.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type (
redirect RedirectSettings
}

// Caller is the public interface this framework scheduler's should consume
// Caller is the public interface a framework scheduler should consume
Caller interface {
calls.Caller
// httpDo is intentionally package-private; clients of this package may extend a Caller
Expand Down
31 changes: 26 additions & 5 deletions api/v1/lib/httpcli/httpsched/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
Expand Down Expand Up @@ -95,14 +96,34 @@ func disconnectedFn(state *state) stateFn {
)

// (c) execute the call, save the result in resp, err
state.resp, state.err = subscribeCaller.Call(state.call)
stateResp, stateErr := subscribeCaller.Call(state.call)
state.err = stateErr

// wrap the response: any errors processing the subscription stream should result in a
// transition to a disconnected state ASAP.
state.resp = &mesos.ResponseWrapper{
Response: stateResp,
DecoderFunc: func() encoding.Decoder {
decoder := stateResp.Decoder()
return func(u encoding.Unmarshaler) (err error) {
err = decoder(u)
if err != nil {
state.m.Lock()
state.fn = disconnectedFn
state.m.Unlock()
_ = stateResp.Close() // swallow any error here
}
return
}
},
}

// (d) if err != nil return unsubscribedFn
if state.err != nil {
// (d) if err != nil return disconnectedFn since we're unsubscribed
if stateErr != nil {
return disconnectedFn
}

// (e) else prepare callerTemporary w/ special header, return subscribingFn
// (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed
state.caller = &callerTemporary{
opt: httpcli.DefaultHeader(headerMesosStreamID, mesosStreamID),
callerInternal: state.client,
Expand All @@ -121,7 +142,7 @@ func connectedFn(state *state) stateFn {
// (b) execute call, save the result in resp, err
state.resp, state.err = state.caller.Call(state.call)

// (c) return connectedFn; TODO(jdef) detect specific Mesos error codes as triggers -> disconnectedFn?
// stay connected, don't attempt to interpret errors here
return connectedFn
}

Expand Down
25 changes: 22 additions & 3 deletions api/v1/lib/scheduler/calls/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ func (d Decorator) If(b bool) Decorator {
return result
}

// When applies the given decorator when the supplied bool func returns true. The supplied
// conditional func `f` is evaluated upon every Caller invocation; each time it reports true,
// `maybeDecorate` is applied to the underlying Caller and the call is routed through the
// result, otherwise the call goes straight to the underlying Caller.
// If either `f` or `maybeDecorate` is nil then noopDecorator is returned.
func When(f func() bool, maybeDecorate Decorator) Decorator {
	// A nil decorator would otherwise panic on the first call for which f() is true.
	if f == nil || maybeDecorate == nil {
		return noopDecorator
	}
	return func(h Caller) Caller {
		return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) {
			next := h
			if f() {
				// decorate lazily so that the condition is honored on a per-call basis
				next = maybeDecorate(h)
			}
			return next.Call(c)
		})
	}
}

// Apply applies the Decorators in the order they're listed such that the last Decorator invoked
// generates the final (wrapping) Caller that is ultimately returned.
func (ds Decorators) Combine() (result Decorator) {
Expand All @@ -75,11 +91,14 @@ func (ds Decorators) Combine() (result Decorator) {
return
}

// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls.
func FrameworkCaller(frameworkID string) Decorator {
// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE).
func FrameworkCaller(frameworkID func() string) Decorator {
return func(h Caller) Caller {
return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) {
c.FrameworkID = &mesos.FrameworkID{Value: frameworkID}
// never overwrite framework ID for subscribe calls; the scheduler must do that part
if c.Type == nil || *c.Type != scheduler.Call_SUBSCRIBE {
c.FrameworkID = &mesos.FrameworkID{Value: frameworkID()}
}
return h.Call(c)
})
}
Expand Down

0 comments on commit eaeca69

Please sign in to comment.