Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESOS: host port endpoints refactor #21276

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion contrib/mesos/docs/issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ Host ports that are not defined, or else defined as zero, will automatically be

To disable the work-around and revert to vanilla Kubernetes service endpoint termination:

- execute the k8sm controller-manager with `-host_port_endpoints=false`;
- execute the k8sm scheduler with `-host-port-endpoints=false`
- execute the k8sm controller-manager with `-host-port-endpoints=false`

Then the usual Kubernetes network assumptions must be fulfilled for Kubernetes to work with Mesos, i.e. each container must get a cluster-wide routable IP (compare [Kubernetes Networking documentation](../../../docs/design/networking.md#container-to-container)).

Expand Down
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func NewCMServer() *CMServer {
// AddFlags adds flags for a specific CMServer to the specified FlagSet
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
s.CMServer.AddFlags(fs)
fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
fs.BoolVar(&s.UseHostPortEndpoints, "host-port-endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
}

func (s *CMServer) resyncPeriod() time.Duration {
Expand Down
18 changes: 10 additions & 8 deletions contrib/mesos/pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/podutil"
kmruntime "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
Expand Down Expand Up @@ -197,11 +198,11 @@ func TestExecutorLaunchAndKillTask(t *testing.T) {

podTask, err := podtask.New(
api.NewDefaultContext(),
"",
podtask.Config{
Prototype: executorinfo,
HostPortStrategy: hostport.StrategyWildcard,
},
pod,
executorinfo,
nil,
nil,
)
assert.Equal(t, nil, err, "must be able to create a task from a pod")

Expand Down Expand Up @@ -407,11 +408,12 @@ func TestExecutorFrameworkMessage(t *testing.T) {
executorinfo := &mesosproto.ExecutorInfo{}
podTask, _ := podtask.New(
api.NewDefaultContext(),
"foo",
podtask.Config{
ID: "foo",
Prototype: executorinfo,
HostPortStrategy: hostport.StrategyWildcard,
},
pod,
executorinfo,
nil,
nil,
)
pod.Annotations = map[string]string{
"k8s.mesosphere.io/taskId": podTask.ID,
Expand Down
44 changes: 19 additions & 25 deletions contrib/mesos/pkg/scheduler/components/algorithm/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import (
"fmt"

log "github.com/golang/glog"
"github.com/mesos/mesos-go/mesosproto"
"k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/algorithm/podschedulers"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
Expand All @@ -42,14 +41,12 @@ type SchedulerAlgorithm interface {

// SchedulerAlgorithm implements the algorithm.ScheduleAlgorithm interface
type schedulerAlgorithm struct {
sched scheduler.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
prototype *mesosproto.ExecutorInfo
frameworkRoles []string
defaultPodRoles []string
defaultCpus mresource.CPUShares
defaultMem mresource.MegaBytes
sched scheduler.Scheduler
podUpdates queue.FIFO
podScheduler podschedulers.PodScheduler
taskConfig podtask.Config
defaultCpus resources.CPUShares
defaultMem resources.MegaBytes
}

// New returns a new SchedulerAlgorithm
Expand All @@ -58,20 +55,17 @@ func New(
sched scheduler.Scheduler,
podUpdates queue.FIFO,
podScheduler podschedulers.PodScheduler,
prototype *mesosproto.ExecutorInfo,
frameworkRoles, defaultPodRoles []string,
defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes,
taskConfig podtask.Config,
defaultCpus resources.CPUShares,
defaultMem resources.MegaBytes,
) SchedulerAlgorithm {
return &schedulerAlgorithm{
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
frameworkRoles: frameworkRoles,
defaultPodRoles: defaultPodRoles,
prototype: prototype,
defaultCpus: defaultCpus,
defaultMem: defaultMem,
sched: sched,
podUpdates: podUpdates,
podScheduler: podScheduler,
taskConfig: taskConfig,
defaultCpus: defaultCpus,
defaultMem: defaultMem,
}
}

Expand Down Expand Up @@ -109,7 +103,7 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {
// From here on we can expect that the pod spec of a task has proper limits for CPU and memory.
k.limitPod(pod)

podTask, err := podtask.New(ctx, "", pod, k.prototype, k.frameworkRoles, k.defaultPodRoles)
podTask, err := podtask.New(ctx, k.taskConfig, pod)
if err != nil {
log.Warningf("aborting Schedule, unable to create podtask object %+v: %v", pod, err)
return "", err
Expand Down Expand Up @@ -146,12 +140,12 @@ func (k *schedulerAlgorithm) Schedule(pod *api.Pod) (string, error) {

// limitPod limits the given pod based on the scheduler's default limits.
func (k *schedulerAlgorithm) limitPod(pod *api.Pod) error {
cpuRequest, cpuLimit, _, err := mresource.LimitPodCPU(pod, k.defaultCpus)
cpuRequest, cpuLimit, _, err := resources.LimitPodCPU(pod, k.defaultCpus)
if err != nil {
return err
}

memRequest, memLimit, _, err := mresource.LimitPodMem(pod, k.defaultMem)
memRequest, memLimit, _, err := resources.LimitPodMem(pod, k.defaultMem)
if err != nil {
return err
}
Expand Down
19 changes: 11 additions & 8 deletions contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
)
Expand Down Expand Up @@ -63,11 +64,12 @@ func TestDeleteOne_PendingPod(t *testing.T) {
}}}
task, err := podtask.New(
api.NewDefaultContext(),
"bar",
podtask.Config{
ID: "bar",
Prototype: &mesosproto.ExecutorInfo{},
HostPortStrategy: hostport.StrategyWildcard,
},
pod.Pod,
&mesosproto.ExecutorInfo{},
nil,
nil,
)
if err != nil {
t.Fatalf("failed to create task: %v", err)
Expand Down Expand Up @@ -110,11 +112,12 @@ func TestDeleteOne_Running(t *testing.T) {
}}}
task, err := podtask.New(
api.NewDefaultContext(),
"bar",
podtask.Config{
ID: "bar",
Prototype: &mesosproto.ExecutorInfo{},
HostPortStrategy: hostport.StrategyWildcard,
},
pod.Pod,
&mesosproto.ExecutorInfo{},
nil,
nil,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down
11 changes: 5 additions & 6 deletions contrib/mesos/pkg/scheduler/components/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
Expand All @@ -65,10 +65,9 @@ func New(
terminate <-chan struct{},
mux *http.ServeMux,
lw *cache.ListWatch,
prototype *mesos.ExecutorInfo,
frameworkRoles, defaultPodRoles []string,
defaultCpus mresource.CPUShares,
defaultMem mresource.MegaBytes,
taskConfig podtask.Config,
defaultCpus resources.CPUShares,
defaultMem resources.MegaBytes,
) scheduler.Scheduler {
core := &sched{
framework: fw,
Expand All @@ -82,7 +81,7 @@ func New(

q := queuer.New(queue.NewDelayFIFO(), podUpdates)

algorithm := algorithm.New(core, podUpdates, ps, prototype, frameworkRoles, defaultPodRoles, defaultCpus, defaultMem)
algorithm := algorithm.New(core, podUpdates, ps, taskConfig, defaultCpus, defaultMem)

podDeleter := deleter.New(core, q)

Expand Down
16 changes: 10 additions & 6 deletions contrib/mesos/pkg/scheduler/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
mresource "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resource"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask/hostport"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/resources"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
Expand Down Expand Up @@ -524,11 +525,14 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
schedulerProc.Terminal(),
http.DefaultServeMux,
&podsListWatch.ListWatch,
ei,
[]string{"*"},
[]string{"*"},
mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit,
podtask.Config{
Prototype: ei,
FrameworkRoles: []string{"*"},
DefaultPodRoles: []string{"*"},
HostPortStrategy: hostport.StrategyWildcard,
},
resources.DefaultDefaultContainerCPULimit,
resources.DefaultDefaultContainerMemLimit,
)
assert.NotNil(scheduler)

Expand Down