Skip to content

Commit

Permalink
Merge 6242813 into fe43f9c
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed May 5, 2017
2 parents fe43f9c + 6242813 commit 66ecb79
Show file tree
Hide file tree
Showing 17 changed files with 684 additions and 812 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ $(COVERAGE_TARGETS):

.PHONY: vet
vet:
go $@ $(PACKAGES)
go $@ $(PACKAGES) $(BINARIES)

.PHONY: codecs
codecs: protobufs ffjson
Expand Down
15 changes: 8 additions & 7 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/backoff"
xmetrics "github.com/mesos/mesos-go/api/v1/lib/extras/metrics"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/controller"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
Expand Down Expand Up @@ -50,17 +51,17 @@ func Run(cfg Config) error {

func buildControllerConfig(state *internalState, shutdown <-chan struct{}) controller.Config {
var (
frameworkIDStore = NewInMemoryIDStore()
frameworkIDStore = store.NewInMemorySingleton()
controlContext = &controller.ContextAdapter{
DoneFunc: state.isDone,
FrameworkIDFunc: func() string { return frameworkIDStore.Get() },
DoneFunc: state.done.Closed,
FrameworkIDFunc: frameworkIDStore.Get,
ErrorFunc: func(err error) {
if err != nil {
if err != io.EOF {
log.Println(err)
}
if _, ok := err.(StateError); ok {
state.markDone()
state.done.Close()
}
return
}
Expand Down Expand Up @@ -90,7 +91,7 @@ func buildControllerConfig(state *internalState, shutdown <-chan struct{}) contr
}

// buildEventHandler generates and returns a handler to process events received from the subscription.
func buildEventHandler(state *internalState, frameworkIDStore IDStore) events.Handler {
func buildEventHandler(state *internalState, frameworkIDStore store.Singleton) events.Handler {
// TODO(jdef) would be nice to merge this ack handler with the status update handler below; need to
// figure out appropriate error propagation among chained handlers.
ack := events.AcknowledgeUpdates(func() calls.Caller { return state.cli })
Expand Down Expand Up @@ -256,7 +257,7 @@ func statusUpdate(state *internalState, s mesos.TaskStatus) {

if state.tasksFinished == state.totalTasks {
log.Println("mission accomplished, terminating")
state.markDone()
state.done.Close()
} else {
tryReviveOffers(state)
}
Expand All @@ -267,7 +268,7 @@ func statusUpdate(state *internalState, s mesos.TaskStatus) {
" with reason " + s.GetReason().String() +
" from source " + s.GetSource().String() +
" with message '" + s.GetMessage() + "'")
state.markDone()
state.done.Close()
}
}

Expand Down
37 changes: 7 additions & 30 deletions api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"math/rand"
"net/http"
"os"
"sync"
"time"

proto "github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/backoff"
"github.com/mesos/mesos-go/api/v1/lib/extras/latch"
"github.com/mesos/mesos-go/api/v1/lib/httpcli"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpsched"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
Expand Down Expand Up @@ -87,17 +87,17 @@ func prepareExecutorInfo(

func buildWantsTaskResources(config Config) (r mesos.Resources) {
r.Add(
*mesos.BuildResource().Name("cpus").Scalar(config.taskCPU).Resource,
*mesos.BuildResource().Name("mem").Scalar(config.taskMemory).Resource,
*mesos.CPUs(config.taskCPU).Resource,
*mesos.Memory(config.taskMemory).Resource,
)
log.Println("wants-task-resources = " + r.String())
return
}

func buildWantsExecutorResources(config Config) (r mesos.Resources) {
r.Add(
*mesos.BuildResource().Name("cpus").Scalar(config.execCPU).Resource,
*mesos.BuildResource().Name("mem").Scalar(config.execMemory).Resource,
*mesos.CPUs(config.execCPU).Resource,
*mesos.Memory(config.execMemory).Resource,
)
log.Println("wants-executor-resources = " + r.String())
return
Expand All @@ -117,13 +117,6 @@ func buildHTTPSched(cfg Config, creds credentials) calls.Caller {
httpcli.Do(httpcli.With(
authConfigOpt,
httpcli.Timeout(cfg.timeout),
httpcli.Transport(func(t *http.Transport) {
// all calls should be ack'd by the server within this interval.
// TODO(jdef) it probably doesn't make sense if this value is larger
// than cfg.timeout.
t.ResponseHeaderTimeout = 15 * time.Second
t.MaxIdleConnsPerHost = 2 // don't depend on go's default
}),
)),
)
if cfg.compression {
Expand Down Expand Up @@ -216,26 +209,11 @@ func newInternalState(cfg Config) (*internalState, error) {
metricsAPI: metricsAPI,
cli: buildHTTPSched(cfg, creds),
random: rand.New(rand.NewSource(time.Now().Unix())),
done: make(chan struct{}),
done: latch.New(),
}
return state, nil
}

func (state *internalState) markDone() {
state.doneOnce.Do(func() {
close(state.done)
})
}

func (state *internalState) isDone() bool {
select {
case <-state.done:
return true
default:
return false
}
}

type internalState struct {
tasksLaunched int
tasksFinished int
Expand All @@ -249,7 +227,6 @@ type internalState struct {
reviveTokens <-chan struct{}
metricsAPI *metricsAPI
err error
done chan struct{}
doneOnce sync.Once
done latch.Interface
random *rand.Rand
}
43 changes: 0 additions & 43 deletions api/v1/cmd/example-scheduler/app/store.go

This file was deleted.

Loading

0 comments on commit 66ecb79

Please sign in to comment.