Skip to content

Commit

Permalink
cleanup old jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Ankita Thomas <ankithom@redhat.com>
Upstream-repository: operator-lifecycle-manager
Upstream-commit: 4fc64d230962fa9cd69a1dc2bc3e4244af452e13
  • Loading branch information
ankitathomas committed Feb 8, 2024
1 parent 8c3a7b9 commit ec74cef
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -657,47 +658,35 @@ func (c *ConfigMapUnpacker) ensureConfigmap(csRef *corev1.ObjectReference, name

func (c *ConfigMapUnpacker) ensureJob(cmRef *corev1.ObjectReference, bundlePath string, secrets []corev1.LocalObjectReference, timeout time.Duration, unpackRetryInterval time.Duration) (job *batchv1.Job, err error) {
fresh := c.job(cmRef, bundlePath, secrets, timeout)
job, err = c.jobLister.Jobs(fresh.GetNamespace()).Get(fresh.GetName())
var jobs, toDelete []*batchv1.Job
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
if err != nil {
if apierrors.IsNotFound(err) {
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
}

return
}
if len(jobs) == 0 {
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
return
}

maxRetainedJobs := 5 // TODO: make this configurable
job, toDelete = sortUnpackJobs(jobs, maxRetainedJobs) // choose latest or on-failed job attempt

// only check for retries if an unpackRetryInterval is specified
if unpackRetryInterval > 0 {
if failedCond, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
lastFailureTime := failedCond.LastTransitionTime.Time
if _, isFailed := getCondition(job, batchv1.JobFailed); isFailed {
// Look for other unpack jobs for the same bundle
var jobs []*batchv1.Job
jobs, err = c.jobLister.Jobs(fresh.GetNamespace()).List(k8slabels.ValidatedSetSelector{bundleUnpackRefLabel: cmRef.Name})
if err != nil {
return
}

var failed bool
var cond *batchv1.JobCondition
for _, j := range jobs {
cond, failed = getCondition(j, batchv1.JobFailed)
if !failed {
// found an in-progress unpack attempt
job = j
break
}
if cond != nil && lastFailureTime.Before(cond.LastTransitionTime.Time) {
lastFailureTime = cond.LastTransitionTime.Time
}
}

if failed {
if time.Now().After(lastFailureTime.Add(unpackRetryInterval)) {
if cond, failed := getCondition(job, batchv1.JobFailed); failed {
if time.Now().After(cond.LastTransitionTime.Time.Add(unpackRetryInterval)) {
fresh.SetName(names.SimpleNameGenerator.GenerateName(fresh.GetName()))
job, err = c.client.BatchV1().Jobs(fresh.GetNamespace()).Create(context.TODO(), fresh, metav1.CreateOptions{})
}
return
}

// cleanup old failed jobs, but don't clean up successful jobs to avoid repeat unpacking
for _, j := range toDelete {
_ = c.client.BatchV1().Jobs(j.GetNamespace()).Delete(context.TODO(), j.GetName(), metav1.DeleteOptions{})
}
return
}
}

Expand Down Expand Up @@ -840,6 +829,37 @@ func getCondition(job *batchv1.Job, conditionType batchv1.JobConditionType) (con
return
}

// sortUnpackJobs sorts jobs in place so the most relevant job comes first:
// a non-failed (in-progress or successful) job is preferred over any failed
// one, and among failed jobs the most recent failure sorts first. It returns
// that first job as latest, plus the subset of older jobs that may be deleted
// while retaining up to maxRetainedJobs of recent failed-job history (jobs
// older than that window are deliberately kept to avoid repeat unpacking).
func sortUnpackJobs(jobs []*batchv1.Job, maxRetainedJobs int) (latest *batchv1.Job, toDelete []*batchv1.Job) {
	if len(jobs) == 0 {
		return
	}
	// sort jobs so that latest job is first
	// with preference for non-failed jobs
	sort.Slice(jobs, func(i, j int) bool {
		condI, failedI := getCondition(jobs[i], batchv1.JobFailed)
		condJ, failedJ := getCondition(jobs[j], batchv1.JobFailed)
		if failedI != failedJ {
			return !failedI // non-failed job goes first
		}
		// Guard against nil conditions: jobs without a JobFailed condition
		// have no failure timestamp to compare, so a job that carries a
		// condition sorts ahead of one that does not; two condition-less
		// jobs keep an arbitrary (but consistent) relative order.
		if condI == nil || condJ == nil {
			return condI != nil
		}
		return condI.LastTransitionTime.After(condJ.LastTransitionTime.Time)
	})
	latest = jobs[0]
	if len(jobs) <= maxRetainedJobs {
		// Under the retention limit: nothing to delete.
		return
	}
	if maxRetainedJobs == 0 {
		// No history retained: everything except the latest job is deletable.
		toDelete = jobs[1:]
		return
	}

	// cleanup old failed jobs, n-1 recent jobs and the oldest job:
	// delete at most maxRetainedJobs entries immediately following the
	// retained window [0, maxRetainedJobs); anything older is preserved.
	for i := 0; i < maxRetainedJobs && i+maxRetainedJobs < len(jobs); i++ {
		toDelete = append(toDelete, jobs[maxRetainedJobs+i])
	}

	return
}

// OperatorGroupBundleUnpackTimeout returns bundle timeout from annotation if specified.
// If the timeout annotation is not set, return timeout < 0 which is subsequently ignored.
// This overrides the --bundle-unpack-timeout flag value on a per-OperatorGroup basis.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func TestConfigMapUnpacker(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: digestHash,
Namespace: "ns-a",
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Expand Down Expand Up @@ -706,6 +707,7 @@ func TestConfigMapUnpacker(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: digestHash,
Namespace: "ns-a",
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: digestHash},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Expand Down Expand Up @@ -968,6 +970,7 @@ func TestConfigMapUnpacker(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: pathHash,
Namespace: "ns-a",
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Expand Down Expand Up @@ -1200,6 +1203,7 @@ func TestConfigMapUnpacker(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: pathHash,
Namespace: "ns-a",
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Expand Down Expand Up @@ -1443,6 +1447,7 @@ func TestConfigMapUnpacker(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: pathHash,
Namespace: "ns-a",
Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: pathHash},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Expand Down Expand Up @@ -1933,3 +1938,101 @@ func TestOperatorGroupBundleUnpackRetryInterval(t *testing.T) {
})
}
}

// TestSortUnpackJobs verifies the job-selection and retention policy of
// sortUnpackJobs: a non-failed job is chosen as latest when one exists,
// otherwise the most recently failed job; and the deletion set preserves
// the first n-1 recent jobs plus the oldest job.
func TestSortUnpackJobs(t *testing.T) {
	// if there is a non-failed job, it should be first
	// otherwise, the latest job should be first
	// first n-1 jobs and oldest job are preserved
	//
	// testJob builds a minimal unpack job; failed jobs carry a JobFailed
	// condition whose LastTransitionTime is derived from ts, so ts ordering
	// determines failure recency.
	testJob := func(name string, failed bool, ts int64) *batchv1.Job {
		conditions := []batchv1.JobCondition{}
		if failed {
			conditions = append(conditions, batchv1.JobCondition{
				Type:               batchv1.JobFailed,
				Status:             corev1.ConditionTrue,
				LastTransitionTime: metav1.Time{Time: time.Unix(ts, 0)},
			})
		}
		return &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				Name:   name,
				Labels: map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue, bundleUnpackRefLabel: "test"},
			},
			Status: batchv1.JobStatus{
				Conditions: conditions,
			},
		}
	}
	failedJobs := []*batchv1.Job{
		testJob("f-1", true, 1),
		testJob("f-2", true, 2),
		testJob("f-3", true, 3),
		testJob("f-4", true, 4),
		testJob("f-5", true, 5),
	}
	nonFailedJob := testJob("s-1", false, 1)
	for _, tc := range []struct {
		name             string
		jobs             []*batchv1.Job
		maxRetained      int
		expectedLatest   *batchv1.Job
		expectedToDelete []*batchv1.Job
	}{
		{
			name:        "no job history",
			maxRetained: 0,
			jobs: []*batchv1.Job{
				failedJobs[1],
				failedJobs[2],
				failedJobs[0],
			},
			expectedLatest: failedJobs[2],
			expectedToDelete: []*batchv1.Job{
				failedJobs[1],
				failedJobs[0],
			},
		}, {
			name:        "empty job list",
			maxRetained: 1,
		}, {
			name:        "retain oldest",
			maxRetained: 1,
			jobs: []*batchv1.Job{
				failedJobs[2],
				failedJobs[0],
				failedJobs[1],
			},
			expectedToDelete: []*batchv1.Job{
				failedJobs[1],
			},
			expectedLatest: failedJobs[2],
		}, {
			name:        "multiple old jobs",
			maxRetained: 2,
			jobs: []*batchv1.Job{
				failedJobs[1],
				failedJobs[0],
				failedJobs[2],
				failedJobs[3],
				failedJobs[4],
			},
			expectedLatest: failedJobs[4],
			expectedToDelete: []*batchv1.Job{
				failedJobs[1],
				failedJobs[2],
			},
		}, {
			name:        "select non-failed as latest",
			maxRetained: 3,
			jobs: []*batchv1.Job{
				failedJobs[0],
				failedJobs[1],
				nonFailedJob,
			},
			expectedLatest: nonFailedJob,
		},
	} {
		// Run each case as a named subtest so failures identify the case;
		// previously tc.name was populated but never used.
		t.Run(tc.name, func(t *testing.T) {
			latest, toDelete := sortUnpackJobs(tc.jobs, tc.maxRetained)
			assert.Equal(t, tc.expectedLatest, latest)
			assert.ElementsMatch(t, tc.expectedToDelete, toDelete)
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,7 @@ type UnpackedBundleReference struct {
Properties string `json:"properties"`
}

func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, minUnpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.Step, bundleLookups []v1alpha1.BundleLookup, unpackTimeout, unpackRetryInterval time.Duration) (bool, []*v1alpha1.Step, []v1alpha1.BundleLookup, error) {
unpacked := true

outBundleLookups := make([]v1alpha1.BundleLookup, len(bundleLookups))
Expand All @@ -1525,7 +1525,7 @@ func (o *Operator) unpackBundles(namespace string, installPlanSteps []*v1alpha1.
var errs []error
for i := 0; i < len(outBundleLookups); i++ {
lookup := outBundleLookups[i]
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, minUnpackRetryInterval)
res, err := o.bundleUnpacker.UnpackBundle(&lookup, unpackTimeout, unpackRetryInterval)
if err != nil {
errs = append(errs, err)
continue
Expand Down
1 change: 0 additions & 1 deletion staging/operator-lifecycle-manager/test/e2e/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func createDockerRegistry(client operatorclient.ClientInterface, namespace strin
Port: int32(5000),
},
},
Type: corev1.ServiceTypeNodePort,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2524,6 +2524,11 @@ var _ = Describe("Subscription", func() {
})
When("bundle unpack retries are enabled", func() {
It("should retry failing unpack jobs", func() {
if ok, err := inKind(c); ok && err == nil {
Skip("This spec fails when run using KIND cluster. See https://github.com/operator-framework/operator-lifecycle-manager/issues/2420 for more details")
} else if err != nil {
Skip("Could not determine whether running in a kind cluster. Skipping.")
}
By("Ensuring a registry to host bundle images")
local, err := Local(c)
Expect(err).NotTo(HaveOccurred(), "cannot determine if test running locally or on CI: %s", err)
Expand Down Expand Up @@ -2579,20 +2584,14 @@ var _ = Describe("Subscription", func() {
}
}

// testImage is the name of the image used throughout the test - the image overwritten by skopeo
// the tag is generated randomly and appended to the end of the testImage
// The remote image to be copied onto the local registry
srcImage := "quay.io/olmtest/example-operator-bundle:"
srcTag := "0.1.0"
bundleImage := fmt.Sprint(registryURL, "/unpack-retry-bundle", ":")

// on-cluster image ref
bundleImage := registryURL + "/unpack-retry-bundle:"
bundleTag := genName("x")
//// hash hashes data with sha256 and returns the hex string.
//func hash(data string) string {
// // A SHA256 hash is 64 characters, which is within the 253 character limit for kube resource names
// h := fmt.Sprintf("%x", sha256.Sum256([]byte(data)))
//
// // Make the hash 63 characters instead to comply with the 63 character limit for labels
// return fmt.Sprintf(h[:len(h)-1])
//}

unpackRetryCatalog := fmt.Sprintf(`
schema: olm.package
name: unpack-retry-package
Expand Down

0 comments on commit ec74cef

Please sign in to comment.