diff --git a/api/v1/cmd/example-executor/main.go b/api/v1/cmd/example-executor/main.go
index dba69b47..7690d9e4 100644
--- a/api/v1/cmd/example-executor/main.go
+++ b/api/v1/cmd/example-executor/main.go
@@ -36,6 +36,15 @@ func main() {
 	os.Exit(0)
 }
 
+// maybeReconnect returns a backoff notifier when checkpointing is enabled and a nil channel otherwise.
+// NOTE(review): a receive from a nil channel blocks forever; confirm that run relies on that to skip reconnects.
+func maybeReconnect(cfg config.Config) <-chan struct{} {
+	if cfg.Checkpoint {
+		return backoff.Notifier(1*time.Second, cfg.SubscriptionBackoffMax*4/3, nil)
+	}
+	return nil
+}
+
 func run(cfg config.Config) {
 	var (
 		apiURL = url.URL{
@@ -58,7 +67,7 @@ func run(cfg config.Config) {
 			failedTasks: make(map[mesos.TaskID]mesos.TaskStatus),
 		}
 		subscribe       = calls.Subscribe(nil, nil).With(state.callOptions...)
-		shouldReconnect = backoff.Notifier(1*time.Second, cfg.SubscriptionBackoffMax*4/3, nil)
+		shouldReconnect = maybeReconnect(cfg)
 		disconnected    = time.Now()
 		handler         = buildEventHandler(state)
 	)
diff --git a/api/v1/cmd/example-scheduler/app/app.go b/api/v1/cmd/example-scheduler/app/app.go
index a701f200..f3e74986 100644
--- a/api/v1/cmd/example-scheduler/app/app.go
+++ b/api/v1/cmd/example-scheduler/app/app.go
@@ -42,21 +42,28 @@ func Run(cfg Config) error {
 }
 
 func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config {
-	controlContext := &controller.ContextAdapter{
-		DoneFunc:        func() bool { return state.done },
-		FrameworkIDFunc: func() string { return state.frameworkID },
-		ErrorFunc: func(err error) {
-			if err != nil && err != io.EOF {
-				log.Println(err)
-			} else {
-				log.Println("disconnected")
-			}
-		},
-	}
+	var (
+		frameworkIDStore = NewInMemoryIDStore()
+		controlContext   = &controller.ContextAdapter{
+			DoneFunc: func() bool { return state.done },
+			// not called concurrently w/ subscription events, don't worry about using the atomic
+			FrameworkIDFunc: frameworkIDStore.Get,
+			ErrorFunc: func(err error) {
+				if err != nil && err != io.EOF {
+					log.Println(err)
+				} else {
+					log.Println("disconnected")
+				}
+			},
+		}
+	)
 
 	state.cli = calls.Decorators{
 		callMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
 		logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}),
+		// automatically set the frameworkID for all outgoing calls
+		calls.When(func() bool { return frameworkIDStore.Get() != "" },
+			calls.FrameworkCaller(frameworkIDStore.Get)),
 	}.Apply(state.cli)
 
 	return controller.Config{
@@ -68,12 +75,12 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr
 		Handler: events.Decorators{
 			eventMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
 			events.Decorator(logAllEvents).If(state.config.verbose),
-		}.Apply(buildEventHandler(state)),
+		}.Apply(buildEventHandler(state, frameworkIDStore)),
 	}
 }
 
 // buildEventHandler generates and returns a handler to process events received from the subscription.
-func buildEventHandler(state *internalState) events.Handler {
+func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Handler {
 	// TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to
 	// figure out appropriate error propagation among chained handlers.
 	ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli })
@@ -104,19 +111,24 @@ func buildEventHandler(state *internalState) events.Handler {
 				statusUpdate(state, e.GetUpdate().GetStatus())
 				return nil
 			},
-			scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) (err error) {
+			scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error {
 				log.Println("received a SUBSCRIBED event")
-				if state.frameworkID == "" {
-					state.frameworkID = e.GetSubscribed().GetFrameworkID().GetValue()
+				frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()
+				if state.frameworkID == "" || state.frameworkID != frameworkID {
+					if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint {
+						state.done = true // TODO(jdef) not goroutine safe
+						return errors.New("frameworkID changed unexpectedly; failover may be broken")
+					}
+					state.frameworkID = frameworkID
 					if state.frameworkID == "" {
 						// sanity check
-						err = errors.New("mesos gave us an empty frameworkID")
-					} else {
-						// automatically set the frameworkID for all outgoing calls
-						state.cli = calls.FrameworkCaller(state.frameworkID).Apply(state.cli)
+						state.done = true // TODO(jdef) not goroutine safe
+						return errors.New("mesos gave us an empty frameworkID")
 					}
+					log.Println("FrameworkID", frameworkID)
+					frameworkIDStore.Set(frameworkID)
 				}
-				return
+				return nil
 			},
 		}),
 	)
diff --git a/api/v1/cmd/example-scheduler/app/config.go b/api/v1/cmd/example-scheduler/app/config.go
index 9a33e092..edbcb879 100644
--- a/api/v1/cmd/example-scheduler/app/config.go
+++ b/api/v1/cmd/example-scheduler/app/config.go
@@ -15,6 +15,7 @@ type Config struct {
 	url             string
 	codec           codec
 	timeout         time.Duration
+	failoverTimeout time.Duration
 	checkpoint      bool
 	principal       string
 	hostname        string
@@ -47,6 +48,7 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
 	fs.Var(&cfg.codec, "codec", "Codec to encode/decode scheduler API communications [protobuf, json]")
 	fs.StringVar(&cfg.url, "url", cfg.url, "Mesos scheduler API URL")
 	fs.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "Mesos scheduler API connection timeout")
+	fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout")
 	fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable framework checkpointing")
 	fs.StringVar(&cfg.principal, "principal", cfg.principal, "Framework principal with which to authenticate")
 	fs.StringVar(&cfg.hostname, "hostname", cfg.hostname, "Framework hostname that is advertised to the master")
@@ -84,6 +86,7 @@ func NewConfig() Config {
 		url:             env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"),
 		codec:           codec{Codec: &encoding.ProtobufCodec},
 		timeout:         envDuration("MESOS_CONNECT_TIMEOUT", "20s"),
+		failoverTimeout: envDuration("FRAMEWORK_FAILOVER_TIMEOUT", "1000h"),
 		checkpoint:      true,
 		server:          server{address: env("LIBPROCESS_IP", "127.0.0.1")},
 		tasks:           envInt("NUM_TASKS", "5"),
diff --git a/api/v1/cmd/example-scheduler/app/state.go b/api/v1/cmd/example-scheduler/app/state.go
index a81b3bcc..b66ebbbb 100644
--- a/api/v1/cmd/example-scheduler/app/state.go
+++ b/api/v1/cmd/example-scheduler/app/state.go
@@ -135,11 +135,15 @@ func buildHTTPSched(cfg Config, creds credentials) calls.Caller {
 }
 
 func buildFrameworkInfo(cfg Config) *mesos.FrameworkInfo {
+	failoverTimeout := cfg.failoverTimeout.Seconds()
 	frameworkInfo := &mesos.FrameworkInfo{
 		User:       cfg.user,
 		Name:       cfg.name,
 		Checkpoint: &cfg.checkpoint,
 	}
+	if cfg.failoverTimeout > 0 {
+		frameworkInfo.FailoverTimeout = &failoverTimeout
+	}
 	if cfg.role != "" {
 		frameworkInfo.Role = &cfg.role
 	}
diff --git a/api/v1/cmd/example-scheduler/app/store.go b/api/v1/cmd/example-scheduler/app/store.go
new file mode 100644
index 00000000..bc7fa7a2
--- /dev/null
+++ b/api/v1/cmd/example-scheduler/app/store.go
@@ -0,0 +1,48 @@
+package app
+
+import "sync/atomic"
+
+// IDStore is a thread-safe abstraction to load and store a stringified ID.
+type IDStore interface {
+	Get() string
+	Set(string)
+}
+
+// IDStoreAdapter implements IDStore by delegating to the optional GetFunc and SetFunc;
+// a nil func turns the matching method into a no-op (Get then returns "").
+type IDStoreAdapter struct {
+	GetFunc func() string
+	SetFunc func(string)
+}
+
+// Get returns the result of GetFunc, or "" when GetFunc is nil.
+func (a IDStoreAdapter) Get() string {
+	if a.GetFunc != nil {
+		return a.GetFunc()
+	}
+	return ""
+}
+
+// Set passes s to SetFunc; it does nothing when SetFunc is nil.
+func (a IDStoreAdapter) Set(s string) {
+	if a.SetFunc != nil {
+		a.SetFunc(s)
+	}
+}
+
+// NewInMemoryIDStore returns an IDStore backed by an atomic.Value; Get returns "" until Set is called.
+func NewInMemoryIDStore() IDStore {
+	var frameworkID atomic.Value
+	return &IDStoreAdapter{
+		GetFunc: func() string {
+			x := frameworkID.Load()
+			if x == nil {
+				return ""
+			}
+			return x.(string)
+		},
+		SetFunc: func(s string) {
+			frameworkID.Store(s)
+		},
+	}
+}
diff --git a/api/v1/lib/client.go b/api/v1/lib/client.go
index b107b706..8f9da462 100644
--- a/api/v1/lib/client.go
+++ b/api/v1/lib/client.go
@@ -25,3 +25,33 @@ type Response interface {
 	io.Closer
 	Decoder() encoding.Decoder
 }
+
+// ResponseWrapper delegates to optional handler funcs for invocations of Response methods.
+type ResponseWrapper struct {
+	Response    Response
+	CloseFunc   func() error
+	DecoderFunc func() encoding.Decoder
+}
+
+// Close invokes CloseFunc when set, otherwise it closes the wrapped Response (if any).
+func (wrapper *ResponseWrapper) Close() error {
+	if wrapper.CloseFunc != nil {
+		return wrapper.CloseFunc()
+	}
+	if wrapper.Response != nil {
+		return wrapper.Response.Close()
+	}
+	return nil
+}
+
+// Decoder invokes DecoderFunc when set, otherwise it returns the Decoder of the wrapped Response.
+// It returns nil, instead of panicking, when neither DecoderFunc nor Response is set.
+func (wrapper *ResponseWrapper) Decoder() encoding.Decoder {
+	if wrapper.DecoderFunc != nil {
+		return wrapper.DecoderFunc()
+	}
+	if wrapper.Response != nil {
+		return wrapper.Response.Decoder()
+	}
+	return nil
+}
diff --git a/api/v1/lib/extras/scheduler/controller/controller.go b/api/v1/lib/extras/scheduler/controller/controller.go
index fc7ca43c..76d15ab3 100644
--- a/api/v1/lib/extras/scheduler/controller/controller.go
+++ b/api/v1/lib/extras/scheduler/controller/controller.go
@@ -1,8 +1,6 @@
 package controller
 
 import (
-	"fmt"
-
 	"github.com/mesos/mesos-go/api/v1/lib"
 	"github.com/mesos/mesos-go/api/v1/lib/encoding"
 	"github.com/mesos/mesos-go/api/v1/lib/scheduler"
@@ -78,7 +76,9 @@ func (_ *controllerImpl) Run(config Config) (lastErr error) {
 	for !config.Context.Done() {
 		frameworkID := config.Context.FrameworkID()
 		if config.Framework.GetFailoverTimeout() > 0 && frameworkID != "" {
-			subscribe.Subscribe.FrameworkInfo.ID = &mesos.FrameworkID{Value: frameworkID}
+			frameworkProto := &mesos.FrameworkID{Value: frameworkID}
+			subscribe.Subscribe.FrameworkInfo.ID = frameworkProto
+			subscribe.FrameworkID = frameworkProto
 		}
 		<-config.RegistrationTokens
 		resp, err := config.Caller.Call(subscribe)
@@ -131,13 +131,21 @@ func (ca *ContextAdapter) Error(err error) {
 	}
 }
 
+// SchedulerError is the error type that DefaultHandler generates from the message of an ERROR event.
+type SchedulerError string
+
+// Error implements the error interface.
+func (e SchedulerError) Error() string {
+	return string(e)
+}
+
 // DefaultHandler provides the minimum implementation required for correct controller behavior.
 func DefaultHandler(e *scheduler.Event) (err error) {
 	if e.GetType() == scheduler.Event_ERROR {
 		// it's recommended that we abort and re-try subscribing; returning an
 		// error here will cause the event loop to terminate and the connection
 		// will be reset.
-		err = fmt.Errorf("ERROR: %q", e.GetError().GetMessage())
+		err = SchedulerError(e.GetError().GetMessage())
 	}
 	return
 }
diff --git a/api/v1/lib/httpcli/http.go b/api/v1/lib/httpcli/http.go
index 04a2dd25..51125726 100644
--- a/api/v1/lib/httpcli/http.go
+++ b/api/v1/lib/httpcli/http.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"net"
 	"net/http"
@@ -16,26 +17,28 @@ import (
 	"github.com/mesos/mesos-go/api/v1/lib/recordio"
 )
 
+// Deprecation notice: these error variables are no longer returned by this module. Use APIError instead.
+// TODO(jdef) remove these error vars after v0.0.3
 var (
-	// ErrNotLeader is returned by Do calls that are sent to a non leading Mesos master.
+	// ErrNotLeader is returned by Do calls that are sent to a non leading Mesos master. Deprecated.
 	ErrNotLeader = errors.New("mesos: call sent to a non-leading master")
-	// ErrAuth is returned by Do calls that are not successfully authenticated.
+	// ErrAuth is returned by Do calls that are not successfully authenticated. Deprecated.
 	ErrAuth = errors.New("mesos: call not authenticated")
-	// ErrUnsubscribed is returned by Do calls that are sent before a subscription is established.
+	// ErrUnsubscribed is returned by Do calls that are sent before a subscription is established. Deprecated.
 	ErrUnsubscribed = errors.New("mesos: no subscription established")
-	// ErrVersion is returned by Do calls that are sent to an incompatible API version.
+	// ErrVersion is returned by Do calls that are sent to an incompatible API version. Deprecated.
 	ErrVersion = errors.New("mesos: incompatible API version")
-	// ErrMalformed is returned by Do calls that are malformed.
+	// ErrMalformed is returned by Do calls that are malformed. Deprecated.
 	ErrMalformed = errors.New("mesos: malformed request")
-	// ErrMediaType is returned by Do calls that are sent with an unsupported media type.
+	// ErrMediaType is returned by Do calls that are sent with an unsupported media type. Deprecated.
 	ErrMediaType = errors.New("mesos: unsupported media type")
 	// ErrRateLimit is returned by Do calls that are rate limited. This is a temporary condition
-	// that should clear.
+	// that should clear. Deprecated.
 	ErrRateLimit = errors.New("mesos: rate limited")
 	// ErrUnavailable is returned by Do calls that are sent to a master or agent that's in recovery, or
-	// does not yet realize that it's the leader. This is a temporary condition that should clear.
+	// does not yet realize that it's the leader. This is a temporary condition that should clear. Deprecated.
 	ErrUnavailable = errors.New("mesos: mesos server unavailable")
-	// ErrNotFound could happen if the master or agent libprocess has not yet set up http routes
+	// ErrNotFound could happen if the master or agent libprocess has not yet set up http routes. Deprecated.
 	ErrNotFound = errors.New("mesos: mesos http endpoint not found")
 
 	// codeErrors maps HTTP response codes to their respective errors.
@@ -54,21 +57,48 @@ var (
 	}
 
 	defaultErrorMapper = ErrorMapperFunc(func(code int) error {
+		// for now, just scrape the string of the deprecated error var (or of ProtocolError, for codes
+		// without one, so that Message is never empty) and use that as the APIError.Message.
+		// TODO(jdef) simplify this after v0.0.3, once the Err* variables are removed
 		err, ok := codeErrors[code]
 		if !ok {
 			err = ProtocolError(code)
 		}
-		return err
+		return &APIError{
+			Code:    code,
+			Message: err.Error(),
+		}
 	})
 )
 
 // ProtocolError is a generic error type returned for expected status codes
 // received from Mesos.
+//
+// Deprecated: no longer returned by this package, use APIError instead.
+// TODO(jdef) remove this after v0.0.3
 type ProtocolError int
 
 // Error implements error interface
 func (pe ProtocolError) Error() string { return fmt.Sprintf("Unexpected Mesos HTTP error: %d", int(pe)) }
 
+// APIError captures HTTP error codes and messages generated by Mesos.
+type APIError struct {
+	Code    int    // Code is the HTTP response status code generated by Mesos
+	Message string // Message briefly summarizes the nature of the error
+	Details string // Details captures the HTTP response entity, if any, supplied by Mesos
+}
+
+// Error implements the error interface; it returns Message only (Details are not included).
+func (err *APIError) Error() string {
+	return err.Message
+}
+
+// IsErrNotLeader returns true if err is an *APIError with an HTTP 307 (Temporary Redirect) status code.
+func IsErrNotLeader(err error) bool {
+	apiErr, ok := err.(*APIError)
+	return ok && apiErr.Code == http.StatusTemporaryRedirect
+}
+
 const (
 	debug = false // TODO(jdef) kill me at some point
 
@@ -104,7 +134,8 @@ type Response struct {
 // implements mesos.Response
 func (r *Response) Decoder() encoding.Decoder { return r.decoder }
 
-// ErrorMapperFunc generates an error for the given statusCode
+// ErrorMapperFunc generates an error for the given statusCode.
+// WARNING: this API will change in an upcoming release.
 type ErrorMapperFunc func(statusCode int) error
 
 // ResponseHandler is invoked to process an HTTP response
@@ -128,7 +159,7 @@ type Client struct {
 func New(opts ...Opt) *Client {
 	c := &Client{
 		codec:       &encoding.ProtobufCodec,
-		do:          With(),
+		do:          With(DefaultConfigOpt...),
 		header:      http.Header{},
 		errorMapper: defaultErrorMapper,
 	}
@@ -233,6 +264,11 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response,
 		// noop; no decoder for these types of calls
 	default:
 		err = c.errorMapper(res.StatusCode)
+		if apiErr, ok := err.(*APIError); ok && res.Body != nil {
+			// Annotate the APIError with Details from the response; best effort, a failed read yields partial Details
+			buf, _ := ioutil.ReadAll(res.Body)
+			apiErr.Details = string(buf)
+		}
 	}
 	return &Response{
 		decoder: events,
@@ -352,6 +388,9 @@ type Config struct {
 
 type ConfigOpt func(*Config)
 
+// DefaultConfigOpt represents the default client config options.
+var DefaultConfigOpt []ConfigOpt
+
 // With returns a DoFunc that executes HTTP round-trips.
 // The default implementation provides reasonable defaults for timeouts:
 // keep-alive, connection, request/response read/write, and TLS handshake.
@@ -384,7 +423,8 @@ func With(opt ...ConfigOpt) DoFunc {
 	return config.client.Do
 }
 
-// Timeout returns an ConfigOpt that sets a Config's timeout and keep-alive timeout.
+// Timeout returns a ConfigOpt that sets a Config's response header timeout, TLS handshake timeout,
+// and dialer timeout.
 func Timeout(d time.Duration) ConfigOpt {
 	return func(c *Config) {
 		c.transport.ResponseHeaderTimeout = d
diff --git a/api/v1/lib/httpcli/httpsched/httpsched.go b/api/v1/lib/httpcli/httpsched/httpsched.go
index 1965bcc3..6affef9b 100644
--- a/api/v1/lib/httpcli/httpsched/httpsched.go
+++ b/api/v1/lib/httpcli/httpsched/httpsched.go
@@ -38,7 +38,7 @@ type (
 		redirect RedirectSettings
 	}
 
-	// Caller is the public interface this framework scheduler's should consume
+	// Caller is the public interface a framework scheduler should consume
 	Caller interface {
 		calls.Caller
 		// httpDo is intentionally package-private; clients of this package may extend a Caller
@@ -164,7 +164,7 @@ func (mre *mesosRedirectionError) Error() string {
 func (cli *client) redirectHandler() httpcli.Opt {
 	return httpcli.HandleResponse(func(hres *http.Response, err error) (mesos.Response, error) {
 		resp, err := cli.HandleResponse(hres, err) // default response handler
-		if err == nil || (err != nil && err != httpcli.ErrNotLeader) {
+		if !httpcli.IsErrNotLeader(err) { // also true when err == nil
 			return resp, err
 		}
 		res, ok := resp.(*httpcli.Response)
diff --git a/api/v1/lib/httpcli/httpsched/state.go b/api/v1/lib/httpcli/httpsched/state.go
index 9c330762..bfb882e2 100644
--- a/api/v1/lib/httpcli/httpsched/state.go
+++ b/api/v1/lib/httpcli/httpsched/state.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 
 	"github.com/mesos/mesos-go/api/v1/lib"
+	"github.com/mesos/mesos-go/api/v1/lib/encoding"
 	"github.com/mesos/mesos-go/api/v1/lib/httpcli"
 	"github.com/mesos/mesos-go/api/v1/lib/scheduler"
 	"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
@@ -24,6 +25,7 @@ var (
 )
 
 type (
+	// state implements calls.Caller and tracks connectivity with Mesos
 	state struct {
 		client *client // client is a handle to the original underlying HTTP client
 
@@ -42,11 +44,9 @@ type (
 func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc {
 	if debug {
 		return func(req *http.Request) (*http.Response, error) {
-			if debug {
-				log.Println("wrapping request")
-			}
+			log.Println("wrapping request", req.URL, req.Header)
 			resp, err := f(req)
-			if debug && err == nil {
+			if err == nil {
 				log.Printf("status %d", resp.StatusCode)
 				for k := range resp.Header {
 					log.Println("header " + k + ": " + resp.Header.Get(k))
@@ -94,14 +94,38 @@ func disconnectedFn(state *state) stateFn {
 	)
 
 	// (c) execute the call, save the result in resp, err
-	state.resp, state.err = subscribeCaller.Call(state.call)
+	stateResp, stateErr := subscribeCaller.Call(state.call)
+	state.err = stateErr
 
-	// (d) if err != nil return unsubscribedFn
-	if state.err != nil {
+	// (d) if err != nil return disconnectedFn since we're unsubscribed
+	if stateErr != nil {
+		if stateResp != nil {
+			_ = stateResp.Close() // best effort; the call already failed
+		}
+		state.resp = nil
 		return disconnectedFn
 	}
 
-	// (e) else prepare callerTemporary w/ special header, return subscribingFn
+	// wrap the response: any errors processing the subscription stream should result in a
+	// transition to a disconnected state ASAP.
+	state.resp = &mesos.ResponseWrapper{
+		Response: stateResp,
+		DecoderFunc: func() encoding.Decoder {
+			decoder := stateResp.Decoder()
+			return func(u encoding.Unmarshaler) (err error) {
+				err = decoder(u)
+				if err != nil {
+					state.m.Lock()
+					defer state.m.Unlock()
+					state.fn = disconnectedFn
+					_ = stateResp.Close() // swallow any error here
+				}
+				return
+			}
+		},
+	}
+
+	// (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed
 	state.caller = &callerTemporary{
 		opt:            httpcli.DefaultHeader(headerMesosStreamID, mesosStreamID),
 		callerInternal: state.client,
@@ -120,7 +144,7 @@ func connectedFn(state *state) stateFn {
 	// (b) execute call, save the result in resp, err
 	state.resp, state.err = state.caller.Call(state.call)
 
-	// (c) return connectedFn; TODO(jdef) detect specific Mesos error codes as triggers -> disconnectedFn?
+	// stay connected, don't attempt to interpret errors here
 	return connectedFn
 }
 
@@ -129,5 +153,10 @@ func (state *state) Call(call *scheduler.Call) (resp mesos.Response, err error)
 	defer state.m.Unlock()
 	state.call = call
 	state.fn = state.fn(state)
+
+	if debug && state.err != nil {
+		log.Print(*call, state.err)
+	}
+
 	return state.resp, state.err
 }
diff --git a/api/v1/lib/scheduler/calls/caller.go b/api/v1/lib/scheduler/calls/caller.go
index a9aa1dc9..75502b94 100644
--- a/api/v1/lib/scheduler/calls/caller.go
+++ b/api/v1/lib/scheduler/calls/caller.go
@@ -53,6 +53,24 @@ func (d Decorator) If(b bool) Decorator {
 	return result
 }
 
+// When applies the given decorator when the supplied bool func returns true. The supplied
+// conditional func `f` is evaluated upon every Caller invocation; a nil `f` or decorator is a noop.
+func When(f func() bool, maybeDecorate Decorator) Decorator {
+	if f == nil || maybeDecorate == nil {
+		return noopDecorator
+	}
+	return func(h Caller) Caller {
+		// decorate once, up front, instead of re-wrapping h upon every invocation
+		decorated := maybeDecorate(h)
+		return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) {
+			if f() {
+				return decorated.Call(c)
+			}
+			return h.Call(c)
+		})
+	}
+}
+
 // Apply applies the Decorators in the order they're listed such that the last Decorator invoked
 // generates the final (wrapping) Caller that is ultimately returned.
 func (ds Decorators) Combine() (result Decorator) {
@@ -75,11 +93,14 @@ func (ds Decorators) Combine() (result Decorator) {
 	return
 }
 
-// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls.
-func FrameworkCaller(frameworkID string) Decorator {
+// FrameworkCaller generates and returns a Decorator that applies the given frameworkID to all calls (except SUBSCRIBE).
+func FrameworkCaller(frameworkID func() string) Decorator {
 	return func(h Caller) Caller {
 		return CallerFunc(func(c *scheduler.Call) (mesos.Response, error) {
-			c.FrameworkID = &mesos.FrameworkID{Value: frameworkID}
+			// never overwrite framework ID for subscribe calls; the scheduler must do that part
+			if c.Type == nil || *c.Type != scheduler.Call_SUBSCRIBE {
+				c.FrameworkID = &mesos.FrameworkID{Value: frameworkID()}
+			}
 			return h.Call(c)
 		})