Job: Respect ControllerRef #42176

Merged 12 commits on Apr 20, 2017
12 changes: 8 additions & 4 deletions pkg/client/listers/batch/internalversion/job_expansion.go
@@ -28,13 +28,17 @@ import (
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface {
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error)
}

// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
func (l *jobLister) GetPodJobs(pod *api.Pod) (jobs []batch.Job, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
12 changes: 8 additions & 4 deletions pkg/client/listers/batch/v1/job_expansion.go
@@ -28,13 +28,17 @@ import (
// JobListerExpansion allows custom methods to be added to
// JobLister.
type JobListerExpansion interface {
// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error)
}

// GetPodJobs returns a list of jobs managing a pod. An error is returned only
// if no matching jobs are found.
// GetPodJobs returns a list of Jobs that potentially
// match a Pod. Only the one specified in the Pod's ControllerRef
// will actually manage it.
// Returns an error only if no matching Jobs are found.
func (l *jobLister) GetPodJobs(pod *v1.Pod) (jobs []batch.Job, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
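Both listers now promise only candidate Jobs; it is the caller's responsibility to narrow the list to the single owner named by the Pod's ControllerRef. A minimal, self-contained sketch of that narrowing step, using simplified stand-in types rather than the real client-go ones:

package main

import "fmt"

// Simplified stand-ins for the real API types.
type OwnerReference struct {
	UID        string
	Controller bool
}

type Pod struct {
	OwnerRefs []OwnerReference
}

type Job struct {
	Name string
	UID  string
}

// managingJob picks the one Job out of the candidates that the Pod's
// ControllerRef actually points to, or nil if there is none.
func managingJob(pod Pod, candidates []Job) *Job {
	for _, ref := range pod.OwnerRefs {
		if !ref.Controller {
			continue
		}
		for i := range candidates {
			if candidates[i].UID == ref.UID {
				return &candidates[i]
			}
		}
	}
	return nil
}

func main() {
	pod := Pod{OwnerRefs: []OwnerReference{{UID: "job-2-uid", Controller: true}}}
	candidates := []Job{{Name: "job-1", UID: "job-1-uid"}, {Name: "job-2", UID: "job-2-uid"}}
	fmt.Println(managingJob(pod, candidates).Name) // job-2
}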
2 changes: 2 additions & 0 deletions pkg/controller/job/BUILD
@@ -30,6 +30,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
@@ -59,6 +60,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
153 changes: 126 additions & 27 deletions pkg/controller/job/jobcontroller.go
@@ -25,6 +25,7 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -46,6 +47,9 @@ import (
"github.com/golang/glog"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

type JobController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
@@ -140,18 +144,43 @@ func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
<-stopCh
}

// getPodJob returns the job managing the given pod.
func (jm *JobController) getPodJob(pod *v1.Pod) *batch.Job {
// getPodJobs returns a list of Jobs that potentially match a Pod.
func (jm *JobController) getPodJobs(pod *v1.Pod) []*batch.Job {
jobs, err := jm.jobLister.GetPodJobs(pod)
if err != nil {
glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
return nil
}
if len(jobs) > 1 {
// ControllerRef will ensure we don't do anything crazy, but more than one
// item in this list nevertheless constitutes user error.
utilruntime.HandleError(fmt.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels))
sort.Sort(byCreationTimestamp(jobs))
}
return &jobs[0]
ret := make([]*batch.Job, 0, len(jobs))
for i := range jobs {
ret = append(ret, &jobs[i])
}
return ret
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (jm *JobController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *batch.Job {
// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controllerKind.Kind {
return nil
}
job, err := jm.jobLister.Jobs(namespace).Get(controllerRef.Name)
if err != nil {
return nil
}
if job.UID != controllerRef.UID {
// The controller we found with this Name is not the same one that the
// ControllerRef points to.
return nil
}
return job
}
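resolveControllerRef looks up by Name and then verifies the UID because a Job can be deleted and recreated under the same name: the lister would happily return the new object, but its UID would no longer match the ControllerRef. A toy illustration of that guard, with a plain map standing in for the lister:

package main

import "fmt"

type Job struct {
	Name string
	UID  string
}

// listerByName stands in for jm.jobLister.Jobs(namespace).Get(name).
var listerByName = map[string]*Job{
	"backup": {Name: "backup", UID: "uid-new"}, // recreated Job, new UID
}

func resolve(name, refUID string) *Job {
	job, ok := listerByName[name]
	if !ok {
		return nil
	}
	if job.UID != refUID {
		// Same name, different object: the original owner is gone.
		return nil
	}
	return job
}

func main() {
	fmt.Println(resolve("backup", "uid-old")) // <nil>: stale ControllerRef
	fmt.Println(resolve("backup", "uid-new")) // the live Job
}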

// When a pod is created, enqueue the controller that manages it and update its expectations.
@@ -163,14 +192,28 @@ func (jm *JobController) addPod(obj interface{}) {
jm.deletePod(pod)
return
}
if job := jm.getPodJob(pod); job != nil {

// If it has a ControllerRef, that's all that matters.
if controllerRef := controller.GetControllerOf(pod); controllerRef != nil {
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil {
return
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
[inline review] Member: Why remove this?

[inline review] Member (Author): In a comment on another ControllerRef PR, the reviewer asked me to remove this type of logging during watch handlers.

return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
return
}

// Otherwise, it's an orphan. Get a list of all matching controllers and sync
// them to see if anyone wants to adopt it.
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueController(job)
}
}
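The branch on controller.GetControllerOf(pod) is the heart of the new dispatch: that helper scans the Pod's OwnerReferences for the single entry marked Controller: true. A stand-alone sketch of the idea (the real helper lives in the controller package and works on the metav1 types):

package main

import "fmt"

type OwnerReference struct {
	Kind       string
	Name       string
	Controller *bool
}

// getControllerOf returns the OwnerReference flagged as the managing
// controller, or nil if the object is an orphan. At most one reference
// may carry Controller: true.
func getControllerOf(refs []OwnerReference) *OwnerReference {
	for i := range refs {
		if refs[i].Controller != nil && *refs[i].Controller {
			return &refs[i]
		}
	}
	return nil
}

func main() {
	isController := true
	refs := []OwnerReference{
		{Kind: "ConfigMap", Name: "settings"},
		{Kind: "Job", Name: "backup", Controller: &isController},
	}
	fmt.Println(getControllerOf(refs).Kind) // Job
}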

@@ -193,15 +236,34 @@ func (jm *JobController) updatePod(old, cur interface{}) {
jm.deletePod(curPod)
return
}
if job := jm.getPodJob(curPod); job != nil {

labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)

curControllerRef := controller.GetControllerOf(curPod)
oldControllerRef := controller.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
jm.enqueueController(job)
}
}

// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)
if job == nil {
return
}
jm.enqueueController(job)
return
}
// Only need to get the old job if the labels changed.
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
// If the old and new job are the same, the first one that syncs
// will set expectations preventing any damage from the second.
if oldJob := jm.getPodJob(oldPod); oldJob != nil {
jm.enqueueController(oldJob)

// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueController(job)
}
}
}
Expand All @@ -218,24 +280,31 @@ func (jm *JobController) deletePod(obj interface{}) {
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %+v", obj))
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %+v", obj))
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
return
}
}
if job := jm.getPodJob(pod); job != nil {
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)

controllerRef := controller.GetControllerOf(pod)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil {
return
}
jobKey, err := controller.KeyFunc(job)
if err != nil {
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
}
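deletePod now decrements expectations only after the Pod's ControllerRef resolves to a live Job, keeping the bookkeeping scoped to the actual owner. For background, a deliberately simplified model of how such an expectations counter gates syncing; the real controller.ControllerExpectations is keyed per controller and TTL-based:

package main

import (
	"fmt"
	"sync/atomic"
)

// expectations tracks how many pod creations/deletions a controller is
// still waiting to observe before its cached state can be trusted.
type expectations struct {
	pendingCreates int64
	pendingDeletes int64
}

func (e *expectations) ExpectDeletions(n int64) { atomic.AddInt64(&e.pendingDeletes, n) }
func (e *expectations) DeletionObserved()       { atomic.AddInt64(&e.pendingDeletes, -1) }

func (e *expectations) Satisfied() bool {
	return atomic.LoadInt64(&e.pendingCreates) <= 0 &&
		atomic.LoadInt64(&e.pendingDeletes) <= 0
}

func main() {
	e := &expectations{}
	e.ExpectDeletions(2)       // controller issued two deletes
	fmt.Println(e.Satisfied()) // false: still waiting for the watch events
	e.DeletionObserved()       // deletePod saw one of them
	e.DeletionObserved()
	fmt.Println(e.Satisfied()) // true: safe to sync from the cache
}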

// obj could be a *batch.Job, or a DeletionFinalStateUnknown marker item.
@@ -281,6 +350,36 @@ func (jm *JobController) processNextWorkItem() bool {
return true
}

// getPodsForJob returns the set of pods that this Job should manage.
// It also reconciles ControllerRef by adopting/orphaning.
// Note that the returned Pods are pointers into the cache.
func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
}
// List all pods to include those that don't match the selector anymore
// but have a ControllerRef pointing to this controller.
pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
if err != nil {
return nil, err
}
// If any adoptions are attempted, we should first recheck for deletion
// with an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(j.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
if fresh.UID != j.UID {
return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)
}
return fresh, nil
})
cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc)
return cm.ClaimPods(pods)
}
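The canAdoptFunc closure implements the guard referenced in #42639: before adopting, re-read the Job with an uncached quorum GET and refuse if it has been replaced or is being deleted. A stripped-down sketch of the same check, with a stub fetch function standing in for the live API call; the deletion-timestamp test shown here is, as this sketch assumes, what controller.RecheckDeletionTimestamp adds on top of the closure above:

package main

import (
	"errors"
	"fmt"
)

type Job struct {
	Name              string
	UID               string
	DeletionTimestamp *string // nil while the Job is live
}

// canAdopt re-fetches the Job from a source of truth (here a stub) and
// rejects adoption if the cached copy is stale or being deleted.
func canAdopt(cached *Job, fetchFresh func(name string) (*Job, error)) error {
	fresh, err := fetchFresh(cached.Name)
	if err != nil {
		return err
	}
	if fresh.UID != cached.UID {
		return fmt.Errorf("original Job %s is gone: got uid %s, wanted %s",
			cached.Name, fresh.UID, cached.UID)
	}
	if fresh.DeletionTimestamp != nil {
		return errors.New("Job is being deleted; refusing to adopt")
	}
	return nil
}

func main() {
	now := "2017-04-20T00:00:00Z"
	fresh := &Job{Name: "backup", UID: "uid-1", DeletionTimestamp: &now}
	err := canAdopt(&Job{Name: "backup", UID: "uid-1"},
		func(string) (*Job, error) { return fresh, nil })
	fmt.Println(err) // Job is being deleted; refusing to adopt
}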

// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
@@ -312,8 +411,8 @@ func (jm *JobController) syncJob(key string) error {
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
pods, err := jm.podStore.Pods(job.Namespace).List(selector)

pods, err := jm.getPodsForJob(&job)
if err != nil {
return err
}
@@ -507,7 +606,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) int32 {
for i := int32(0); i < diff; i++ {
go func() {
defer wait.Done()
if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil {
if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
jm.expectations.CreationObserved(jobKey)
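The switch from CreatePods to CreatePodsWithControllerRef stamps every Pod the Job creates with an OwnerReference back to the Job. The helper newControllerRef is not part of this excerpt; presumably it builds something along the following lines (a hedged reconstruction using this file's controllerKind, not the PR's verbatim code):

// Assumed shape of the helper referenced above; the exact field set is a
// guess based on how ControllerRefs are built elsewhere in the tree.
func newControllerRef(j *batch.Job) *metav1.OwnerReference {
	isController := true
	return &metav1.OwnerReference{
		APIVersion: controllerKind.GroupVersion().String(),
		Kind:       controllerKind.Kind,
		Name:       j.Name,
		UID:        j.UID,
		Controller: &isController, // marks this owner as the managing controller
	}
}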