Skip to content

Commit

Permalink
Merge 8f3819c into 62cdc96
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed Mar 12, 2018
2 parents 62cdc96 + 8f3819c commit 1187f40
Show file tree
Hide file tree
Showing 47 changed files with 55,022 additions and 18,484 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
2018-xx-xx: v0.0.7
TODO
1.5.x protobuf support
operations: new helpers for additional offer operations
scheduler/calls: new helpers for ack offer op update AND reconcile offer op
extras/scheduler: rule that acks offer op updates
master: helpers for teardown and mark-agent-gone
additional test cases for new reservation validation
operations: support reservation refinements

2018-03-12: v0.0.6
1.4.x protobuf support
Expand Down
33 changes: 22 additions & 11 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func resourceOffers(state *internalState) events.HandlerFunc {
wantsExecutorResources = mesos.Resources(state.executor.Resources)
}

flattened := resources.Flatten(remaining)
flattened := remaining.ToUnreserved()

// avoid the expense of computing these if we can...
if state.config.summaryMetrics && state.config.resourceTypeMetrics {
Expand All @@ -174,6 +174,21 @@ func resourceOffers(state *internalState) events.HandlerFunc {

taskWantsResources := state.wantsTaskResources.Plus(wantsExecutorResources...)
for state.tasksLaunched < state.totalTasks && resources.ContainsAll(flattened, taskWantsResources) {
found := func() mesos.Resources {
if state.config.role == "*" {
return resources.Find(state.wantsTaskResources, remaining...)
}
reservation := mesos.Resource_ReservationInfo{
Type: mesos.Resource_ReservationInfo_STATIC.Enum(),
Role: &state.config.role,
}
return resources.Find(state.wantsTaskResources.PushReservation(reservation))
}()

if len(found) == 0 {
panic("illegal state: failed to find the resources that were supposedly contained")
}

state.tasksLaunched++
taskID := state.tasksLaunched

Expand All @@ -182,20 +197,16 @@ func resourceOffers(state *internalState) events.HandlerFunc {
}

task := mesos.TaskInfo{
TaskID: mesos.TaskID{Value: strconv.Itoa(taskID)},
AgentID: offers[i].AgentID,
Executor: state.executor,
Resources: resources.Find(
resources.Flatten(state.wantsTaskResources, resources.Role(state.role).Assign()),
remaining...,
),
TaskID: mesos.TaskID{Value: strconv.Itoa(taskID)},
AgentID: offers[i].AgentID,
Executor: state.executor,
Resources: found,
}
task.Name = "Task " + task.TaskID.Value

remaining.Subtract(task.Resources...)
tasks = append(tasks, task)

flattened = resources.Flatten(remaining)
remaining.Subtract(task.Resources...)
flattened = remaining.ToUnreserved()
}

// build Accept call to launch all of the tasks we've assembled
Expand Down
1 change: 1 addition & 0 deletions api/v1/cmd/example-scheduler/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func NewConfig() Config {
return Config{
user: env("FRAMEWORK_USER", "root"),
name: env("FRAMEWORK_NAME", "example"),
role: env("FRAMEWORK_ROLE", "*"),
url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"),
codec: codec{Codec: codecs.ByMediaType[codecs.MediaTypeProtobuf]},
timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"),
Expand Down
3 changes: 3 additions & 0 deletions api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func buildFrameworkInfo(cfg Config) *mesos.FrameworkInfo {
User: cfg.user,
Name: cfg.name,
Checkpoint: &cfg.checkpoint,
Capabilities: []mesos.FrameworkInfo_Capability{
{Type: mesos.FrameworkInfo_Capability_RESERVATION_REFINEMENT},
},
}
if cfg.failoverTimeout > 0 {
frameworkInfo.FailoverTimeout = &failoverTimeout
Expand Down
4 changes: 3 additions & 1 deletion api/v1/cmd/msh/msh.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpagent"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpsched"
"github.com/mesos/mesos-go/api/v1/lib/resources"
"github.com/mesos/mesos-go/api/v1/lib/roles"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
Expand All @@ -47,7 +48,7 @@ var (
TaskName = "msh"
MesosMaster = "127.0.0.1:5050"
User = "root"
Role = resources.Role("*")
Role = roles.Role("*")
CPUs = float64(0.010)
Memory = float64(64)

Expand Down Expand Up @@ -100,6 +101,7 @@ func main() {
resources.NewCPUs(CPUs).Resource,
resources.NewMemory(Memory).Resource,
}

taskPrototype = mesos.TaskInfo{
Name: TaskName,
Command: &mesos.CommandInfo{
Expand Down

0 comments on commit 1187f40

Please sign in to comment.