/
job.go
176 lines (154 loc) · 6.06 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// Package job handles the internal representation of a job and its context.
package job
import (
"context"
"crypto/sha256"
"fmt"
k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
"github.com/k8up-io/k8up/v2/operator/cfg"
"github.com/k8up-io/k8up/v2/operator/monitoring"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
	// K8uplabel is a label that is required for the operator to differentiate
	// batchv1.Job objects managed by k8up from others.
	K8uplabel = "k8upjob"

	// K8upExclusive is needed to determine if a given job is considered
	// exclusive or not. It is namespaced under K8uplabel.
	K8upExclusive = K8uplabel + "/exclusive"
)
// Config represents the whole context for a given job. It contains everything
// that is necessary to handle the job.
type Config struct {
// Client is the Kubernetes API client. MutateBatchJob uses its Scheme to
// register the job object as controller owner of the spawned batchv1.Job.
Client client.Client
// Obj is the K8up job object this context belongs to. MutateBatchJob reads
// its type, active deadline and container resources from here.
Obj k8upv1.JobObject
// Repository is hashed with Sha256Hash and stored as the repository-hash
// label on spawned Jobs. Presumably it is the backup repository location;
// confirm against callers.
Repository string
}
// NewConfig bundles the given client, K8up job object and repository string
// into a Config value. No validation is performed.
func NewConfig(client client.Client, obj k8upv1.JobObject, repository string) Config {
	var c Config
	c.Client = client
	c.Obj = obj
	c.Repository = repository
	return c
}
// MutateBatchJob mutates the given Job with generic spec applicable to all K8up-spawned Jobs.
//
// It does the following, in order:
//   - Merges the K8up bookkeeping labels into batchJob.Labels. Keys already
//     present are overwritten by the K8up values.
//   - Copies the active deadline from config.Obj.
//   - Tags the pod template with K8uplabel.
//   - Sets the restart policy and jobObj's pod security context.
//   - Configures the first container with the configured backup image, the
//     restic command, and config.Obj's resources. The container is created if
//     the template has none.
//
// Finally, jobObj is set as controller owner of batchJob using the scheme of
// config.Client, and the error from that call is returned.
func MutateBatchJob(batchJob *batchv1.Job, jobObj k8upv1.JobObject, config Config) error {
	jobType := jobObj.GetType().String()
	batchJob.Labels = labels.Merge(batchJob.Labels, labels.Set{
		K8uplabel:                  "true",
		k8upv1.LabelK8upType:       jobType,
		k8upv1.LabelK8upOwnedBy:    jobType + "_" + jobObj.GetName(),
		k8upv1.LabelRepositoryHash: Sha256Hash(config.Repository),
	})
	batchJob.Spec.ActiveDeadlineSeconds = config.Obj.GetActiveDeadlineSeconds()

	template := &batchJob.Spec.Template
	template.Labels = labels.Merge(template.Labels, labels.Set{K8uplabel: "true"})

	podSpec := &template.Spec
	podSpec.RestartPolicy = corev1.RestartPolicyOnFailure
	podSpec.SecurityContext = jobObj.GetPodSecurityContext()

	// NOTE(review): labels and the security context come from jobObj, while the
	// deadline, container name and resources come from config.Obj. Callers are
	// presumably expected to pass the same object in both; confirm.
	if len(podSpec.Containers) == 0 {
		podSpec.Containers = make([]corev1.Container, 1)
	}
	ctr := &podSpec.Containers[0]
	ctr.Name = config.Obj.GetType().String()
	ctr.Image = cfg.Config.BackupImage
	ctr.Command = cfg.Config.BackupCommandRestic
	ctr.Resources = config.Obj.GetResources()

	return controllerruntime.SetControllerReference(jobObj, batchJob, config.Client.Scheme())
}
// ReconcileJobStatus fetches the batchv1.Job identified by key, mirrors its
// state onto obj via UpdateStatus, and persists obj through the status
// subresource.
//
// A Job that does not exist is not treated as an error: it is logged and nil is
// returned. Any other Get failure, or a failing status update, is returned
// wrapped.
func ReconcileJobStatus(ctx context.Context, key types.NamespacedName, client client.Client, obj k8upv1.JobObject) error {
	debug := controllerruntime.LoggerFrom(ctx).V(1)
	debug.Info("reconciling job", "key", key)

	batchJob := &batchv1.Job{}
	if getErr := client.Get(ctx, key, batchJob); getErr != nil {
		if errors.IsNotFound(getErr) {
			debug.Info("job not found", "key", key)
			return nil
		}
		return fmt.Errorf("unable to get job: %w", getErr)
	}

	UpdateStatus(ctx, batchJob, obj)
	debug.Info("updating status")

	if updateErr := client.Status().Update(ctx, obj); updateErr != nil {
		return fmt.Errorf("obj status update failed: %w", updateErr)
	}
	return nil
}
// UpdateStatus retrieves status of batchJob and sets status of obj accordingly.
//
// The Job's Complete and Failed conditions are evaluated independently and
// mapped onto the succeeded and failed state of obj's status. When neither
// condition is present at all, the status is marked as started. Every
// transition uses a message summarising the Job's active, succeeded and failed
// pod counters. The modified status is written back with obj.SetStatus.
func UpdateStatus(ctx context.Context, batchJob *batchv1.Job, obj k8upv1.JobObject) {
	status := obj.GetStatus()
	jobStatus := &batchJob.Status
	summary := fmt.Sprintf("job '%s' has %d active, %d succeeded and %d failed pods",
		batchJob.Name, jobStatus.Active, jobStatus.Succeeded, jobStatus.Failed)

	name, namespace := batchJob.Name, batchJob.Namespace
	conditions := jobStatus.Conditions

	// Deliberately not a switch: succeeded and failed are checked separately.
	if HasSucceeded(conditions) {
		SetSucceeded(ctx, name, namespace, obj.GetType(), &status, summary)
	}
	if HasFailed(conditions) {
		SetFailed(ctx, name, namespace, obj.GetType(), &status, summary)
	}
	if HasStarted(conditions) {
		status.SetStarted(summary)
	}

	obj.SetStatus(status)
}
// HasSucceeded reports whether conditions contain a JobComplete condition whose
// status is True.
func HasSucceeded(conditions []batchv1.JobCondition) bool {
	if complete := FindStatusCondition(conditions, batchv1.JobComplete); complete != nil {
		return complete.Status == corev1.ConditionTrue
	}
	return false
}
// HasFailed reports whether conditions contain a JobFailed condition whose
// status is True.
func HasFailed(conditions []batchv1.JobCondition) bool {
	if failed := FindStatusCondition(conditions, batchv1.JobFailed); failed != nil {
		return failed.Status == corev1.ConditionTrue
	}
	return false
}
// HasStarted reports whether conditions contain neither a JobComplete nor a
// JobFailed condition.
//
// NOTE(review): unlike HasSucceeded and HasFailed, this only checks whether the
// conditions exist. It ignores their status, so a Complete or Failed condition
// with a non-True status still makes this return false.
func HasStarted(conditions []batchv1.JobCondition) bool {
	if FindStatusCondition(conditions, batchv1.JobComplete) != nil {
		return false
	}
	return FindStatusCondition(conditions, batchv1.JobFailed) == nil
}
// SetSucceeded marks objStatus as succeeded and finished.
//
// The success counters for (ns, typ) are incremented, and "Job succeeded" is
// logged, only when objStatus was not already succeeded. This keeps repeated
// reconciles of the same Job from being counted twice.
func SetSucceeded(ctx context.Context, name, ns string, typ k8upv1.JobType, objStatus *k8upv1.Status, message string) {
	if alreadySucceeded := objStatus.HasSucceeded(); !alreadySucceeded {
		monitoring.IncSuccessCounters(ns, typ)
		controllerruntime.LoggerFrom(ctx).Info("Job succeeded")
	}

	finished := fmt.Sprintf("job '%s' completed successfully", name)
	objStatus.SetSucceeded(message)
	objStatus.SetFinished(finished)
}
// SetFailed marks objStatus as failed and finished.
//
// The failure counters for (ns, typ) are incremented, and "Job failed" is
// logged, only when objStatus was not already failed. This keeps repeated
// reconciles of the same Job from being counted twice.
func SetFailed(ctx context.Context, name, ns string, typ k8upv1.JobType, objStatus *k8upv1.Status, message string) {
	if alreadyFailed := objStatus.HasFailed(); !alreadyFailed {
		monitoring.IncFailureCounters(ns, typ)
		controllerruntime.LoggerFrom(ctx).Info("Job failed")
	}

	finished := fmt.Sprintf("job '%s' has failed", name)
	objStatus.SetFailed(message)
	objStatus.SetFinished(finished)
}
// FindStatusCondition finds the first condition with the given type in the
// batchv1.JobCondition slice. Returns nil if not found.
//
// The returned pointer refers to a copy of the matching element. Modifying it
// does not change the conditions slice.
func FindStatusCondition(conditions []batchv1.JobCondition, conditionType batchv1.JobConditionType) *batchv1.JobCondition {
	for i := range conditions {
		if conditions[i].Type != conditionType {
			continue
		}
		match := conditions[i]
		return &match
	}
	return nil
}
// Sha256Hash returns the lowercase hex SHA-256 digest of v, truncated to 63
// characters so that it fits into a Kubernetes label value. An empty v yields
// an empty string rather than the digest of "".
func Sha256Hash(v string) string {
	if len(v) == 0 {
		return ""
	}
	digest := sha256.Sum256([]byte(v))
	// The full hex digest is 64 characters, one more than a label value allows.
	return fmt.Sprintf("%x", digest[:])[:63]
}