Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESOS: Add pod resource request support #17054

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
89 changes: 0 additions & 89 deletions contrib/mesos/pkg/scheduler/podtask/pod_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"github.com/gogo/protobuf/proto"
mesos "github.com/mesos/mesos-go/mesosproto"
mutil "github.com/mesos/mesos-go/mesosutil"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)

const (
Expand All @@ -43,93 +41,6 @@ func fakePodTask(id string) (*T, error) {
})
}

// TestUnlimitedResources checks a pod in which some containers declare no
// resource limits: applying the default limits must report the pod as
// previously unbounded, and the resulting pod totals must equal the defaults
// for the unbounded containers plus the explicitly declared amounts. The
// pre-limiting estimates must agree with the post-limiting totals.
func TestUnlimitedResources(t *testing.T) {
	assert := assert.New(t)

	task, _ := fakePodTask("unlimited")
	pod := &task.Pod

	// One container ("a") with explicit CPU/memory limits, two ("b", "c")
	// with none at all.
	pod.Spec = api.PodSpec{
		Containers: []api.Container{
			{
				Name:  "a",
				Ports: []api.ContainerPort{{HostPort: 123}},
				Resources: api.ResourceRequirements{
					Limits: api.ResourceList{
						api.ResourceCPU:    *resource.NewQuantity(3, resource.DecimalSI),
						api.ResourceMemory: *resource.NewQuantity(768*1024*1024, resource.BinarySI),
					},
				},
			},
			{Name: "b"},
			{Name: "c"},
		},
	}

	// Estimate totals before mutating the pod spec.
	estimatedCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)
	estimatedMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)

	// Write the default limits into the spec; each call reports whether any
	// container was previously unbounded for that resource.
	hadUnboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
	hadUnboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit)

	totalCPU := mresource.PodCPULimit(pod)
	totalMem := mresource.PodMemLimit(pod)

	assert.True(hadUnboundedCPU, "CPU resources are defined as unlimited")
	assert.True(hadUnboundedMem, "mem resources are defined as unlimited")

	// Two defaulted containers plus container "a"'s explicit 3 CPU / 768 MB.
	assert.Equal(2*float64(mresource.DefaultDefaultContainerCPULimit)+3.0, float64(totalCPU))
	assert.Equal(2*float64(mresource.DefaultDefaultContainerMemLimit)+768.0, float64(totalMem))

	assert.Equal(totalCPU, estimatedCPU)
	assert.Equal(totalMem, estimatedMem)
}

// TestLimitedResources checks a pod whose containers all declare explicit
// CPU and memory limits: applying the default limits must report nothing as
// unbounded, and the pod totals must be exactly the sum of the declared
// per-container limits (3 CPU shares, 768 MB). The pre-limiting estimates
// must agree with the post-limiting totals.
func TestLimitedResources(t *testing.T) {
	assert := assert.New(t)

	task, _ := fakePodTask("limited")
	pod := &task.Pod

	// Both containers carry explicit limits: 1 CPU / 256 MB and 2 CPU / 512 MB.
	pod.Spec = api.PodSpec{
		Containers: []api.Container{
			{
				Name: "a",
				Resources: api.ResourceRequirements{
					Limits: api.ResourceList{
						api.ResourceCPU:    *resource.NewQuantity(1, resource.DecimalSI),
						api.ResourceMemory: *resource.NewQuantity(256*1024*1024, resource.BinarySI),
					},
				},
			},
			{
				Name: "b",
				Resources: api.ResourceRequirements{
					Limits: api.ResourceList{
						api.ResourceCPU:    *resource.NewQuantity(2, resource.DecimalSI),
						api.ResourceMemory: *resource.NewQuantity(512*1024*1024, resource.BinarySI),
					},
				},
			},
		},
	}

	// Estimate totals before mutating the pod spec.
	estimatedCPU := mresource.CPUForPod(pod, mresource.DefaultDefaultContainerCPULimit)
	estimatedMem := mresource.MemForPod(pod, mresource.DefaultDefaultContainerMemLimit)

	// Applying defaults must be a no-op here; the return values report
	// whether any container was previously unbounded.
	hadUnboundedCPU := mresource.LimitPodCPU(pod, mresource.DefaultDefaultContainerCPULimit)
	hadUnboundedMem := mresource.LimitPodMem(pod, mresource.DefaultDefaultContainerMemLimit)

	totalCPU := mresource.PodCPULimit(pod)
	totalMem := mresource.PodMemLimit(pod)

	assert.False(hadUnboundedCPU, "CPU resources are defined as limited")
	assert.False(hadUnboundedMem, "mem resources are defined as limited")

	assert.Equal(3.0, float64(totalCPU))
	assert.Equal(768.0, float64(totalMem))

	assert.Equal(totalCPU, estimatedCPU)
	assert.Equal(totalMem, estimatedMem)
}

func TestEmptyOffer(t *testing.T) {
t.Parallel()
task, err := fakePodTask("foo")
Expand Down
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/scheduler/podtask/port_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestWildcardHostPortMatching(t *testing.T) {
}},
}},
}
task, err = New(api.NewDefaultContext(), "", *pod, &mesos.ExecutorInfo{})
task, err = New(api.NewDefaultContext(), "", pod)
if err != nil {
t.Fatal(err)
}
Expand Down
11 changes: 9 additions & 2 deletions contrib/mesos/pkg/scheduler/podtask/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,15 @@ func NewPodFitsResourcesPredicate(c mresource.CPUShares, m mresource.MegaBytes)
// calculate cpu and mem sum over all containers of the pod
// TODO (@sttts): also support pod.spec.resources.limit.request
// TODO (@sttts): take into account the executor resources
cpu := mresource.CPUForPod(&t.Pod, c)
mem := mresource.MemForPod(&t.Pod, m)
_, cpu, _, err := mresource.CPUForPod(&t.Pod, c)
if err != nil {
return false
}
_, mem, _, err := mresource.MemForPod(&t.Pod, m)
if err != nil {
return false
}

log.V(4).Infof("trying to match offer with pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
if (cpu > offeredCpus) || (mem > offeredMem) {
log.V(3).Infof("not enough resources for pod %v/%v: cpus: %.2f mem: %.2f MB", t.Pod.Namespace, t.Pod.Name, cpu, mem)
Expand Down
35 changes: 15 additions & 20 deletions contrib/mesos/pkg/scheduler/podtask/procurement.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ import (
// k8s api.Pod.Spec's that don't declare resources (all containers in k8s-mesos require cpu
// and memory limits).
func NewDefaultProcurement(c mresource.CPUShares, m mresource.MegaBytes) Procurement {
requireSome := &RequireSomePodResources{
resourceProcurer := &RequirePodResources{
defaultContainerCPULimit: c,
defaultContainerMemLimit: m,
}
return AllOrNothingProcurement([]Procurement{
ValidateProcurement,
NodeProcurement,
requireSome.Procure,
PodResourcesProcurement,
resourceProcurer.Procure,
PortsProcurement,
}).Procure
}
Expand Down Expand Up @@ -80,37 +79,33 @@ func NodeProcurement(t *T, offer *mesos.Offer) error {
return nil
}

type RequireSomePodResources struct {
type RequirePodResources struct {
defaultContainerCPULimit mresource.CPUShares
defaultContainerMemLimit mresource.MegaBytes
}

func (r *RequireSomePodResources) Procure(t *T, offer *mesos.Offer) error {
func (r *RequirePodResources) Procure(t *T, offer *mesos.Offer) error {
// write resource limits into the pod spec which is transferred to the executor. From here
// on we can expect that the pod spec of a task has proper limits for CPU and memory.
// TODO(sttts): For a later separation of the kubelet and the executor also patch the pod on the apiserver
// TODO(sttts): fall back to requested resources if resource limit cannot be fulfilled by the offer
// TODO(jdef): changing the state of t.Pod here feels dirty, especially since we don't use a kosher
// method to clone the api.Pod state in T.Clone(). This needs some love.
if unlimitedCPU := mresource.LimitPodCPU(&t.Pod, r.defaultContainerCPULimit); unlimitedCPU {
log.V(2).Infof("Pod %s/%s without cpu limits is admitted %.2f cpu shares", t.Pod.Namespace, t.Pod.Name, mresource.PodCPULimit(&t.Pod))
_, cpuLimit, _, err := mresource.LimitPodCPU(&t.Pod, r.defaultContainerCPULimit)
if err != nil {
return err
}
if unlimitedMem := mresource.LimitPodMem(&t.Pod, r.defaultContainerMemLimit); unlimitedMem {
log.V(2).Infof("Pod %s/%s without memory limits is admitted %.2f MB", t.Pod.Namespace, t.Pod.Name, mresource.PodMemLimit(&t.Pod))

_, memLimit, _, err := mresource.LimitPodMem(&t.Pod, r.defaultContainerMemLimit)
if err != nil {
return err
}
return nil
}

// PodResourcesProcurement converts k8s pod cpu and memory resource requirements into
// mesos resource allocations.
func PodResourcesProcurement(t *T, offer *mesos.Offer) error {
// compute used resources
cpu := mresource.PodCPULimit(&t.Pod)
mem := mresource.PodMemLimit(&t.Pod)
log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", offer.Id, t.Pod.Namespace, t.Pod.Name, cpuLimit, memLimit)

log.V(3).Infof("Recording offer(s) %s/%s against pod %v: cpu: %.2f, mem: %.2f MB", offer.Id, t.Pod.Namespace, t.Pod.Name, cpu, mem)
t.Spec.CPU = cpuLimit
t.Spec.Memory = memLimit

t.Spec.CPU = cpu
t.Spec.Memory = mem
return nil
}

Expand Down
147 changes: 93 additions & 54 deletions contrib/mesos/pkg/scheduler/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,87 +19,126 @@ package resource
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
)

const (
DefaultDefaultContainerCPULimit = CPUShares(0.25) // CPUs allocated for pods without CPU limit
DefaultDefaultContainerMemLimit = MegaBytes(64.0) // memory allocated for pods without memory limit
MinimumContainerCPU = CPUShares(0.01) // minimum CPUs allowed by Mesos
MinimumContainerMem = MegaBytes(32.0) // minimum memory allowed by Mesos
)

// CPUFromPodSpec computes the cpu shares that the pod is admitted to use. Containers
// without CPU limit are NOT taken into account.
func PodCPULimit(pod *api.Pod) CPUShares {
cpuQuantity := resourcequotacontroller.PodCPU(pod)
return CPUShares(float64(cpuQuantity.MilliValue()) / 1000.0)
}
var (
zero = *resource.NewQuantity(0, resource.BinarySI)
)

// MemFromPodSpec computes the amount of memory that the pod is admitted to use. Containers
// without memory limit are NOT taken into account.
func PodMemLimit(pod *api.Pod) MegaBytes {
memQuantity := resourcequotacontroller.PodMemory(pod)
return MegaBytes(float64(memQuantity.Value()) / 1024.0 / 1024.0)
}
// podResource computes requested resources and the limit. If write is true,
// it will also write missing requests and limits into the pod.
func podResources(pod *api.Pod, resourceName api.ResourceName, def, min resource.Quantity, write bool) (
requestSum *resource.Quantity,
limitSum *resource.Quantity,
modified bool,
err error,
) {
requestSum = (&zero).Copy()
limitSum = (&zero).Copy()
modified = false
err = nil

// limitPodResource sets the given default resource limit for each container that
// does not limit the given resource yet. limitPodResource returns true if and only if
// at least one container had no limit for that resource.
func limitPodResource(pod *api.Pod, resourceName api.ResourceName, defaultLimit resource.Quantity) bool {
unlimited := false
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]

// create maps
if container.Resources.Limits == nil {
container.Resources.Limits = api.ResourceList{}
}
_, ok := container.Resources.Limits[resourceName]
if !ok {
container.Resources.Limits[resourceName] = defaultLimit
unlimited = true
if container.Resources.Requests == nil {
container.Resources.Requests = api.ResourceList{}
}
}
return unlimited
}

// unlimitedPodResources counts how many containers in the pod have no limit for the given resource
func unlimitedCountainerNum(pod *api.Pod, resourceName api.ResourceName) int {
unlimited := 0
for j := range pod.Spec.Containers {
container := &pod.Spec.Containers[j]
// request and limit defined?
request, requestFound := container.Resources.Requests[resourceName]
limit, limitFound := container.Resources.Limits[resourceName]

// fill-in missing request and/or limit
if !requestFound && !limitFound {
limit = def
request = def
modified = true
} else if requestFound && !limitFound {
limit = request
modified = true
} else if !requestFound && limitFound {
// TODO(sttts): possibly use the default here?
request = limit
modified = true
}

if container.Resources.Limits == nil {
unlimited += 1
continue
// make request and limit at least as big as min
if (&request).Cmp(min) < 0 {
request = *(&min).Copy()
modified = true
}
if (&limit).Cmp(min) < 0 {
limit = *(&min).Copy()
modified = true
}

// add up the request and limit sum for all containers
err = requestSum.Add(request)
if err != nil {
return
}
err = limitSum.Add(limit)
if err != nil {
return
}

if _, ok := container.Resources.Limits[resourceName]; !ok {
unlimited += 1
// optionally write request and limit back
if write {
container.Resources.Requests[resourceName] = request
container.Resources.Limits[resourceName] = limit
}
}
return unlimited
return
}

// limitPodCPU sets DefaultContainerCPUs for the CPU limit of each container that
// does not limit its CPU resource yet. limitPodCPU returns true if and only if
// at least one container had no CPU limit set.
func LimitPodCPU(pod *api.Pod, defaultLimit CPUShares) bool {
defaultCPUQuantity := resource.NewMilliQuantity(int64(float64(defaultLimit)*1000.0), resource.DecimalSI)
return limitPodResource(pod, api.ResourceCPU, *defaultCPUQuantity)
// LimitPodCPU sets default CPU requests and limits of each container that
// does not limit its CPU resource yet. LimitPodCPU returns the new request,
// limit and whether the pod was modified.
func LimitPodCPU(pod *api.Pod, defaultLimit CPUShares) (request, limit CPUShares, modified bool, err error) {
r, l, m, err := podResources(pod, api.ResourceCPU, *defaultLimit.Quantity(), *MinimumContainerCPU.Quantity(), true)
if err != nil {
return 0.0, 0.0, false, err
}
return NewCPUShares(*r), NewCPUShares(*l), m, nil
}

// limitPodMem sets DefaultContainerMem for the memory limit of each container that
// does not limit its memory resource yet. limitPodMem returns true if and only if
// at least one container had no memory limit set.
func LimitPodMem(pod *api.Pod, defaultLimit MegaBytes) bool {
defaultMemQuantity := resource.NewQuantity(int64(float64(defaultLimit)*1024.0*1024.0), resource.BinarySI)
return limitPodResource(pod, api.ResourceMemory, *defaultMemQuantity)
// LimitPodMem sets default memory requests and limits of each container that
// does not limit its memory resource yet. LimitPodMem returns the new request,
// limit and whether the pod was modified.
func LimitPodMem(pod *api.Pod, defaultLimit MegaBytes) (request, limit MegaBytes, modified bool, err error) {
r, l, m, err := podResources(pod, api.ResourceMemory, *defaultLimit.Quantity(), *MinimumContainerMem.Quantity(), true)
if err != nil {
return 0.0, 0.0, false, err
}
return NewMegaBytes(*r), NewMegaBytes(*l), m, nil
}

// CPUForPod computes the limits from the spec plus the default CPU limit for unlimited containers
func CPUForPod(pod *api.Pod, defaultLimit CPUShares) CPUShares {
return PodCPULimit(pod) + CPUShares(unlimitedCountainerNum(pod, api.ResourceCPU))*defaultLimit
// CPUForPod computes the limits from the spec plus the default CPU limit difference for unlimited containers
func CPUForPod(pod *api.Pod, defaultLimit CPUShares) (request, limit CPUShares, modified bool, err error) {
r, l, m, err := podResources(pod, api.ResourceCPU, *defaultLimit.Quantity(), *MinimumContainerCPU.Quantity(), false)
if err != nil {
return 0.0, 0.0, false, err
}
return NewCPUShares(*r), NewCPUShares(*l), m, nil
}

// MemForPod computes the limits from the spec plus the default memory limit for unlimited containers
func MemForPod(pod *api.Pod, defaultLimit MegaBytes) MegaBytes {
return PodMemLimit(pod) + MegaBytes(unlimitedCountainerNum(pod, api.ResourceMemory))*defaultLimit
// MemForPod computes the limits from the spec plus the default memory limit difference for unlimited containers
func MemForPod(pod *api.Pod, defaultLimit MegaBytes) (request, limit MegaBytes, modified bool, err error) {
r, l, m, err := podResources(pod, api.ResourceMemory, *defaultLimit.Quantity(), *MinimumContainerMem.Quantity(), true)
if err != nil {
return 0.0, 0.0, false, err
}
return NewMegaBytes(*r), NewMegaBytes(*l), m, nil
}