Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry job update after failure to prevent modification conflict #37077

Merged
merged 1 commit into from
Nov 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions test/e2e/batch_v1_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ var _ = framework.KubeDescribe("V1Job", func() {
// the job stabilized and won't be synced until modification or full
// resync happens, we don't want to wait for the latter so we force
// sync modifying it
job.Spec.Parallelism = &completions
job, err = updateV1Job(f.ClientSet, f.Namespace.Name, job)
_, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) {
update.Spec.Parallelism = &completions
})
Expect(err).NotTo(HaveOccurred())
err = waitForV1JobFail(f.ClientSet, f.Namespace.Name, job.Name, v1JobTimeout)
}
Expand Down
1 change: 1 addition & 0 deletions test/e2e/framework/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//pkg/api/validation:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/apps:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/componentconfig:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
Expand Down
25 changes: 25 additions & 0 deletions test/e2e/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
Expand Down Expand Up @@ -3388,6 +3389,30 @@ func UpdateStatefulSetWithRetries(c clientset.Interface, namespace, name string,
return statefulSet, pollErr
}

type updateJobFunc func(*batch.Job)

func UpdateJobWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateJobFunc) (job *batch.Job, err error) {
jobs := c.Batch().Jobs(namespace)
var updateErr error
pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
if job, err = jobs.Get(name); err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(job)
if job, err = jobs.Update(job); err == nil {
Logf("Updating job %s", name)
return true, nil
}
updateErr = err
return false, nil
})
if pollErr == wait.ErrWaitTimeout {
pollErr = fmt.Errorf("couldn't apply the provided updated to job %q: %v", name, updateErr)
}
return job, pollErr
}

// NodeAddresses returns the first address of the given type of each node.
func NodeAddresses(nodelist *api.NodeList, addrType api.NodeAddressType) []string {
hosts := []string{}
Expand Down
5 changes: 3 additions & 2 deletions test/e2e/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ var _ = framework.KubeDescribe("Job", func() {
// the job stabilized and won't be synced until modification or full
// resync happens, we don't want to wait for the latter so we force
// sync modifying it
job.Spec.Parallelism = &completions
job, err = updateJob(f.ClientSet, f.Namespace.Name, job)
_, err = framework.UpdateJobWithRetries(f.ClientSet, f.Namespace.Name, job.Name, func(update *batch.Job) {
update.Spec.Parallelism = &completions
})
Expect(err).NotTo(HaveOccurred())
err = waitForJobFail(f.ClientSet, f.Namespace.Name, job.Name, jobTimeout)
}
Expand Down