MESOS: fix e2e suite.SchedulerPredicates validates resource limits of pods that are allowed to run #18348

Merged
10 changes: 6 additions & 4 deletions cluster/mesos/docker/docker-compose.yml
@@ -23,7 +23,7 @@ mesosmaster1:
     - MESOS_QUORUM=1
     - MESOS_REGISTRY=in_memory
     - MESOS_WORK_DIR=/var/lib/mesos
-    - MESOS_ROLES=role1
+    - MESOS_ROLES=public
   links:
     - etcd
     - "ambassador:apiserver"
@@ -38,12 +38,13 @@ mesosslave:
     - >
      NAME=$$(cut -f2 -d/ <<<$${MESOSMASTER1_NAME}) &&
      N=$${NAME##*_} &&
+     PUBLIC_RESOURCES="$$(if [ $${N} = 2 ]; then echo ";cpus(public):2;mem(public):640;ports(public):[7000-7999]"; fi)" &&
> Contributor: We still have ports here.
>
> Contributor Author: clarified above
      DOCKER_NETWORK_OFFSET=0.0.$${N}.0
      exec wrapdocker mesos-slave
      --work_dir="/var/tmp/mesos/$${N}"
-     --attributes="rack:$${N};gen:201$${N};role:role$${N}"
+     --attributes="rack:$${N};gen:201$${N}"
      --hostname=$$(getent hosts mesosslave | cut -d' ' -f1 | sort -u | tail -1)
-     --resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099];cpus(role$${N}):1;mem(role$${N}):640;disk(role$${N}):25600;ports(role$${N}):[7000-7999]"
+     --resources="cpus:4;mem:1280;disk:25600;ports:[8000-21099]$${PUBLIC_RESOURCES}"
   command: []
   environment:
     - MESOS_MASTER=mesosmaster1:5050
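Note: in Mesos resource strings, `name(role):value` declares resources statically reserved for a role, while unqualified entries such as `cpus:4` stay under the default role `*`. Here only slave 2 (the `$${N} = 2` guard) advertises the extra `public` resources, and the leading `;` in the echoed string lets `$${PUBLIC_RESOURCES}` splice directly onto the unreserved `--resources` list above.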
@@ -144,7 +145,8 @@ scheduler:
      --mesos-executor-cpus=1.0
      --mesos-sandbox-overlay=/opt/sandbox-overlay.tar.gz
      --static-pods-config=/opt/static-pods
-     --mesos-roles=*,role1
+     --mesos-framework-roles=*,public
+     --mesos-default-pod-roles=*,public
      --v=4
      --executor-logv=4
      --profiling=true
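The old `--mesos-roles` flag is split in two: `--mesos-framework-roles` names the Mesos roles the framework registers with and accepts offers for, while `--mesos-default-pod-roles` names the roles a pod is scheduled under when its labels do not request any (see the updated `Roles()` in pod_task.go below).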
2 changes: 2 additions & 0 deletions contrib/mesos/pkg/executor/executor_test.go
@@ -175,6 +175,7 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {
         pod,
         executorinfo,
         nil,
+        nil,
     )

     assert.Equal(t, nil, err, "must be able to create a task from a pod")
@@ -390,6 +391,7 @@ func TestExecutorFrameworkMessage(t *testing.T) {
         pod,
         executorinfo,
         nil,
+        nil,
     )

     podTask.Spec = &podtask.Spec{
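`podtask.New` now takes separate `frameworkRoles` and `defaultPodRoles` slices (see pod_task.go below), so each call site gains an extra argument; passing `nil` for both makes `New` fall back to the `*` role.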
13 changes: 9 additions & 4 deletions contrib/mesos/pkg/executor/node.go
@@ -36,9 +36,9 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
             }
             switch r.GetName() {
             case "cpus":
-                executorCPU = r.GetScalar().GetValue()
+                executorCPU += r.GetScalar().GetValue()
             case "mem":
-                executorMem = r.GetScalar().GetValue()
+                executorMem += r.GetScalar().GetValue()
             }
         }
     }
@@ -55,10 +55,15 @@ func nodeInfo(si *mesos.SlaveInfo, ei *mesos.ExecutorInfo) NodeInfo {
             // We intentionally take the floor of executorCPU because cores are integers
             // and we would lose a complete cpu here if the value is <1.
             // TODO(sttts): switch to float64 when "Machine Allocables" are implemented
-            ni.Cores = int(r.GetScalar().GetValue() - float64(int(executorCPU)))
+            ni.Cores += int(r.GetScalar().GetValue())
         case "mem":
-            ni.Mem = int64(r.GetScalar().GetValue()-executorMem) * 1024 * 1024
+            ni.Mem += int64(r.GetScalar().GetValue()) * 1024 * 1024
         }
     }
+
+    // TODO(sttts): subtract executorCPU/Mem from static pod resources before subtracting them from the capacity
+    ni.Cores -= int(executorCPU)
+    ni.Mem -= int64(executorMem) * 1024 * 1024
+
     return ni
 }
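The change above matters because a slave's capacity can be split across several resource entries with the same name (an unreserved `cpus` entry plus role-reserved ones such as `cpus(public)`); overwriting with `=` kept only the last entry seen. A minimal, self-contained sketch of the corrected accounting — the `resource` and `nodeCapacity` names and types are illustrative stand-ins, not the tree's `mesosproto` types, and memory is kept in MB for brevity where the real code converts to bytes:

```go
package main

import "fmt"

// resource is a simplified stand-in for a Mesos scalar resource;
// a slave reports one entry per role, so the same name can repeat.
type resource struct {
	name  string
	value float64
}

// nodeCapacity accumulates capacity across all entries (+=) instead of
// overwriting it (=), then subtracts the executor overhead exactly once.
func nodeCapacity(total []resource, executorCPU, executorMem float64) (cores int, memMB int64) {
	for _, r := range total {
		switch r.name {
		case "cpus":
			cores += int(r.value)
		case "mem":
			memMB += int64(r.value)
		}
	}
	cores -= int(executorCPU)
	memMB -= int64(executorMem)
	return cores, memMB
}

func main() {
	total := []resource{
		{"cpus", 4}, {"mem", 1280}, // unreserved ("*")
		{"cpus", 2}, {"mem", 640}, // role-reserved, e.g. cpus(public)
	}
	// With the old overwriting logic, the reserved entries would have
	// clobbered the unreserved ones instead of adding to them.
	fmt.Println(nodeCapacity(total, 1, 64)) // prints: 5 1856
}
```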
34 changes: 18 additions & 16 deletions contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go
@@ -42,13 +42,14 @@ type SchedulerAlgorithm interface {
 
 // SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
 type schedulerAlgorithm struct {
-    sched        scheduler.Scheduler
-    podUpdates   queue.FIFO
-    podScheduler podschedulers.PodScheduler
-    prototype    *mesosproto.ExecutorInfo
-    roles        []string
-    defaultCpus  mresource.CPUShares
-    defaultMem   mresource.MegaBytes
+    sched           scheduler.Scheduler
+    podUpdates      queue.FIFO
+    podScheduler    podschedulers.PodScheduler
+    prototype       *mesosproto.ExecutorInfo
+    frameworkRoles  []string
+    defaultPodRoles []string
+    defaultCpus     mresource.CPUShares
+    defaultMem      mresource.MegaBytes
 }
 
 // New returns a new SchedulerAlgorithm
@@ -58,18 +59,19 @@ func New(
     podUpdates queue.FIFO,
     podScheduler podschedulers.PodScheduler,
     prototype *mesosproto.ExecutorInfo,
-    roles []string,
+    frameworkRoles, defaultPodRoles []string,
     defaultCpus mresource.CPUShares,
     defaultMem mresource.MegaBytes,
 ) SchedulerAlgorithm {
     return &schedulerAlgorithm{
-        sched:        sched,
-        podUpdates:   podUpdates,
-        podScheduler: podScheduler,
-        roles:        roles,
-        prototype:    prototype,
-        defaultCpus:  defaultCpus,
-        defaultMem:   defaultMem,
+        sched:           sched,
+        podUpdates:      podUpdates,
+        podScheduler:    podScheduler,
+        frameworkRoles:  frameworkRoles,
+        defaultPodRoles: defaultPodRoles,
+        prototype:       prototype,
+        defaultCpus:     defaultCpus,
+        defaultMem:      defaultMem,
     }
 }
 
@@ -107,7 +109,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
     // From here on we can expect that the pod spec of a task has proper limits for CPU and memory.
     k.limitPod(pod)
 
-    podTask, err := podtask.New(ctx, "", pod, k.prototype, k.roles)
+    podTask, err := podtask.New(ctx, "", pod, k.prototype, k.frameworkRoles, k.defaultPodRoles)
     if err != nil {
         log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
         return "", err
@@ -67,6 +67,7 @@ func TestDeleteOne_PendingPod(t *testing.T) {
         pod.Pod,
         &mesosproto.ExecutorInfo{},
         nil,
+        nil,
     )
     if err != nil {
         t.Fatalf("failed to create task: %v", err)
@@ -113,6 +114,7 @@ func TestDeleteOne_Running(t *testing.T) {
         pod.Pod,
         &mesosproto.ExecutorInfo{},
         nil,
+        nil,
     )
     if err != nil {
         t.Fatalf("unexpected error: %v", err)
4 changes: 2 additions & 2 deletions contrib/mesos/pkg/scheduler/components/scheduler.go
@@ -65,7 +65,7 @@ func New(
     mux *http.ServeMux,
     lw *cache.ListWatch,
     prototype *mesos.ExecutorInfo,
-    roles []string,
+    frameworkRoles, defaultPodRoles []string,
     defaultCpus mresource.CPUShares,
     defaultMem mresource.MegaBytes,
 ) scheduler.Scheduler {
@@ -81,7 +81,7 @@ func New(
 
     q := queuer.New(queue.NewDelayFIFO(), podUpdates)
 
-    algorithm := algorithm.New(core, podUpdates, ps, prototype, roles, defaultCpus, defaultMem)
+    algorithm := algorithm.New(core, podUpdates, ps, prototype, frameworkRoles, defaultPodRoles, defaultCpus, defaultMem)
 
     podDeleter := deleter.New(core, q)
 
@@ -509,6 +509,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
         &podsListWatch.ListWatch,
         ei,
         []string{"*"},
+        []string{"*"},
         mresource.DefaultDefaultContainerCPULimit,
         mresource.DefaultDefaultContainerMemLimit,
     )
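The extra `[]string{"*"}` here is the new `defaultPodRoles` argument threaded through the scheduler's `New` alongside the framework roles (see scheduler.go above); this test passes the `*` wildcard for both.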
64 changes: 35 additions & 29 deletions contrib/mesos/pkg/scheduler/podtask/pod_task.go
@@ -51,7 +51,7 @@ const (
     Deleted = FlagType("deleted")
 )
 
-var defaultRoles = []string{"*"}
+var starRole = []string{"*"}
 
 // A struct that describes a pod task.
 type T struct {
@@ -68,13 +68,14 @@ type T struct {
     CreateTime  time.Time
     UpdatedTime time.Time // time of the most recent StatusUpdate we've seen from the mesos master
 
-    podStatus    api.PodStatus
-    prototype    *mesos.ExecutorInfo // readonly
-    allowedRoles []string            // roles under which pods are allowed to be launched
-    podKey       string
-    launchTime   time.Time
-    bindTime     time.Time
-    mapper       HostPortMapper
+    podStatus       api.PodStatus
+    prototype       *mesos.ExecutorInfo // readonly
+    frameworkRoles  []string            // Mesos framework roles; pods are allowed to be launched with those
+    defaultPodRoles []string            // roles under which pods are scheduled if none are specified in labels
+    podKey          string
+    launchTime      time.Time
+    bindTime        time.Time
+    mapper          HostPortMapper
 }
 
 type Port struct {
@@ -168,34 +169,38 @@ func (t *T) Has(f FlagType) (exists bool) {
     return
 }
 
+// Roles returns the valid roles under which this pod task can be scheduled.
+// If the pod has a roles label defined, those roles are used;
+// otherwise the default pod roles are returned.
 func (t *T) Roles() []string {
-    var roles []string
-
     if r, ok := t.Pod.ObjectMeta.Labels[annotation.RolesKey]; ok {
-        roles = strings.Split(r, ",")
+        roles := strings.Split(r, ",")
 
         for i, r := range roles {
             roles[i] = strings.TrimSpace(r)
         }
 
-        roles = filterRoles(roles, not(emptyRole), not(seenRole()))
-    } else {
-        // no roles label defined,
-        // by convention return the first allowed role
-        // to be used for launching the pod task
-        return []string{t.allowedRoles[0]}
+        return filterRoles(
+            roles,
+            not(emptyRole), not(seenRole()), inRoles(t.frameworkRoles...),
+        )
     }
 
-    return filterRoles(roles, inRoles(t.allowedRoles...))
+    // no roles label defined, return defaults
+    return t.defaultPodRoles
 }
 
-func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, allowedRoles []string) (*T, error) {
+func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo, frameworkRoles, defaultPodRoles []string) (*T, error) {
     if prototype == nil {
         return nil, fmt.Errorf("illegal argument: executor is nil")
     }
 
-    if len(allowedRoles) == 0 {
-        allowedRoles = defaultRoles
+    if len(frameworkRoles) == 0 {
+        frameworkRoles = starRole
     }
 
+    if len(defaultPodRoles) == 0 {
+        defaultPodRoles = starRole
+    }
+
     key, err := MakePodKey(ctx, pod.Name)
@@ -208,14 +213,15 @@ func New(ctx api.Context, id string, pod *api.Pod, prototype *mesos.ExecutorInfo
     }
 
     task := &T{
-        ID:           id,
-        Pod:          *pod,
-        State:        StatePending,
-        podKey:       key,
-        mapper:       NewHostPortMapper(pod),
-        Flags:        make(map[FlagType]struct{}),
-        prototype:    prototype,
-        allowedRoles: allowedRoles,
+        ID:              id,
+        Pod:             *pod,
+        State:           StatePending,
+        podKey:          key,
+        mapper:          NewHostPortMapper(pod),
+        Flags:           make(map[FlagType]struct{}),
+        prototype:       prototype,
+        frameworkRoles:  frameworkRoles,
+        defaultPodRoles: defaultPodRoles,
     }
     task.CreateTime = time.Now()
 
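For clarity, here is a self-contained sketch of the `Roles()` selection rule above, with the `emptyRole`/`seenRole`/`inRoles` filter predicates inlined; the literal label key is an assumption for illustration (the real code uses the `annotation.RolesKey` constant):

```go
package main

import (
	"fmt"
	"strings"
)

// podRoles mirrors T.Roles(): a roles label is split, trimmed, and
// filtered against the framework's roles; without a label the
// configured default pod roles are returned unchanged.
func podRoles(labels map[string]string, frameworkRoles, defaultPodRoles []string) []string {
	r, ok := labels["k8s.mesosphere.io/roles"] // assumed key, stands in for annotation.RolesKey
	if !ok {
		return defaultPodRoles
	}
	allowed := make(map[string]bool, len(frameworkRoles))
	for _, fr := range frameworkRoles {
		allowed[fr] = true
	}
	seen := map[string]bool{}
	var roles []string
	for _, role := range strings.Split(r, ",") {
		role = strings.TrimSpace(role)
		// drop empty entries, duplicates, and roles the framework does not hold
		if role == "" || seen[role] || !allowed[role] {
			continue
		}
		seen[role] = true
		roles = append(roles, role)
	}
	return roles
}

func main() {
	fw := []string{"*", "public"}
	fmt.Println(podRoles(map[string]string{"k8s.mesosphere.io/roles": "public, ,public,private"}, fw, fw)) // [public]
	fmt.Println(podRoles(map[string]string{}, fw, fw))                                                     // [* public]
}
```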
21 changes: 11 additions & 10 deletions contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
@@ -34,7 +34,7 @@ const (
     t_min_mem = 128
 )
 
-func fakePodTask(id string, roles ...string) *T {
+func fakePodTask(id string, allowedRoles, defaultRoles []string) *T {
     t, _ := New(
         api.NewDefaultContext(),
         "",
@@ -45,7 +45,8 @@ func fakePodTask(id string, roles ...string) *T {
         },
         },
         &mesos.ExecutorInfo{},
-        roles,
+        allowedRoles,
+        defaultRoles,
     )
 
     return t
@@ -62,12 +63,12 @@ func TestRoles(t *testing.T) {
         {
             map[string]string{},
             nil,
-            defaultRoles,
+            starRole,
         },
         {
             map[string]string{"other": "label"},
             nil,
-            defaultRoles,
+            starRole,
         },
         {
             map[string]string{meta.RolesKey: ""},
@@ -100,10 +101,10 @@ func TestRoles(t *testing.T) {
         {
             map[string]string{},
             []string{"role1"},
-            []string{"role1"},
+            []string{"*"},
         },
     } {
-        task := fakePodTask("test", tt.frameworkRoles...)
+        task := fakePodTask("test", tt.frameworkRoles, starRole)
         task.Pod.ObjectMeta.Labels = tt.labels
         assert.True(reflect.DeepEqual(task.Roles(), tt.want), "test #%d got %#v want %#v", i, task.Roles(), tt.want)
     }
@@ -127,7 +128,7 @@ func (mr mockRegistry) Invalidate(hostname string) {
 
 func TestEmptyOffer(t *testing.T) {
     t.Parallel()
-    task := fakePodTask("foo")
+    task := fakePodTask("foo", nil, nil)
 
     task.Pod.Spec = api.PodSpec{
         Containers: []api.Container{{
@@ -156,7 +157,7 @@
 
 func TestNoPortsInPodOrOffer(t *testing.T) {
     t.Parallel()
-    task := fakePodTask("foo")
+    task := fakePodTask("foo", nil, nil)
 
     task.Pod.Spec = api.PodSpec{
         Containers: []api.Container{{
@@ -206,7 +207,7 @@
 
 func TestAcceptOfferPorts(t *testing.T) {
     t.Parallel()
-    task := fakePodTask("foo")
+    task := fakePodTask("foo", nil, nil)
     pod := &task.Pod
 
     defaultProc := NewDefaultProcurement(
@@ -376,7 +377,7 @@ func TestNodeSelector(t *testing.T) {
     )
 
     for _, ts := range tests {
-        task := fakePodTask("foo")
+        task := fakePodTask("foo", nil, nil)
         task.Pod.Spec.NodeSelector = ts.selector
         offer := &mesos.Offer{
             Resources: []*mesos.Resource{