/
jobcleaner.go
109 lines (100 loc) · 3.38 KB
/
jobcleaner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package controller
import (
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchv1Client "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/tools/cache"
)
//JobCleaner watches Jobs through an informer and deletes finished
//deploy-tracker jobs after a configurable TTL.
type JobCleaner struct {
	// jobcInformer delivers Job add/update events.
	jobcInformer cache.SharedIndexInformer
	// batchClient is used to fetch and delete Jobs.
	batchClient batchv1Client.BatchV1Interface
	// completeTTL is the delay in minutes before a completed job is deleted
	// (from config key "controller.completejobttl").
	completeTTL int
	// failedTTL is the delay in minutes before a failed job is deleted
	// (from config key "controller.failedjobttl").
	failedTTL int
	// deletionSlate tracks job names already scheduled for deletion so a
	// job is not slated twice; guarded by the embedded mutex.
	deletionSlate map[string]bool
	sync.Mutex
}
//NewJobCleaner builds a JobCleaner wired to the given informer and batch
//client, reads the complete/failed TTLs from configuration, and registers
//Add/Update handlers that slate finished jobs for deletion.
func NewJobCleaner(jobcInformer cache.SharedIndexInformer, batchClient batchv1Client.BatchV1Interface) *JobCleaner {
	cleaner := &JobCleaner{
		jobcInformer:  jobcInformer,
		batchClient:   batchClient,
		completeTTL:   viper.GetInt("controller.completejobttl"),
		failedTTL:     viper.GetInt("controller.failedjobttl"),
		deletionSlate: map[string]bool{},
	}
	jobcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    cleaner.cleanAddJob,
		UpdateFunc: cleaner.trackUpdateJob,
	})
	return cleaner
}
//cleanAddJob is the informer Add handler. It inspects a job's terminal
//conditions and, for deploy-tracker jobs that have completed or failed,
//schedules a deferred deletion after the matching TTL. A job already on
//the deletion slate is not scheduled again.
func (j *JobCleaner) cleanAddJob(obj interface{}) {
	job, ok := obj.(*batchv1.Job)
	if !ok {
		log.Errorf("Not a job object")
		return
	}
	// Only deploy-tracker jobs are managed by this cleaner.
	if !strings.Contains(job.Name, "deploy-tracker-") {
		return
	}
	for _, condition := range job.Status.Conditions {
		if condition.Status != "True" {
			continue
		}
		// Map the terminal condition to its TTL and log verb; other
		// condition types are not our concern.
		var ttl int
		var verb string
		switch condition.Type {
		case "Complete":
			ttl, verb = j.completeTTL, "complete"
		case "Failed":
			ttl, verb = j.failedTTL, "failed"
		default:
			continue
		}
		j.Lock()
		// Missing entries read as false, so a single lookup covers both
		// "never slated" and "slate cleared".
		if !j.deletionSlate[job.Name] {
			log.Tracef("JobCleaner: Job %s %s, slated for deletion in %d minutes", job.Name, verb, ttl)
			j.deletionSlate[job.Name] = true
			go j.deleteJobAfter(job.Name, job.Namespace, ttl)
		}
		j.Unlock()
	}
}
//trackUpdateJob is the informer Update handler; it re-runs the Add logic
//against the new object state, since job conditions arrive via updates.
func (j *JobCleaner) trackUpdateJob(_ interface{}, newObj interface{}) {
	j.cleanAddJob(newObj)
}
//deleteJobAfter sleeps for `after` minutes, then deletes the named job if
//it still exists and is in a terminal (Complete or Failed) state. The
//deletion-slate entry is removed on exit so a later event can re-slate the
//job. Intended to run in its own goroutine.
func (j *JobCleaner) deleteJobAfter(jobName, jobNamespace string, after int) {
	time.Sleep(time.Duration(after) * time.Minute)
	defer func() {
		j.Lock()
		// Remove (rather than zero) the entry so the slate map does not
		// grow without bound as jobs come and go.
		delete(j.deletionSlate, jobName)
		j.Unlock()
	}()
	// Delete with zero grace period, cleaning up dependents in the
	// foreground before the job object itself is removed.
	var gracePeriodSeconds int64
	var propagationPolicy metav1.DeletionPropagation = metav1.DeletePropagationForeground
	jobsClient := j.batchClient.Jobs(jobNamespace)
	exJob, err := jobsClient.Get(jobName, metav1.GetOptions{})
	if err != nil {
		//Happens quite often when a job is rescheduled for deletion by an
		//update sent by the api server, so we just log it and do nothing.
		log.Tracef("JobCleaner: Job %s was already deleted", jobName)
		return
	}
	// Re-check the live conditions: the job may have been restarted since
	// it was slated, in which case it must not be deleted.
	for _, condition := range exJob.Status.Conditions {
		if (condition.Type == "Complete" || condition.Type == "Failed") && condition.Status == "True" {
			log.Tracef("JobCleaner: Deleting job : %s", jobName)
			if err := jobsClient.Delete(jobName, &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds, PropagationPolicy: &propagationPolicy}); err != nil {
				log.Warnf("JobCleaner: Failed to delete job %s with error: %s", jobName, err)
			}
			return
		}
	}
	log.Tracef("JobCleaner: Job %s was slated for deletion but was found running", jobName)
}