Skip to content

Commit

Permalink
resources: s/ResourceName/Name; refactor summing logic w/ Reducer func
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Jun 23, 2017
1 parent fbfea27 commit 2eff7b4
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 115 deletions.
5 changes: 2 additions & 3 deletions api/v1/cmd/example-scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/controller"
"github.com/mesos/mesos-go/api/v1/lib/extras/scheduler/eventrules"
"github.com/mesos/mesos-go/api/v1/lib/extras/store"
"github.com/mesos/mesos-go/api/v1/lib/resourcefilters"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
Expand Down Expand Up @@ -167,8 +166,8 @@ func resourceOffers(state *internalState) events.HandlerFunc {
if state.config.summaryMetrics && state.config.resourceTypeMetrics {
for name, restype := range resources.Types(flattened...) {
if restype == mesos.SCALAR {
sum := resources.SumScalars(resourcefilters.Named(name), flattened...)
state.metricsAPI.offeredResources(sum.GetValue(), name)
sum, _ := name.Sum(flattened...)
state.metricsAPI.offeredResources(sum.GetScalar().GetValue(), name.String())
}
}
}
Expand Down
157 changes: 70 additions & 87 deletions api/v1/lib/extras/resources/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,97 +2,54 @@ package resources

import (
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/resourcefilters"
)

func SumScalars(rf resourcefilters.Filter, resources ...mesos.Resource) *mesos.Value_Scalar {
predicate := resourcefilters.Filters{rf, resourcefilters.Scalar}
var x *mesos.Value_Scalar
for i := range resources {
if !predicate.Accepts(&resources[i]) {
continue
}
x = x.Add(resources[i].GetScalar())
}
return x
}

func SumRanges(rf resourcefilters.Filter, resources ...mesos.Resource) *mesos.Value_Ranges {
predicate := resourcefilters.Filters{rf, resourcefilters.Range}
var x *mesos.Value_Ranges
for i := range resources {
if !predicate.Accepts(&resources[i]) {
continue
}
x = x.Add(resources[i].GetRanges())
}
return x
}

func SumSets(rf resourcefilters.Filter, resources ...mesos.Resource) *mesos.Value_Set {
predicate := resourcefilters.Filters{rf, resourcefilters.Set}
var x *mesos.Value_Set
for i := range resources {
if !predicate.Accepts(&resources[i]) {
continue
}
x = x.Add(resources[i].GetSet())
func (n Name) Sum(resources ...mesos.Resource) (*mesos.Resource, bool) {
v := Reduce(Sum(n.Filter), resources...)
if v != nil {
return v, true
}
return x
return nil, false
}

func CPUs(resources ...mesos.Resource) (float64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameCPUs), resources...)
if v != nil {
return v.Value, true
}
return 0, false
v, ok := NameCPUs.Sum(resources...)
return v.GetScalar().GetValue(), ok
}

func GPUs(resources ...mesos.Resource) (float64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameGPUs), resources...)
if v != nil {
return v.Value, true
}
return 0, false
func GPUs(resources ...mesos.Resource) (uint64, bool) {
v, ok := NameGPUs.Sum(resources...)
return uint64(v.GetScalar().GetValue()), ok
}

func Memory(resources ...mesos.Resource) (uint64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameMem), resources...)
if v != nil {
return uint64(v.Value), true
}
return 0, false
v, ok := NameMem.Sum(resources...)
return uint64(v.GetScalar().GetValue()), ok
}

func Disk(resources ...mesos.Resource) (uint64, bool) {
v := SumScalars(resourcefilters.Named(ResourceNameDisk), resources...)
if v != nil {
return uint64(v.Value), true
}
return 0, false
v, ok := NameDisk.Sum(resources...)
return uint64(v.GetScalar().GetValue()), ok
}

func Ports(resources ...mesos.Resource) (mesos.Ranges, bool) {
v := SumRanges(resourcefilters.Named(ResourceNamePorts), resources...)
if v != nil {
return mesos.Ranges(v.Range), true
}
return nil, false
v, ok := NamePorts.Sum(resources...)
return mesos.Ranges(v.GetRanges().GetRange()), ok
}

func Types(resources ...mesos.Resource) map[string]mesos.Value_Type {
m := map[string]mesos.Value_Type{}
func Types(resources ...mesos.Resource) map[Name]mesos.Value_Type {
m := map[Name]mesos.Value_Type{}
for i := range resources {
m[resources[i].GetName()] = resources[i].GetType()
name := Name(resources[i].GetName())
m[name] = resources[i].GetType() // TODO(jdef) check for conflicting types?
}
return m
}

func Names(resources ...mesos.Resource) (names []string) {
m := map[string]struct{}{}
func Names(resources ...mesos.Resource) (names []Name) {
m := map[Name]struct{}{}
for i := range resources {
n := resources[i].GetName()
n := Name(resources[i].GetName())
if _, ok := m[n]; !ok {
m[n] = struct{}{}
names = append(names, n)
Expand All @@ -101,30 +58,55 @@ func Names(resources ...mesos.Resource) (names []string) {
return
}

func SumAndCompare(expected mesos.Resources, resources ...mesos.Resource) bool {
func SumAndCompare(expected []mesos.Resource, resources ...mesos.Resource) bool {
// from: https://github.com/apache/mesos/blob/master/src/common/resources.cpp
// This is a sanity check to ensure the amount of each type of
// resource does not change.
// TODO(jieyu): Currently, we only check known resource types like
// cpus, mem, disk, ports, etc. We should generalize this.
type total struct {
v *mesos.Resource
t mesos.Value_Type
ok bool
}
calcTotals := func(r []mesos.Resource) (m map[Name]total) {
m = make(map[Name]total)
for n, t := range Types(expected...) {
v, ok := n.Sum(expected...)
m[n] = total{v, t, ok}
}
return
}
var (
c1, c2 = CPUs(expected...)
m1, m2 = Memory(expected...)
d1, d2 = Disk(expected...)
p1, p2 = Ports(expected...)
g1, g2 = GPUs(expected...)

c3, c4 = CPUs(resources...)
m3, m4 = Memory(resources...)
d3, d4 = Disk(resources...)
p3, p4 = Ports(resources...)
g3, g4 = GPUs(resources...)
et = calcTotals(expected)
rt = calcTotals(resources)
)
return c1 == c3 && c2 == c4 &&
m1 == m3 && m2 == m4 &&
d1 == d3 && d2 == d4 &&
g1 == g3 && g2 == g4 &&
p1.Equivalent(p3) && p2 == p4
for n, tot := range et {
r, ok := rt[n]
if !ok {
return false
}
if tot.t != r.t || tot.ok != r.ok {
return false
}
switch r.t {
case mesos.SCALAR:
if v1, v2 := tot.v.GetScalar().GetValue(), r.v.GetScalar().GetValue(); v1 != v2 {
return false
}
case mesos.RANGES:
v1, v2 := mesos.Ranges(tot.v.GetRanges().GetRange()), mesos.Ranges(r.v.GetRanges().GetRange())
if !v1.Equivalent(v2) { // TODO(jdef): assumes that v1 and v2 are in sort-order, is that guaranteed here?
return false
}
case mesos.SET:
if tot.v.GetSet().Compare(r.v.GetSet()) != 0 {
return false
}
default:
// noop; we don't know how to sum other types, so ignore...
}
delete(rt, n)
}
return len(rt) == 0
}

type (
Expand All @@ -141,7 +123,8 @@ type (
func (fc *flattenConfig) WithRole(role string) { fc.role = role }
func (fc *flattenConfig) WithReservation(r *mesos.Resource_ReservationInfo) { fc.reservation = r }

func Flatten(resources []mesos.Resource, opts ...FlattenOpt) (flattened mesos.Resources) {
func Flatten(resources []mesos.Resource, opts ...FlattenOpt) []mesos.Resource {
var flattened mesos.Resources
fc := &flattenConfig{}
for _, f := range opts {
f(fc)
Expand All @@ -155,5 +138,5 @@ func Flatten(resources []mesos.Resource, opts ...FlattenOpt) (flattened mesos.Re
r.Reservation = fc.reservation
flattened.Add1(r)
}
return
return flattened
}
11 changes: 5 additions & 6 deletions api/v1/lib/extras/resources/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package resources_test

import (
"reflect"
"sort"
"testing"

"github.com/mesos/mesos-go/api/v1/lib"
Expand All @@ -18,9 +17,9 @@ func TestResources_Types(t *testing.T) {
Resource(Name("ports"), ValueRange(Span(11, 20))),
)
types := rez.Types(rs...)
expected := map[string]mesos.Value_Type{
"cpus": mesos.SCALAR,
"ports": mesos.RANGES,
expected := map[rez.Name]mesos.Value_Type{
rez.NameCPUs: mesos.SCALAR,
rez.NamePorts: mesos.RANGES,
}
if !reflect.DeepEqual(types, expected) {
t.Fatalf("expected %v instead of %v", expected, types)
Expand All @@ -35,8 +34,8 @@ func TestResources_Names(t *testing.T) {
Resource(Name("mem"), ValueScalar(10)),
)
names := rez.Names(rs...)
sort.Strings(names)
expected := []string{"cpus", "mem"}
rez.NameSlice(names).Sort()
expected := []rez.Name{rez.NameCPUs, rez.NameMem}
if !reflect.DeepEqual(names, expected) {
t.Fatalf("expected %v instead of %v", expected, names)
}
Expand Down
14 changes: 8 additions & 6 deletions api/v1/lib/extras/resources/builders.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package resources

import (
"fmt"

"github.com/mesos/mesos-go/api/v1/lib"
)

Expand All @@ -12,19 +14,19 @@ type (
)

func NewCPUs(value float64) *Builder {
return Build().Name(ResourceNameCPUs).Scalar(value)
return Build().Name(NameCPUs).Scalar(value)
}

func NewMemory(value float64) *Builder {
return Build().Name(ResourceNameMem).Scalar(value)
return Build().Name(NameMem).Scalar(value)
}

func NewDisk(value float64) *Builder {
return Build().Name(ResourceNameDisk).Scalar(value)
return Build().Name(NameDisk).Scalar(value)
}

func NewGPUs(value uint) *Builder {
return Build().Name(ResourceNameGPUs).Scalar(float64(value))
return Build().Name(NameGPUs).Scalar(float64(value))
}

func BuildRanges() *RangeBuilder {
Expand All @@ -41,8 +43,8 @@ func (rb *RangeBuilder) Span(bp, ep uint64) *RangeBuilder {
func Build() *Builder {
return &Builder{}
}
func (rb *Builder) Name(name string) *Builder {
rb.Resource.Name = name
func (rb *Builder) Name(name fmt.Stringer) *Builder {
rb.Resource.Name = name.String()
return rb
}
func (rb *Builder) Role(role string) *Builder {
Expand Down
2 changes: 1 addition & 1 deletion api/v1/lib/extras/resources/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Find(wants mesos.Resources, from ...mesos.Resource) (total mesos.Resources)
func find(want mesos.Resource, from ...mesos.Resource) mesos.Resources {
var (
total = mesos.Resources(from).Clone()
remaining = Flatten(mesos.Resources{want})
remaining = mesos.Resources(Flatten(mesos.Resources{want}))
found mesos.Resources
predicates = resourcefilters.Filters{
resourcefilters.ReservedByRole(want.GetRole()),
Expand Down
31 changes: 26 additions & 5 deletions api/v1/lib/extras/resources/names.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,30 @@
package resources

import (
"sort"

"github.com/mesos/mesos-go/api/v1/lib"
)

type (
Name string
NameSlice []Name
)

const (
ResourceNameCPUs = "cpus"
ResourceNameDisk = "disk"
ResourceNameGPUs = "gpus"
ResourceNameMem = "mem"
ResourceNamePorts = "ports"
NameCPUs = Name("cpus")
NameDisk = Name("disk")
NameGPUs = Name("gpus")
NameMem = Name("mem")
NamePorts = Name("ports")
)

// String implements fmt.Stringer
func (n Name) String() string { return string(n) }
func (n Name) Filter(r *mesos.Resource) bool { return r != nil && r.Name == string(n) }

func (ns NameSlice) Len() int { return len(ns) }
func (ns NameSlice) Less(i, j int) bool { return ns[i] < ns[j] }
func (ns NameSlice) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] }

func (ns NameSlice) Sort() { sort.Stable(ns) }
Loading

0 comments on commit 2eff7b4

Please sign in to comment.