Skip to content

Commit

Permalink
Merge f1f3f49 into fe43f9c
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed May 25, 2017
2 parents fe43f9c + f1f3f49 commit e2d83c9
Show file tree
Hide file tree
Showing 27 changed files with 2,045 additions and 1,059 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ $(COVERAGE_TARGETS):

.PHONY: vet
vet:
go $@ $(PACKAGES)
go $@ $(PACKAGES) $(BINARIES)

.PHONY: codecs
codecs: protobufs ffjson
Expand Down Expand Up @@ -77,6 +77,10 @@ sync:
(cd ${API_VENDOR}; govendor sync)
(cd ${CMD_VENDOR}; govendor sync)

.PHONY: generate
generate:
go generate ./api/v1/lib/extras/scheduler/eventrules

GOPKG := github.com/mesos/mesos-go
GOPKG_DIRNAME := $(shell dirname $(GOPKG))
UID ?= $(shell id -u $$USER)
Expand Down
206 changes: 94 additions & 112 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"errors"
"fmt"
"io"
"log"
"strconv"
Expand All @@ -12,6 +11,8 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/backoff"
xmetrics "github.com/mesos/mesos-go/api/v1/lib/extras/metrics"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/controller"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
Expand Down Expand Up @@ -41,108 +42,84 @@ func Run(cfg Config) error {
// TODO(jdef) how to track/handle timeout errors that occur for SUBSCRIBE calls? we should
// probably tolerate X number of subsequent subscribe failures before bailing. we'll need
// to track the lastCallAttempted along with subsequentSubscribeTimeouts.
err = controller.New().Run(buildControllerConfig(state, shutdown))
if state.err != nil {
err = state.err
}
return err
}

func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config {
var (
frameworkIDStore = NewInMemoryIDStore()
controlContext = &controller.ContextAdapter{
DoneFunc: state.isDone,
FrameworkIDFunc: func() string { return frameworkIDStore.Get() },
ErrorFunc: func(err error) {
if err != nil {
if err != io.EOF {
log.Println(err)
}
if _, ok := err.(StateError); ok {
state.markDone()
}
return
}
log.Println("disconnected")
},
}
)
frameworkIDStore := store.DecorateSingleton(
store.NewInMemorySingleton(),
store.DoSet().AndThen(func(_ store.Setter, v string, _ error) error {
log.Println("FrameworkID", v)
return nil
}))

state.cli = calls.Decorators{
callMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
logCalls(map[scheduler.Call_Type]string{scheduler.Call_SUBSCRIBE: "connecting..."}),
// automatically set the frameworkID for all outgoing calls
calls.SubscribedCaller(frameworkIDStore.Get),
calls.SubscribedCaller(store.GetIgnoreErrors(frameworkIDStore)), // automatically set the frameworkID for all outgoing calls
}.Apply(state.cli)

return controller.Config{
Context: controlContext,
Framework: buildFrameworkInfo(state.config),
Caller: state.cli,
RegistrationTokens: backoff.Notifier(RegistrationMinBackoff, RegistrationMaxBackoff, shutdown),

Handler: events.Decorators{
err = controller.Run(
buildFrameworkInfo(state.config),
state.cli,
controller.WithDone(state.done.Closed),
controller.WithEventHandler(
buildEventHandler(state, frameworkIDStore),
eventMetrics(state.metricsAPI, time.Now, state.config.summaryMetrics),
events.Decorator(logAllEvents).If(state.config.verbose),
}.Apply(buildEventHandler(state, frameworkIDStore)),
),
controller.WithFrameworkID(store.GetIgnoreErrors(frameworkIDStore)),
controller.WithRegistrationTokens(
backoff.Notifier(RegistrationMinBackoff, RegistrationMaxBackoff, shutdown),
),
controller.WithSubscriptionTerminated(func(err error) {
if err != nil {
if err != io.EOF {
log.Println(err)
}
if _, ok := err.(StateError); ok {
state.done.Close()
}
return
}
log.Println("disconnected")
}),
)
if state.err != nil {
err = state.err
}
return err
}

// buildEventHandler generates and returns a handler to process events received from the subscription.
func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Handler {
// TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to
// figure out appropriate error propagation among chained handlers.
ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli })
return events.NewMux(
events.DefaultHandler(events.HandlerFunc(controller.DefaultHandler)),
events.MapFuncs(map[scheduler.Event_Type]events.HandlerFunc{
scheduler.Event_FAILURE: func(e *scheduler.Event) error {
log.Println("received a FAILURE event")
f := e.GetFailure()
failure(f.ExecutorID, f.AgentID, f.Status)
return nil
},
scheduler.Event_OFFERS: func(e *scheduler.Event) error {
func buildEventHandler(state *internalState, frameworkIDStore store.Singleton) events.Handler {
logger := controller.LogEvents()
return controller.LiftErrors().Handle(events.HandlerSet{
scheduler.Event_FAILURE: logger.HandleF(func(e *scheduler.Event) error {
f := e.GetFailure()
failure(f.ExecutorID, f.AgentID, f.Status)
return nil
}),
scheduler.Event_OFFERS: trackOffersReceived(state).AndThen().HandleF(
func(e *scheduler.Event) error {
if state.config.verbose {
log.Println("received an OFFERS event")
}
offers := e.GetOffers().GetOffers()
state.metricsAPI.offersReceived.Int(len(offers))
resourceOffers(state, offers)
return nil
},
scheduler.Event_UPDATE: func(e *scheduler.Event) error {
if err := ack.HandleEvent(e); err != nil {
log.Printf("failed to ack status update for task: %+v", err)
// TODO(jdef) we don't return the error because that would cause the subscription
// to terminate; is that the right thing to do?
}
statusUpdate(state, e.GetUpdate().GetStatus())
return nil
},
scheduler.Event_SUBSCRIBED: func(e *scheduler.Event) error {
log.Println("received a SUBSCRIBED event")
frameworkID := e.GetSubscribed().GetFrameworkID().GetValue()
// order of `if` statements are important: tread carefully w/ respect to future changes
if frameworkID == "" {
// sanity check, should **never** happen
return StateError("mesos gave us an empty frameworkID")
}
if state.frameworkID != "" && state.frameworkID != frameworkID && state.config.checkpoint {
return StateError(fmt.Sprintf(
"frameworkID changed unexpectedly; failover exceeded timeout? (%s).",
state.config.failoverTimeout))
}
if state.frameworkID != frameworkID {
state.frameworkID = frameworkID
frameworkIDStore.Set(frameworkID)
log.Println("FrameworkID", frameworkID)
}
return nil
},
}),
)
return resourceOffers(state, e.GetOffers().GetOffers())
}),
scheduler.Event_UPDATE: controller.AckStatusUpdates(state.cli).AndThen().HandleF(statusUpdate(state)),
scheduler.Event_SUBSCRIBED: eventrules.Rules{
logger,
controller.TrackSubscription(frameworkIDStore, state.config.failoverTimeout),
},
})
}

// trackOffersReceived returns an event rule that records the number of offers
// carried by an OFFERS event in the scheduler's metrics, then continues the
// rule chain. Metrics are only updated when no error has occurred earlier in
// the chain (e may not be trustworthy otherwise).
func trackOffersReceived(state *internalState) eventrules.Rule {
	return func(e *scheduler.Event, err error, chain eventrules.Chain) (*scheduler.Event, error) {
		if err == nil {
			state.metricsAPI.offersReceived.Int(len(e.GetOffers().GetOffers()))
		}
		// Forward err (not nil) so that errors raised by earlier rules are
		// propagated to the rest of the chain instead of being swallowed.
		return chain(e, err)
	}
}

func failure(eid *mesos.ExecutorID, aid *mesos.AgentID, stat *int32) {
Expand All @@ -162,7 +139,7 @@ func failure(eid *mesos.ExecutorID, aid *mesos.AgentID, stat *int32) {
}
}

func resourceOffers(state *internalState, offers []mesos.Offer) {
func resourceOffers(state *internalState, offers []mesos.Offer) error {
callOption := calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
tasksLaunchedThisCycle := 0
offersDeclined := 0
Expand Down Expand Up @@ -238,36 +215,41 @@ func resourceOffers(state *internalState, offers []mesos.Offer) {
if state.config.summaryMetrics {
state.metricsAPI.launchesPerOfferCycle(float64(tasksLaunchedThisCycle))
}
return nil
}

func statusUpdate(state *internalState, s mesos.TaskStatus) {
if state.config.verbose {
msg := "Task " + s.TaskID.Value + " is in state " + s.GetState().String()
if m := s.GetMessage(); m != "" {
msg += " with message '" + m + "'"
func statusUpdate(state *internalState) events.HandlerFunc {
return func(e *scheduler.Event) error {
s := e.GetUpdate().GetStatus()
if state.config.verbose {
msg := "Task " + s.TaskID.Value + " is in state " + s.GetState().String()
if m := s.GetMessage(); m != "" {
msg += " with message '" + m + "'"
}
log.Println(msg)
}
log.Println(msg)
}

switch st := s.GetState(); st {
case mesos.TASK_FINISHED:
state.tasksFinished++
state.metricsAPI.tasksFinished()
switch st := s.GetState(); st {
case mesos.TASK_FINISHED:
state.tasksFinished++
state.metricsAPI.tasksFinished()

if state.tasksFinished == state.totalTasks {
log.Println("mission accomplished, terminating")
state.markDone()
} else {
tryReviveOffers(state)
}
if state.tasksFinished == state.totalTasks {
log.Println("mission accomplished, terminating")
state.done.Close()
} else {
tryReviveOffers(state)
}

case mesos.TASK_LOST, mesos.TASK_KILLED, mesos.TASK_FAILED, mesos.TASK_ERROR:
state.err = errors.New("Exiting because task " + s.GetTaskID().Value +
" is in an unexpected state " + st.String() +
" with reason " + s.GetReason().String() +
" from source " + s.GetSource().String() +
" with message '" + s.GetMessage() + "'")
state.markDone()
case mesos.TASK_LOST, mesos.TASK_KILLED, mesos.TASK_FAILED, mesos.TASK_ERROR:
state.err = errors.New("Exiting because task " + s.GetTaskID().Value +
" is in an unexpected state " + st.String() +
" with reason " + s.GetReason().String() +
" from source " + s.GetSource().String() +
" with message '" + s.GetMessage() + "'")
state.done.Close()
}
return nil
}
}

Expand Down
4 changes: 2 additions & 2 deletions api/v1/cmd/example-scheduler/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
fs.Var(&cfg.codec, "codec", "Codec to encode/decode scheduler API communications [protobuf, json]")
fs.StringVar(&cfg.url, "url", cfg.url, "Mesos scheduler API URL")
fs.DurationVar(&cfg.timeout, "timeout", cfg.timeout, "Mesos scheduler API connection timeout")
fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout")
fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable framework checkpointing")
fs.DurationVar(&cfg.failoverTimeout, "failoverTimeout", cfg.failoverTimeout, "Framework failover timeout (recover from scheduler failure)")
fs.BoolVar(&cfg.checkpoint, "checkpoint", cfg.checkpoint, "Enable/disable agent checkpointing for framework tasks (recover from agent failure)")
fs.StringVar(&cfg.principal, "principal", cfg.principal, "Framework principal with which to authenticate")
fs.StringVar(&cfg.hostname, "hostname", cfg.hostname, "Framework hostname that is advertised to the master")
fs.Var(&cfg.labels, "label", "Framework label, may be specified multiple times")
Expand Down
38 changes: 7 additions & 31 deletions api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"math/rand"
"net/http"
"os"
"sync"
"time"

proto "github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/backoff"
"github.com/mesos/mesos-go/api/v1/lib/extras/latch"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpsched"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
Expand Down Expand Up @@ -87,17 +87,17 @@ func prepareExecutorInfo(

func buildWantsTaskResources(config Config) (r mesos.Resources) {
r.Add(
*mesos.BuildResource().Name("cpus").Scalar(config.taskCPU).Resource,
*mesos.BuildResource().Name("mem").Scalar(config.taskMemory).Resource,
mesos.CPUs(config.taskCPU).Resource,
mesos.Memory(config.taskMemory).Resource,
)
log.Println("wants-task-resources = " + r.String())
return
}

func buildWantsExecutorResources(config Config) (r mesos.Resources) {
r.Add(
*mesos.BuildResource().Name("cpus").Scalar(config.execCPU).Resource,
*mesos.BuildResource().Name("mem").Scalar(config.execMemory).Resource,
mesos.CPUs(config.execCPU).Resource,
mesos.Memory(config.execMemory).Resource,
)
log.Println("wants-executor-resources = " + r.String())
return
Expand All @@ -117,13 +117,6 @@ func buildHTTPSched(cfg Config, creds credentials) calls.Caller {
httpcli.Do(httpcli.With(
authConfigOpt,
httpcli.Timeout(cfg.timeout),
httpcli.Transport(func(t *http.Transport) {
// all calls should be ack'd by the server within this interval.
// TODO(jdef) it probably doesn't make sense if this value is larger
// than cfg.timeout.
t.ResponseHeaderTimeout = 15 * time.Second
t.MaxIdleConnsPerHost = 2 // don't depend on go's default
}),
)),
)
if cfg.compression {
Expand Down Expand Up @@ -216,31 +209,15 @@ func newInternalState(cfg Config) (*internalState, error) {
metricsAPI: metricsAPI,
cli: buildHTTPSched(cfg, creds),
random: rand.New(rand.NewSource(time.Now().Unix())),
done: make(chan struct{}),
done: latch.New(),
}
return state, nil
}

func (state *internalState) markDone() {
state.doneOnce.Do(func() {
close(state.done)
})
}

func (state *internalState) isDone() bool {
select {
case <-state.done:
return true
default:
return false
}
}

type internalState struct {
tasksLaunched int
tasksFinished int
totalTasks int
frameworkID string
role string
executor *mesos.ExecutorInfo
cli calls.Caller
Expand All @@ -249,7 +226,6 @@ type internalState struct {
reviveTokens <-chan struct{}
metricsAPI *metricsAPI
err error
done chan struct{}
doneOnce sync.Once
done latch.Interface
random *rand.Rand
}
Loading

0 comments on commit e2d83c9

Please sign in to comment.