Skip to content

Commit

Permalink
overhaul existing examples and scheduler controller infra
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed May 25, 2017
1 parent 2e20eb5 commit f1f3f49
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 321 deletions.
201 changes: 91 additions & 110 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"errors"
"fmt"
"io"
"log"
"strconv"
Expand All @@ -12,6 +11,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/backoff"
xmetrics "github.com/mesos/mesos-go/api/v1/lib/extras/metrics"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/controller"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
Expand Down Expand Up @@ -42,108 +42,84 @@ func Run(cfg Config) error {
// TODO(jdef) how to track/handle timeout errors that occur for SUBSCRIBE calls? we should
// probably tolerate X number of subsequent subscribe failures before bailing. we'll need
// to track the lastCallAttempted along with subsequentSubscribeTimeouts.
err = controller.New().Run(buildControllerConfig(state, shutdown))
if state.err != nil {
err = state.err
}
return err
}

func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config {
var (
frameworkIDStore = store.NewInMemorySingleton()
controlContext = &controller.ContextAdapter{
DoneFunc: state.done.Closed,
FrameworkIDFunc: frameworkIDStore.Get,
ErrorFunc: func(err error) {
if err != nil {
if err != io.EOF {
log.Println(err)
}
if _, ok := err.(StateError); ok {
state.done.Close()
}
return
}
log.Println("disconnected")
},
}
)
frameworkIDStore := store.DecorateSingleton(
store.NewInMemorySingleton(),
store.DoSet().AndThen(func(_ store.Setter, v string, _ error) error {
log.Println("FrameworkID", v)
return nil
}))

state.cli = calls.Decorators{
callMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}),
// automatically set the frameworkID for all outgoing calls
calls.SubscribedCaller(frameworkIDStore.Get),
calls.SubscribedCaller(store.GetIgnoreErrors(frameworkIDStore)), // automatically set the frameworkID for all outgoing calls
}.Apply(state.cli)

return controller.Config{
Context: controlContext,
Framework: buildFrameworkInfo(state.config),
Caller: state.cli,
RegistrationTokens: backoff.Notifier(RegistrationMinBackoff, RegistrationMaxBackoff, shutdown),

Handler: events.Decorators{
err = controller.Run(
buildFrameworkInfo(state.config),
state.cli,
controller.WithDone(state.done.Closed),
controller.WithEventHandler(
buildEventHandler(state, frameworkIDStore),
eventMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
events.Decorator(logAllEvents).If(state.config.verbose),
}.Apply(buildEventHandler(state, frameworkIDStore)),
),
controller.WithFrameworkID(store.GetIgnoreErrors(frameworkIDStore)),
controller.WithRegistrationTokens(
backoff.Notifier(RegistrationMinBackoff, RegistrationMaxBackoff, shutdown),
),
controller.WithSubscriptionTerminated(func(err error) {
if err != nil {
if err != io.EOF {
log.Println(err)
}
if _, ok := err.(StateError); ok {
state.done.Close()
}
return
}
log.Println("disconnected")
}),
)
if state.err != nil {
err = state.err
}
return err
}

// buildEventHandler generates and returns a handler to process events received from the subscription.
func buildEventHandler(state *internalState, frameworkIDStore store.Singleton) events.Handler {
// TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to
// figure out appropriate error propagation among chained handlers.
ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli })
return events.NewMux(
events.DefaultHandler(events.HandlerFunc(controller.DefaultHandler)),
events.MapFuncs(map[scheduler.Event_Type]events.HandlerFunc{
scheduler.Event_FAILURE: func(e *scheduler.Event) error {
log.Println("received a FAILURE event")
f := e.GetFailure()
failure(f.ExecutorID, f.AgentID, f.Status)
return nil
},
scheduler.Event_OFFERS: func(e *scheduler.Event) error {
logger := controller.LogEvents()
return controller.LiftErrors().Handle(events.HandlerSet{
scheduler.Event_FAILURE: logger.HandleF(func(e *scheduler.Event) error {
f := e.GetFailure()
failure(f.ExecutorID, f.AgentID, f.Status)
return nil
}),
scheduler.Event_OFFERS: trackOffersReceived(state).AndThen().HandleF(
func(e *scheduler.Event) error {
if state.config.verbose {
log.Println("received an OFFERS event")
}
offers := e.GetOffers().GetOffers()
state.metricsAPI.offersReceived.Int(len(offers))
resourceOffers(state, offers)
return nil
},
scheduler.Event_UPDATE: func(e *scheduler.Event) error {
if err := ack.HandleEvent(e); err != nil {
log.Printf("failed to ack status update for task: %+v", err)
// TODO(jdef) we don't return the error because that would cause the subscription
// to terminate; is that the right thing to do?
}
statusUpdate(state, e.GetUpdate().GetStatus())
return nil
},
scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error {
log.Println("received a SUBSCRIBED event")
frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()
// order of `if` statements are important: tread carefully w/ respect to future changes
if frameworkID == "" {
// sanity check, should **never** happen
return StateError("mesos gave us an empty frameworkID")
}
if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint {
return StateError(fmt.Sprintf(
"frameworkID changed unexpectedly; failover exceeded timeout? (%s).",
state.config.failoverTimeout))
}
if state.frameworkID != frameworkID {
state.frameworkID = frameworkID
frameworkIDStore.Set(frameworkID)
log.Println("FrameworkID", frameworkID)
}
return nil
},
}),
)
return resourceOffers(state, e.GetOffers().GetOffers())
}),
scheduler.Event_UPDATE: controller.AckStatusUpdates(state.cli).AndThen().HandleF(statusUpdate(state)),
scheduler.Event_SUBSCRIBED: eventrules.Rules{
logger,
controller.TrackSubscription(frameworkIDStore, state.config.failoverTimeout),
},
})
}

func trackOffersReceived(state *internalState) eventrules.Rule {
return func(e *scheduler.Event, err error, chain eventrules.Chain) (*scheduler.Event, error) {
if err == nil {
state.metricsAPI.offersReceived.Int(len(e.GetOffers().GetOffers()))
}
return chain(e, nil)

}
}

func failure(eid *mesos.ExecutorID, aid *mesos.AgentID, stat *int32) {
Expand All @@ -163,7 +139,7 @@ func failure(eid *mesos.ExecutorID, aid *mesos.AgentID, stat *int32) {
}
}

func resourceOffers(state *internalState, offers []mesos.Offer) {
func resourceOffers(state *internalState, offers []mesos.Offer) error {
callOption := calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
tasksLaunchedThisCycle := 0
offersDeclined := 0
Expand Down Expand Up @@ -239,36 +215,41 @@ func resourceOffers(state *internalState, offers []mesos.Offer) {
if state.config.summaryMetrics {
state.metricsAPI.launchesPerOfferCycle(float64(tasksLaunchedThisCycle))
}
return nil
}

func statusUpdate(state *internalState, s mesos.TaskStatus) {
if state.config.verbose {
msg := "Task " + s.TaskID.Value + " is in state " + s.GetState().String()
if m := s.GetMessage(); m != "" {
msg += " with message '" + m + "'"
func statusUpdate(state *internalState) events.HandlerFunc {
return func(e *scheduler.Event) error {
s := e.GetUpdate().GetStatus()
if state.config.verbose {
msg := "Task " + s.TaskID.Value + " is in state " + s.GetState().String()
if m := s.GetMessage(); m != "" {
msg += " with message '" + m + "'"
}
log.Println(msg)
}
log.Println(msg)
}

switch st := s.GetState(); st {
case mesos.TASK_FINISHED:
state.tasksFinished++
state.metricsAPI.tasksFinished()
switch st := s.GetState(); st {
case mesos.TASK_FINISHED:
state.tasksFinished++
state.metricsAPI.tasksFinished()

if state.tasksFinished == state.totalTasks {
log.Println("mission accomplished, terminating")
if state.tasksFinished == state.totalTasks {
log.Println("mission accomplished, terminating")
state.done.Close()
} else {
tryReviveOffers(state)
}

case mesos.TASK_LOST, mesos.TASK_KILLED, mesos.TASK_FAILED, mesos.TASK_ERROR:
state.err = errors.New("Exiting because task " + s.GetTaskID().Value +
" is in an unexpected state " + st.String() +
" with reason " + s.GetReason().String() +
" from source " + s.GetSource().String() +
" with message '" + s.GetMessage() + "'")
state.done.Close()
} else {
tryReviveOffers(state)
}

case mesos.TASK_LOST, mesos.TASK_KILLED, mesos.TASK_FAILED, mesos.TASK_ERROR:
state.err = errors.New("Exiting because task " + s.GetTaskID().Value +
" is in an unexpected state " + st.String() +
" with reason " + s.GetReason().String() +
" from source " + s.GetSource().String() +
" with message '" + s.GetMessage() + "'")
state.done.Close()
return nil
}
}

Expand Down
4 changes: 2 additions & 2 deletions api/v1/cmd/example-scheduler/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.Var(&cfg.codec, "codec", "Codec to encode/decode scheduler API communications [protobuf, json]")
fs.StringVar(&cfg.url, "url", cfg.url, "Mesos scheduler API URL")
fs.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "Mesos scheduler API connection timeout")
fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout")
fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable framework checkpointing")
fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout (recover from scheduler failure)")
fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable agent checkpointing for framework tasks (recover from agent failure)")
fs.StringVar(&cfg.principal, "principal", cfg.principal, "Framework principal with which to authenticate")
fs.StringVar(&cfg.hostname, "hostname", cfg.hostname, "Framework hostname that is advertised to the master")
fs.Var(&cfg.labels, "label", "Framework label, may be specified multiple times")
Expand Down
1 change: 0 additions & 1 deletion api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ type internalState struct {
tasksLaunched int
tasksFinished int
totalTasks int
frameworkID string
role string
executor *mesos.ExecutorInfo
cli calls.Caller
Expand Down
Loading

0 comments on commit f1f3f49

Please sign in to comment.