Skip to content

Commit

Permalink
lib: break bits out of resources into extras, resourcefilters, resour…
Browse files Browse the repository at this point in the history
…cetest
  • Loading branch information
James DeFelice committed Jun 13, 2017
1 parent 600b7c8 commit 13d8b85
Show file tree
Hide file tree
Showing 15 changed files with 1,004 additions and 888 deletions.
8 changes: 5 additions & 3 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/backoff"
xmetrics "github.com/mesos/mesos-go/api/v1/lib/extras/metrics"
"github.com/mesos/mesos-go/api/v1/lib/extras/resources"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/callrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/controller"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/resourcefilters"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
Expand Down Expand Up @@ -163,9 +165,9 @@ func resourceOffers(state *internalState) events.HandlerFunc {

// avoid the expense of computing these if we can...
if state.config.summaryMetrics && state.config.resourceTypeMetrics {
for name, restype := range flattened.Types() {
for name, restype := range resources.Types(flattened...) {
if restype == mesos.SCALAR {
sum := flattened.SumScalars(mesos.NamedResources(name))
sum := resources.SumScalars(resourcefilters.Named(name), flattened...)
state.metricsAPI.offeredResources(sum.GetValue(), name)
}
}
Expand All @@ -184,7 +186,7 @@ func resourceOffers(state *internalState) events.HandlerFunc {
TaskID: mesos.TaskID{Value: strconv.Itoa(taskID)},
AgentID: offers[i].AgentID,
Executor: state.executor,
Resources: remaining.Find(state.wantsTaskResources.Flatten(mesos.RoleName(state.role).Assign())),
Resources: resources.Find(state.wantsTaskResources.Flatten(mesos.RoleName(state.role).Assign()), remaining...),
}
task.Name = "Task " + task.TaskID.Value

Expand Down
8 changes: 4 additions & 4 deletions api/v1/cmd/example-scheduler/app/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ func prepareExecutorInfo(

func buildWantsTaskResources(config Config) (r mesos.Resources) {
r.Add(
resources.CPUs(config.taskCPU).Resource,
resources.Memory(config.taskMemory).Resource,
resources.NewCPUs(config.taskCPU).Resource,
resources.NewMemory(config.taskMemory).Resource,
)
log.Println("wants-task-resources = " + r.String())
return
}

func buildWantsExecutorResources(config Config) (r mesos.Resources) {
r.Add(
resources.CPUs(config.execCPU).Resource,
resources.Memory(config.execMemory).Resource,
resources.NewCPUs(config.execCPU).Resource,
resources.NewMemory(config.execMemory).Resource,
)
log.Println("wants-executor-resources = " + r.String())
return
Expand Down
6 changes: 3 additions & 3 deletions api/v1/cmd/msh/msh.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func main() {
}

wantsResources = mesos.Resources{
resources.CPUs(CPUs).Resource,
resources.Memory(Memory).Resource,
resources.NewCPUs(CPUs).Resource,
resources.NewMemory(Memory).Resource,
}
taskPrototype = mesos.TaskInfo{
Name: TaskName,
Expand Down Expand Up @@ -169,7 +169,7 @@ func resourceOffers(caller calls.Caller) events.HandlerFunc {
task := taskPrototype
task.TaskID = mesos.TaskID{Value: time.Now().Format(RFC3339a)}
task.AgentID = match.AgentID
task.Resources = mesos.Resources(match.Resources).Find(wantsResources.Flatten(Role.Assign()))
task.Resources = resources.Find(wantsResources.Flatten(Role.Assign()), match.Resources...)

err = calls.CallNoData(ctx, caller, calls.Accept(
calls.OfferOperations{calls.OpLaunch(task)}.WithOffers(match.ID),
Expand Down
125 changes: 125 additions & 0 deletions api/v1/lib/extras/resources/aggregate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package resources

import (
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/resourcefilters"
)

func SumScalars(rf resourcefilters.Filter, resources ...mesos.Resource) *mesos.Value_Scalar {
predicate := resourcefilters.Filters{rf, resourcefilters.Scalar}
var x *mesos.Value_Scalar
for i := range resources {
if !predicate.Accepts(&resources[i]) {
continue
}
x = x.Add(resources[i].GetScalar())
}
return x
}

func SumRanges(rf resourcefilters.Filter, resources ...mesos.Resource) *mesos.Value_Ranges {
predicate := resourcefilters.Filters{rf, resourcefilters.Range}
var x *mesos.Value_Ranges
for i := range resources {
if !predicate.Accepts(&resources[i]) {
continue
}
x = x.Add(resources[i].GetRanges())
}
return x
}

func SumSets(rf resourcefilters.Filter, resources ...mesos.Resource) *mesos.Value_Set {
predicate := resourcefilters.Filters{rf, resourcefilters.Set}
var x *mesos.Value_Set
for i := range resources {
if !predicate.Accepts(&resources[i]) {
continue
}
x = x.Add(resources[i].GetSet())
}
return x
}

func CPUs(resources ...mesos.Resource) (float64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameCPUs), resources...)
if v != nil {
return v.Value, true
}
return 0, false
}

func GPUs(resources ...mesos.Resource) (float64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameGPUs), resources...)
if v != nil {
return v.Value, true
}
return 0, false
}

func Memory(resources ...mesos.Resource) (uint64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameMem), resources...)
if v != nil {
return uint64(v.Value), true
}
return 0, false
}

func Disk(resources ...mesos.Resource) (uint64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameDisk), resources...)
if v != nil {
return uint64(v.Value), true
}
return 0, false
}

func Ports(resources ...mesos.Resource) (mesos.Ranges, bool) {
v := SumRanges(resourcefilters.Named(ResourceNamePorts), resources...)
if v != nil {
return mesos.Ranges(v.Range), true
}
return nil, false
}

func Types(resources ...mesos.Resource) map[string]mesos.Value_Type {
m := map[string]mesos.Value_Type{}
for i := range resources {
m[resources[i].GetName()] = resources[i].GetType()
}
return m
}

func Names(resources ...mesos.Resource) (names []string) {
m := map[string]struct{}{}
for i := range resources {
n := resources[i].GetName()
if _, ok := m[n]; !ok {
m[n] = struct{}{}
names = append(names, n)
}
}
return
}

func SumAndCompare(expected mesos.Resources, resources ...mesos.Resource) bool {
// from: https://github.com/apache/mesos/blob/master/src/common/resources.cpp
// This is a sanity check to ensure the amount of each type of
// resource does not change.
// TODO(jieyu): Currently, we only check known resource types like
// cpus, mem, disk, ports, etc. We should generalize this.
var (
c1, c2 = CPUs(expected...)
m1, m2 = Memory(expected...)
d1, d2 = Disk(expected...)
p1, p2 = Ports(expected...)

c3, c4 = CPUs(resources...)
m3, m4 = Memory(resources...)
d3, d4 = Disk(resources...)
p3, p4 = Ports(resources...)
)
return c1 == c3 && c2 == c4 &&
m1 == m3 && m2 == m4 &&
d1 == d3 && d2 == d4 &&
p1.Equivalent(p3) && p2 == p4
}
16 changes: 8 additions & 8 deletions api/v1/lib/extras/resources/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ type (
RangeBuilder struct{ mesos.Ranges }
)

func CPUs(value float64) *Builder {
return Build().Name("cpus").Scalar(value)
func NewCPUs(value float64) *Builder {
return Build().Name(ResourceNameCPUs).Scalar(value)
}

func Memory(value float64) *Builder {
return Build().Name("mem").Scalar(value)
func NewMemory(value float64) *Builder {
return Build().Name(ResourceNameMem).Scalar(value)
}

func Disk(value float64) *Builder {
return Build().Name("disk").Scalar(value)
func NewDisk(value float64) *Builder {
return Build().Name(ResourceNameDisk).Scalar(value)
}

func GPUs(value uint) *Builder {
return Build().Name("gpus").Scalar(float64(value))
func NewGPUs(value uint) *Builder {
return Build().Name(ResourceNameGPUs).Scalar(float64(value))
}

func BuildRanges() *RangeBuilder {
Expand Down
53 changes: 53 additions & 0 deletions api/v1/lib/extras/resources/find.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package resources

import (
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/resourcefilters"
)

func Find(wants mesos.Resources, from ...mesos.Resource) (total mesos.Resources) {
for i := range wants {
found := find(wants[i], from...)

// each want *must* be found
if len(found) == 0 {
return nil
}

total.Add(found...)
}
return total
}

func find(want mesos.Resource, from ...mesos.Resource) mesos.Resources {
var (
total = mesos.Resources(from).Clone()
remaining = mesos.Resources{want}.Flatten()
found mesos.Resources
predicates = resourcefilters.Filters{
resourcefilters.ReservedByRole(want.GetRole()),
resourcefilters.Unreserved,
resourcefilters.Any,
}
)
for _, predicate := range predicates {
filtered := resourcefilters.Select(predicate, total...)
for i := range filtered {
// need to flatten to ignore the roles in ContainsAll()
flattened := mesos.Resources{filtered[i]}.Flatten()
if flattened.ContainsAll(remaining) {
// want has been found, return the result
return found.Add(remaining.Flatten(
mesos.RoleName(filtered[i].GetRole()).Assign(),
filtered[i].Reservation.Assign())...)
}
if remaining.ContainsAll(flattened) {
found.Add1(filtered[i])
total.Subtract1(filtered[i])
remaining.Subtract(flattened...)
break
}
}
}
return nil
}
9 changes: 9 additions & 0 deletions api/v1/lib/extras/resources/names.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package resources

const (
ResourceNameCPUs = "cpus"
ResourceNameDisk = "disk"
ResourceNameGPUs = "gpus"
ResourceNameMem = "mem"
ResourceNamePorts = "ports"
)
Loading

0 comments on commit 13d8b85

Please sign in to comment.