From 9bbfc35fa9f0ff4ddebf74ae956ffaf8ed91fd2e Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 20 Oct 2015 11:53:41 -0700 Subject: [PATCH 1/8] scheduler: add err check for procurement --- contrib/mesos/pkg/scheduler/plugin.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 1c8d01eede76..37dedda7c6dc 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -341,7 +341,11 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { } task.Offer = offer - k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here? + if err := k.api.algorithm().Procurement()(task, details); err != nil { + offer.Release() + task.Reset() + return "", err + } if err := k.api.tasks().Update(task); err != nil { offer.Release() From 0ad0c1f2b1aa7614cef7ede252cd94e3acd33d6f Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 20 Oct 2015 14:24:20 -0700 Subject: [PATCH 2/8] scheduler: remove error param from doSchedule func doSchedule currently accepts err values from a previous invocation, delegating error handling to a location different from the caller, which can be hard to debug and is not a good practice. We still maintain the same invariants after the refactoring. If an err happened in a previous invocation to Register, the returned task object was nil, causing task.HasAcceptedOffer() to return false. By not invoking doSchedule in case of an error we can eliminate the first `err == nil` check in doSchedule. 
--- contrib/mesos/pkg/scheduler/plugin.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 37dedda7c6dc..fbde9ca89ebb 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -278,7 +278,11 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str log.Infof("aborting Schedule, pod has been deleted %+v", pod) return "", noSuchPodErr } - return k.doSchedule(k.api.tasks().Register(k.api.createPodTask(ctx, pod))) + task, err := k.api.tasks().Register(k.api.createPodTask(ctx, pod)) + if err != nil { + return "", err + } + return k.doSchedule(task) //TODO(jdef) it's possible that the pod state has diverged from what //we knew previously, we should probably update the task.Pod state here @@ -294,7 +298,7 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str // but we're going to let someone else handle it, probably the mesos task error handler return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID) } else { - return k.doSchedule(task, nil) + return k.doSchedule(task) } default: @@ -303,8 +307,10 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str } // Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on -func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { +func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) { var offer offers.Perishable + var err error + if task.HasAcceptedOffer() { // verify that the offer is still on the table offerId := task.GetOfferId() @@ -319,7 +325,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) { } } } - if err == nil && offer == nil { + if offer == nil { offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task) } if err != nil { 
From d53b716e97108d34d6a9dc72bdbbadce8a433d5f Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 20 Oct 2015 14:25:41 -0700 Subject: [PATCH 3/8] scheduler: add newlines for readability in doSchedule --- contrib/mesos/pkg/scheduler/plugin.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index fbde9ca89ebb..3733033af86c 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -325,16 +325,20 @@ func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) { } } } + if offer == nil { offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task) } + if err != nil { return "", err } + details := offer.Details() if details == nil { return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) } + slaveId := details.GetSlaveId().GetValue() if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" { // not much sense in Release()ing the offer here since its owner died From 71bd54adc619f882257cc1987941ad7d6ae32481 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 20 Oct 2015 14:40:24 -0700 Subject: [PATCH 4/8] scheduler: remove shadowed `offer` var in doSchedule The variable `offer` is shadowed in the if block, causing the `then` branch to be ineffective. 
--- contrib/mesos/pkg/scheduler/plugin.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 3733033af86c..a1bd34504349 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -313,11 +313,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) { if task.HasAcceptedOffer() { // verify that the offer is still on the table - offerId := task.GetOfferId() - if offer, ok := k.api.offers().Get(offerId); ok && !offer.HasExpired() { - // skip tasks that have already have assigned offers - offer = task.Offer - } else { + if offer, ok := k.api.offers().Get(task.GetOfferId()); !ok || offer.HasExpired() { task.Offer.Release() task.Reset() if err = k.api.tasks().Update(task); err != nil { From c3311744c52e040ca2e3d98107df827e0fda80bd Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 20 Oct 2015 14:48:14 -0700 Subject: [PATCH 5/8] scheduler: correct doc in doSchedule --- contrib/mesos/pkg/scheduler/plugin.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index a1bd34504349..65968b35d73e 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -306,7 +306,8 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str } } -// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on +// doSchedule schedules the given task and returns the machine the task is scheduled on +// or an error if the scheduling failed. 
func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) { var offer offers.Perishable var err error From 6fced095ce4808fe115ee59a83c02fe3ae0ddc7b Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 20 Oct 2015 15:04:36 -0700 Subject: [PATCH 6/8] scheduler: simplify if-then-else handling in doSchedule --- contrib/mesos/pkg/scheduler/plugin.go | 34 ++++++++++++++------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 65968b35d73e..0c8b7c93a91c 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -337,29 +337,31 @@ func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) { } slaveId := details.GetSlaveId().GetValue() - if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" { + slaveHostName := k.api.slaveHostNameFor(slaveId) + if slaveHostName == "" { // not much sense in Release()ing the offer here since its owner died offer.Release() k.api.offers().Invalidate(details.Id.GetValue()) return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID) - } else { - if task.Offer != nil && task.Offer != offer { - return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) - } + } - task.Offer = offer - if err := k.api.algorithm().Procurement()(task, details); err != nil { - offer.Release() - task.Reset() - return "", err - } + if task.Offer != nil && task.Offer != offer { + return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer) + } - if err := k.api.tasks().Update(task); err != nil { - offer.Release() - return "", err - } - return slaveHostName, nil + task.Offer = offer + if err := k.api.algorithm().Procurement()(task, details); err != nil { + offer.Release() + task.Reset() + return "", err + } + + if err := k.api.tasks().Update(task); err != nil { + offer.Release() + 
return "", err } + + return slaveHostName, nil } type queuer struct { From 6a16fe314f72baad91804e6bddb99358e3c0f8d6 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 10 Nov 2015 14:08:50 +0100 Subject: [PATCH 7/8] scheduler: remove err param from Register --- contrib/mesos/pkg/scheduler/plugin.go | 9 ++++++- contrib/mesos/pkg/scheduler/plugin_test.go | 16 +++++++++-- .../mesos/pkg/scheduler/podtask/registry.go | 27 +++++++++---------- .../pkg/scheduler/podtask/registry_test.go | 16 +++++------ contrib/mesos/pkg/scheduler/scheduler.go | 4 +-- 5 files changed, 45 insertions(+), 27 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index 0c8b7c93a91c..d7a6d77aa733 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -278,10 +278,17 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str log.Infof("aborting Schedule, pod has been deleted %+v", pod) return "", noSuchPodErr } - task, err := k.api.tasks().Register(k.api.createPodTask(ctx, pod)) + + task, err := k.api.createPodTask(ctx, pod) + if err != nil { + return "", err + } + + task, err = k.api.tasks().Register(task) if err != nil { return "", err } + return k.doSchedule(task) //TODO(jdef) it's possible that the pod state has diverged from what diff --git a/contrib/mesos/pkg/scheduler/plugin_test.go b/contrib/mesos/pkg/scheduler/plugin_test.go index 016a126deb5f..dbe446cb1f52 100644 --- a/contrib/mesos/pkg/scheduler/plugin_test.go +++ b/contrib/mesos/pkg/scheduler/plugin_test.go @@ -880,11 +880,17 @@ func TestDeleteOne_PendingPod(t *testing.T) { UID: "foo0", Namespace: api.NamespaceDefault, }}} - _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) + + task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}) if err != nil { t.Fatalf("failed to create task: %v", err) } + _, err = reg.Register(task) + if 
err != nil { + t.Fatalf("failed to register task: %v", err) + } + // preconditions qr := newQueuer(nil) qr.podQueue.Add(pod, queue.ReplaceExisting) @@ -917,7 +923,13 @@ func TestDeleteOne_Running(t *testing.T) { UID: "foo0", Namespace: api.NamespaceDefault, }}} - task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})) + + task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + task, err = reg.Register(task) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/contrib/mesos/pkg/scheduler/podtask/registry.go b/contrib/mesos/pkg/scheduler/podtask/registry.go index dedd359eaa52..f08e21474f40 100644 --- a/contrib/mesos/pkg/scheduler/podtask/registry.go +++ b/contrib/mesos/pkg/scheduler/podtask/registry.go @@ -41,7 +41,7 @@ const ( type Registry interface { // register the specified task with this registry, as long as the current error // condition is nil. if no errors occur then return a copy of the registered task. 
- Register(*T, error) (*T, error) + Register(*T) (*T, error) // unregister the specified task from this registry Unregister(*T) @@ -103,20 +103,19 @@ func (k *inMemoryRegistry) ForPod(podID string) (task *T, currentState StateType } // registers a pod task unless the spec'd error is not nil -func (k *inMemoryRegistry) Register(task *T, err error) (*T, error) { - if err == nil { - k.rw.Lock() - defer k.rw.Unlock() - if _, found := k.podToTask[task.podKey]; found { - return nil, fmt.Errorf("task already registered for pod key %q", task.podKey) - } - if _, found := k.taskRegistry[task.ID]; found { - return nil, fmt.Errorf("task already registered for id %q", task.ID) - } - k.podToTask[task.podKey] = task.ID - k.taskRegistry[task.ID] = task +func (k *inMemoryRegistry) Register(task *T) (*T, error) { + k.rw.Lock() + defer k.rw.Unlock() + if _, found := k.podToTask[task.podKey]; found { + return nil, fmt.Errorf("task already registered for pod key %q", task.podKey) + } + if _, found := k.taskRegistry[task.ID]; found { + return nil, fmt.Errorf("task already registered for id %q", task.ID) } - return task.Clone(), err + k.podToTask[task.podKey] = task.ID + k.taskRegistry[task.ID] = task + + return task.Clone(), nil } // updates internal task state. 
updates are limited to Spec, Flags, and Offer for diff --git a/contrib/mesos/pkg/scheduler/podtask/registry_test.go b/contrib/mesos/pkg/scheduler/podtask/registry_test.go index c3a2ef6082b0..65f318c0fe07 100644 --- a/contrib/mesos/pkg/scheduler/podtask/registry_test.go +++ b/contrib/mesos/pkg/scheduler/podtask/registry_test.go @@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) { // add a task a, _ := fakePodTask("a") - a_clone, err := registry.Register(a, nil) + a_clone, err := registry.Register(a) assert.NoError(err) assert.Equal(a_clone.ID, a.ID) assert.Equal(a_clone.podKey, a.podKey) // add another task b, _ := fakePodTask("b") - b_clone, err := registry.Register(b, nil) + b_clone, err := registry.Register(b) assert.NoError(err) assert.Equal(b_clone.ID, b.ID) assert.Equal(b_clone.podKey, b.podKey) @@ -79,21 +79,21 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) { assert.Nil(task) // re-add a task - a_clone, err = registry.Register(a, nil) + a_clone, err = registry.Register(a) assert.Error(err) assert.Nil(a_clone) // re-add a task with another podKey, but same task id another_a := a.Clone() another_a.podKey = "another-pod" - another_a_clone, err := registry.Register(another_a, nil) + another_a_clone, err := registry.Register(another_a) assert.Error(err) assert.Nil(another_a_clone) // re-add a task with another task ID, but same podKey another_b := b.Clone() another_b.ID = "another-task-id" - another_b_clone, err := registry.Register(another_b, nil) + another_b_clone, err := registry.Register(another_b) assert.Error(err) assert.Nil(another_b_clone) @@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) { // add a task a, _ := fakePodTask("a") - a_clone, err := registry.Register(a, nil) + a_clone, err := registry.Register(a) assert.NoError(err) assert.Equal(a.State, a_clone.State) @@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) { // create registry registry := NewInMemoryRegistry() a, _ 
:= fakePodTask("a") - registry.Register(a.Clone(), nil) // here clone a because we change it below + registry.Register(a.Clone()) // here clone a because we change it below // state changes are ignored a.State = StateRunning @@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry { registry := NewInMemoryRegistry() a, _ := fakePodTask("a") - a, _ = registry.Register(a, nil) + a, _ = registry.Register(a) // initial pending state assert.Equal(a.State, StatePending) diff --git a/contrib/mesos/pkg/scheduler/scheduler.go b/contrib/mesos/pkg/scheduler/scheduler.go index 55e6d7bfa6d2..381db17644be 100644 --- a/contrib/mesos/pkg/scheduler/scheduler.go +++ b/contrib/mesos/pkg/scheduler/scheduler.go @@ -502,7 +502,7 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler } else if pod, err := k.client.Pods(namespace).Get(name); err == nil { if t, ok, err := podtask.RecoverFrom(*pod); ok { log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name) - _, err := k.taskRegistry.Register(t, nil) + _, err := k.taskRegistry.Register(t) if err != nil { // someone beat us to it?! 
log.Warningf("failed to register recovered task: %v", err) @@ -912,7 +912,7 @@ func (ks *KubernetesScheduler) recoverTasks() error { log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err) } } else if ok { - ks.taskRegistry.Register(t, nil) + ks.taskRegistry.Register(t) recoverSlave(t) log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name) } From 9b7e40570e5914f7e799738f9cb86e63068177c4 Mon Sep 17 00:00:00 2001 From: Sergiusz Urbaniak Date: Tue, 10 Nov 2015 20:54:58 +0100 Subject: [PATCH 8/8] scheduler: fix shadowing in doSchedule --- contrib/mesos/pkg/scheduler/plugin.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/contrib/mesos/pkg/scheduler/plugin.go b/contrib/mesos/pkg/scheduler/plugin.go index d7a6d77aa733..0bbf03f300c2 100644 --- a/contrib/mesos/pkg/scheduler/plugin.go +++ b/contrib/mesos/pkg/scheduler/plugin.go @@ -321,7 +321,10 @@ func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) { if task.HasAcceptedOffer() { // verify that the offer is still on the table - if offer, ok := k.api.offers().Get(task.GetOfferId()); !ok || offer.HasExpired() { + var ok bool + offer, ok = k.api.offers().Get(task.GetOfferId()) + + if !ok || offer.HasExpired() { task.Offer.Release() task.Reset() if err = k.api.tasks().Update(task); err != nil {