Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESOS: check for errors during resource allocation in the scheduler plugin #15976

67 changes: 45 additions & 22 deletions contrib/mesos/pkg/scheduler/plugin.go
Expand Up @@ -278,7 +278,18 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
log.Infof("aborting Schedule, pod has been deleted %+v", pod)
return "", noSuchPodErr
}
return k.doSchedule(k.api.tasks().Register(k.api.createPodTask(ctx, pod)))

task, err := k.api.createPodTask(ctx, pod)
if err != nil {
return "", err
}

task, err = k.api.tasks().Register(task)
if err != nil {
return "", err
}

return k.doSchedule(task)

//TODO(jdef) it's possible that the pod state has diverged from what
//we knew previously, we should probably update the task.Pod state here
Expand All @@ -294,61 +305,73 @@ func (k *kubeScheduler) Schedule(pod *api.Pod, unused algorithm.NodeLister) (str
// but we're going to let someone else handle it, probably the mesos task error handler
return "", fmt.Errorf("task %s has already been launched, aborting schedule", task.ID)
} else {
return k.doSchedule(task, nil)
return k.doSchedule(task)
}

default:
return "", fmt.Errorf("task %s is not pending, nothing to schedule", task.ID)
}
}

// Call ScheduleFunc and subtract some resources, returning the name of the machine the task is scheduled on
func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
// doSchedule schedules the given task and returns the machine the task is scheduled on
// or an error if the scheduling failed.
func (k *kubeScheduler) doSchedule(task *podtask.T) (string, error) {
var offer offers.Perishable
var err error

if task.HasAcceptedOffer() {
// verify that the offer is still on the table
offerId := task.GetOfferId()
if offer, ok := k.api.offers().Get(offerId); ok && !offer.HasExpired() {
// skip tasks that already have assigned offers
offer = task.Offer
} else {
var ok bool
offer, ok = k.api.offers().Get(task.GetOfferId())

if !ok || offer.HasExpired() {
task.Offer.Release()
task.Reset()
if err = k.api.tasks().Update(task); err != nil {
return "", err
}
}
}
if err == nil && offer == nil {

if offer == nil {
offer, err = k.api.algorithm().SchedulePod(k.api.offers(), k.api, task)
}

if err != nil {
return "", err
}

details := offer.Details()
if details == nil {
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
}

slaveId := details.GetSlaveId().GetValue()
if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" {
slaveHostName := k.api.slaveHostNameFor(slaveId)
if slaveHostName == "" {
// not much sense in Release()ing the offer here since its owner died
offer.Release()
k.api.offers().Invalidate(details.Id.GetValue())
return "", fmt.Errorf("Slave disappeared (%v) while scheduling task %v", slaveId, task.ID)
} else {
if task.Offer != nil && task.Offer != offer {
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
}
}

task.Offer = offer
k.api.algorithm().Procurement()(task, details) // TODO(jdef) why is nothing checking the error returned here?
if task.Offer != nil && task.Offer != offer {
return "", fmt.Errorf("task.offer assignment must be idempotent, task %+v: offer %+v", task, offer)
}

if err := k.api.tasks().Update(task); err != nil {
offer.Release()
return "", err
}
return slaveHostName, nil
task.Offer = offer
if err := k.api.algorithm().Procurement()(task, details); err != nil {
offer.Release()
task.Reset()
return "", err
}

if err := k.api.tasks().Update(task); err != nil {
offer.Release()
return "", err
}

return slaveHostName, nil
}

type queuer struct {
Expand Down
16 changes: 14 additions & 2 deletions contrib/mesos/pkg/scheduler/plugin_test.go
Expand Up @@ -880,11 +880,17 @@ func TestDeleteOne_PendingPod(t *testing.T) {
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))

task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
if err != nil {
t.Fatalf("failed to create task: %v", err)
}

_, err = reg.Register(task)
if err != nil {
t.Fatalf("failed to register task: %v", err)
}

// preconditions
qr := newQueuer(nil)
qr.podQueue.Add(pod, queue.ReplaceExisting)
Expand Down Expand Up @@ -917,7 +923,13 @@ func TestDeleteOne_Running(t *testing.T) {
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{}))

task, err := podtask.New(api.NewDefaultContext(), "bar", *pod.Pod, &mesos.ExecutorInfo{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

task, err = reg.Register(task)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
27 changes: 13 additions & 14 deletions contrib/mesos/pkg/scheduler/podtask/registry.go
Expand Up @@ -41,7 +41,7 @@ const (
type Registry interface {
// register the specified task with this registry. if no errors occur then
// return a copy of the registered task.
Register(*T, error) (*T, error)
Register(*T) (*T, error)

// unregister the specified task from this registry
Unregister(*T)
Expand Down Expand Up @@ -103,20 +103,19 @@ func (k *inMemoryRegistry) ForPod(podID string) (task *T, currentState StateType
}

// registers a pod task, failing if a task is already registered for the same pod key or task ID
func (k *inMemoryRegistry) Register(task *T, err error) (*T, error) {
if err == nil {
k.rw.Lock()
defer k.rw.Unlock()
if _, found := k.podToTask[task.podKey]; found {
return nil, fmt.Errorf("task already registered for pod key %q", task.podKey)
}
if _, found := k.taskRegistry[task.ID]; found {
return nil, fmt.Errorf("task already registered for id %q", task.ID)
}
k.podToTask[task.podKey] = task.ID
k.taskRegistry[task.ID] = task
func (k *inMemoryRegistry) Register(task *T) (*T, error) {
k.rw.Lock()
defer k.rw.Unlock()
if _, found := k.podToTask[task.podKey]; found {
return nil, fmt.Errorf("task already registered for pod key %q", task.podKey)
}
if _, found := k.taskRegistry[task.ID]; found {
return nil, fmt.Errorf("task already registered for id %q", task.ID)
}
return task.Clone(), err
k.podToTask[task.podKey] = task.ID
k.taskRegistry[task.ID] = task

return task.Clone(), nil
}

// updates internal task state. updates are limited to Spec, Flags, and Offer for
Expand Down
16 changes: 8 additions & 8 deletions contrib/mesos/pkg/scheduler/podtask/registry_test.go
Expand Up @@ -38,14 +38,14 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {

// add a task
a, _ := fakePodTask("a")
a_clone, err := registry.Register(a, nil)
a_clone, err := registry.Register(a)
assert.NoError(err)
assert.Equal(a_clone.ID, a.ID)
assert.Equal(a_clone.podKey, a.podKey)

// add another task
b, _ := fakePodTask("b")
b_clone, err := registry.Register(b, nil)
b_clone, err := registry.Register(b)
assert.NoError(err)
assert.Equal(b_clone.ID, b.ID)
assert.Equal(b_clone.podKey, b.podKey)
Expand Down Expand Up @@ -79,21 +79,21 @@ func TestInMemoryRegistry_RegisterGetUnregister(t *testing.T) {
assert.Nil(task)

// re-add a task
a_clone, err = registry.Register(a, nil)
a_clone, err = registry.Register(a)
assert.Error(err)
assert.Nil(a_clone)

// re-add a task with another podKey, but same task id
another_a := a.Clone()
another_a.podKey = "another-pod"
another_a_clone, err := registry.Register(another_a, nil)
another_a_clone, err := registry.Register(another_a)
assert.Error(err)
assert.Nil(another_a_clone)

// re-add a task with another task ID, but same podKey
another_b := b.Clone()
another_b.ID = "another-task-id"
another_b_clone, err := registry.Register(another_b, nil)
another_b_clone, err := registry.Register(another_b)
assert.Error(err)
assert.Nil(another_b_clone)

Expand Down Expand Up @@ -124,7 +124,7 @@ func TestInMemoryRegistry_State(t *testing.T) {

// add a task
a, _ := fakePodTask("a")
a_clone, err := registry.Register(a, nil)
a_clone, err := registry.Register(a)
assert.NoError(err)
assert.Equal(a.State, a_clone.State)

Expand Down Expand Up @@ -167,7 +167,7 @@ func TestInMemoryRegistry_Update(t *testing.T) {
// create registry
registry := NewInMemoryRegistry()
a, _ := fakePodTask("a")
registry.Register(a.Clone(), nil) // here clone a because we change it below
registry.Register(a.Clone()) // here clone a because we change it below

// state changes are ignored
a.State = StateRunning
Expand Down Expand Up @@ -256,7 +256,7 @@ func testStateTrace(t *testing.T, transitions []transition) *Registry {

registry := NewInMemoryRegistry()
a, _ := fakePodTask("a")
a, _ = registry.Register(a, nil)
a, _ = registry.Register(a)

// initial pending state
assert.Equal(a.State, StatePending)
Expand Down
4 changes: 2 additions & 2 deletions contrib/mesos/pkg/scheduler/scheduler.go
Expand Up @@ -502,7 +502,7 @@ func (k *KubernetesScheduler) reconcileNonTerminalTask(driver bindings.Scheduler
} else if pod, err := k.client.Pods(namespace).Get(name); err == nil {
if t, ok, err := podtask.RecoverFrom(*pod); ok {
log.Infof("recovered task %v from metadata in pod %v/%v", taskId, namespace, name)
_, err := k.taskRegistry.Register(t, nil)
_, err := k.taskRegistry.Register(t)
if err != nil {
// someone beat us to it?!
log.Warningf("failed to register recovered task: %v", err)
Expand Down Expand Up @@ -912,7 +912,7 @@ func (ks *KubernetesScheduler) recoverTasks() error {
log.Errorf("failed to delete pod '%v/%v': %v", pod.Namespace, pod.Name, err)
}
} else if ok {
ks.taskRegistry.Register(t, nil)
ks.taskRegistry.Register(t)
recoverSlave(t)
log.Infof("recovered task %v from pod %v/%v", t.ID, pod.Namespace, pod.Name)
}
Expand Down