job_controller.go
/*
 * Copyright (c) 2020, 2024, Oracle and/or its affiliates.
 * Licensed under the Universal Permissive License v 1.0 as shown at
 * http://oss.oracle.com/licenses/upl.
 */

package job

import (
	"context"
	"fmt"
	"github.com/go-logr/logr"
	coh "github.com/oracle/coherence-operator/api/v1"
	"github.com/oracle/coherence-operator/controllers/reconciler"
	"github.com/oracle/coherence-operator/pkg/probe"
	"github.com/oracle/coherence-operator/pkg/utils"
	"github.com/pkg/errors"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"time"
)
const (
	// The name of this controller. This is used in events, log messages, etc.
	controllerName = "controllers.Job"

	CreateMessage        string = "successfully created Job for Coherence resource '%s'"
	FailedToPatchMessage string = "failed to patch Coherence resource %s due to error\n%s"
)
// blank assignment to verify that ReconcileJob implements reconcile.Reconciler.
// If the reconcile.Reconciler API were to change we would get a compile error here.
var _ reconcile.Reconciler = &ReconcileJob{}
// NewJobReconciler returns a new Job reconciler.
func NewJobReconciler(mgr manager.Manager) reconciler.SecondaryResourceReconciler {
	r := &ReconcileJob{
		ReconcileSecondaryResource: reconciler.ReconcileSecondaryResource{
			Kind:     coh.ResourceTypeJob,
			Template: &batchv1.Job{},
		},
	}

	r.SetCommonReconciler(controllerName, mgr)
	return r
}
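
// Example (illustrative sketch only): a reconciler like the one above is created
// with the controller-runtime manager that the operator builds at start-up. The
// exact wiring lives elsewhere in this repository; this sketch assumes only the
// exported NewJobReconciler signature above and the standard controller-runtime
// API (GetConfigOrDie comes from "sigs.k8s.io/controller-runtime/pkg/client/config").
//
//	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
//	if err != nil {
//		// handle the error, e.g. log and exit
//	}
//	r := job.NewJobReconciler(mgr)
//	_ = r // the secondary reconciler is invoked by the primary Coherence controller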
// ReconcileJob is a reconciler for Jobs.
type ReconcileJob struct {
	reconciler.ReconcileSecondaryResource
}

func (in *ReconcileJob) GetReconciler() reconcile.Reconciler { return in }
// Reconcile processes a reconcile request for the Job owned by a CoherenceJob resource.
func (in *ReconcileJob) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	logger := in.GetLog().WithValues("Namespace", request.Namespace, "Name", request.Name, "Kind", "Job")
	logger.Info("Starting reconcile")

	// Attempt to lock the requested resource. If the resource is locked then another
	// request for the same resource is already in progress so requeue this one.
	if ok := in.Lock(request); !ok {
		return reconcile.Result{Requeue: true, RequeueAfter: 0}, nil
	}
	// Make sure that the request is unlocked when this method exits
	defer in.Unlock(request)

	storage, err := utils.NewStorage(request.NamespacedName, in.GetManager())
	if err != nil {
		return reconcile.Result{}, err
	}

	result, err := in.ReconcileAllResourceOfKind(ctx, request, nil, storage)
	logger.Info("Completed reconcile")
	return result, err
}
// ReconcileAllResourceOfKind will process the specified reconcile request for the specified deployment.
// The previous state being reconciled can be obtained from the storage parameter.
func (in *ReconcileJob) ReconcileAllResourceOfKind(ctx context.Context, request reconcile.Request, deployment coh.CoherenceResource, storage utils.Storage) (reconcile.Result, error) {
	result := reconcile.Result{}
	var statuses []coh.CoherenceJobProbeStatus

	if !storage.IsJob(request) {
		// Nothing to do, not running as a Job
		return result, nil
	}

	logger := in.GetLog().WithValues("Namespace", request.Namespace, "Name", request.Name)
	logger.Info("Reconciling Job")

	// Fetch the Job's current state
	jobCurrent, jobExists, err := in.MaybeFindJob(ctx, request.Namespace, request.Name)
	if err != nil {
		logger.Info("Finished reconciling Job. Error getting Job", "error", err.Error())
		return result, errors.Wrapf(err, "getting Job %s/%s", request.Namespace, request.Name)
	}

	jobCompleted := false
	if jobExists {
		jobCompleted = jobCurrent.Status.CompletionTime != nil
	}

	if jobExists && jobCurrent.GetDeletionTimestamp() != nil {
		logger.Info("Finished reconciling Job. The Job is being deleted")
		// The Job exists but is being deleted
		return result, nil
	}

	if jobExists && deployment == nil {
		// find the owning Coherence resource
		if deployment, err = in.FindOwningCoherenceResource(ctx, jobCurrent); err != nil {
			logger.Info("Finished reconciling Job. Error finding parent Coherence resource", "error", err.Error())
			return reconcile.Result{}, err
		}
	}
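
	// Decide what to do based on the observed state. In summary:
	//   deployment missing or replicas == 0 -> delete the Job if it exists, then update status
	//   Job missing                         -> create the Job (subject to any start quorum)
	//   Job completed                       -> just refresh the deployment status
	//   otherwise                           -> possibly patch the Job and run any ready probe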
	switch {
	case deployment == nil || deployment.GetReplicas() == 0:
		// The Coherence resource does not exist, or it exists and is scaling down to zero replicas
		if jobExists {
			// The Job does exist though, so it needs to be deleted.
			if deployment != nil {
				// If we get here, we must be scaling down to zero as the Coherence resource exists.
				// If the Coherence resource did not exist then service suspension already happened
				// when the Coherence resource was deleted.
				logger.Info("Scaling down to zero")
				err = in.UpdateDeploymentStatusActionsState(ctx, request.NamespacedName, false)
				// TODO: what to do with error?
				if err != nil {
					logger.Info("Error updating CoherenceJob status", "error", err.Error())
				}
			}
			// delete the Job
			err = in.Delete(ctx, request.Namespace, request.Name, logger)
		} else {
			// The Job and parent resource have been deleted so there is nothing more to do
			err = in.updateDeploymentStatus(ctx, request, nil)
			return reconcile.Result{}, err
		}
	case !jobExists:
		// The Job does not exist but the deployment does, so create the Job (checking any start quorum)
		result, err = in.createJob(ctx, deployment, storage, logger)
	case jobCompleted:
		// Nothing to do, the Job is complete
		err = in.updateDeploymentStatus(ctx, request, nil)
		return reconcile.Result{}, err
	default:
		// Both the Job and the deployment exist, so this may be an update
		result, err = in.updateJob(ctx, deployment, jobCurrent.DeepCopy(), storage, logger)
		if err == nil {
			statuses, err = in.maybeExecuteProbe(ctx, jobCurrent, deployment, logger)
		}
	}

	if err != nil {
		logger.Info("Finished reconciling Job with error", "error", err.Error())
		return result, err
	}

	err = in.updateDeploymentStatus(ctx, request, statuses)
	if err != nil {
		return result, err
	}

	logger.Info("Finished reconciling Job")
	return result, nil
}
// createJob creates the Job for a deployment, first checking that any start quorum has been met.
func (in *ReconcileJob) createJob(ctx context.Context, deployment coh.CoherenceResource, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
	logger.Info("Creating Job")

	ok, reason := in.CanCreate(ctx, deployment)
	if !ok {
		// start quorum not met, send event and update deployment status
		in.GetEventRecorder().Event(deployment, corev1.EventTypeNormal, "Waiting", reason)
		_ = in.UpdateCoherenceJobStatusCondition(ctx, deployment.GetNamespacedName(), coh.Condition{
			Type:    coh.ConditionTypeWaiting,
			Status:  corev1.ConditionTrue,
			Reason:  "StatusQuorum",
			Message: reason,
		})
		return reconcile.Result{Requeue: true, RequeueAfter: time.Second * 30}, nil
	}

	err := in.Create(ctx, deployment.GetName(), storage, logger)
	if err == nil {
		// ensure that the deployment has a Created status
		err = in.UpdateCoherenceJobStatusPhase(ctx, deployment.GetNamespacedName(), coh.ConditionTypeCreated)
		if err != nil {
			return reconcile.Result{}, errors.Wrap(err, "updating deployment status")
		}
		// send a successful creation event
		msg := fmt.Sprintf(CreateMessage, deployment.GetName())
		in.GetEventRecorder().Event(deployment, corev1.EventTypeNormal, reconciler.EventReasonCreated, msg)
		logger.Info("Created Job")
	}

	return reconcile.Result{}, err
}
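
// updateJob updates an existing Job to match the desired state held in the store,
// re-queuing the request if the desired state cannot be found.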
func (in *ReconcileJob) updateJob(ctx context.Context, deployment coh.CoherenceResource, job *batchv1.Job, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
	// get the desired resource state from the store
	resource, found := storage.GetLatest().GetResource(coh.ResourceTypeJob, job.Name)
	if !found {
		// Desired state not found, requeue and the request should sort itself out next time around
		logger.Info("Cannot locate desired state for Job, possibly a deletion, re-queuing request")
		return reconcile.Result{Requeue: true}, nil
	}

	if resource.IsDelete() {
		// we should never get here, requeue and the request should sort itself out next time around
		logger.Info("In update path for Job, but this is a deletion - re-queuing request")
		return reconcile.Result{Requeue: true}, nil
	}

	desired := resource.Spec.(*batchv1.Job)
	return in.patchJob(ctx, deployment, job, desired, storage, logger)
}
// patchJob patches the Job if required, returning a result indicating whether the
// request should be re-queued, and any error that occurred.
func (in *ReconcileJob) patchJob(ctx context.Context, deployment coh.CoherenceResource, job, desired *batchv1.Job, storage utils.Storage, logger logr.Logger) (reconcile.Result, error) {
	hashMatches := in.HashLabelsMatch(job, storage)
	if hashMatches {
		// nothing to do, the Job already matches the desired state
		return reconcile.Result{}, nil
	}

	resource, _ := storage.GetPrevious().GetResource(coh.ResourceTypeJob, job.GetName())
	original := &batchv1.Job{}
	if resource.IsPresent() {
		err := resource.As(original)
		if err != nil {
			return in.HandleErrAndRequeue(ctx, err, deployment, fmt.Sprintf(FailedToPatchMessage, deployment.GetName(), err.Error()), logger)
		}
	} else {
		// there was no previous state, use the desired state as the original
		original = desired
	}

	errorList := coh.ValidateJobUpdate(desired, original)
	if len(errorList) > 0 {
		msg := fmt.Sprintf("updates to the Job would have been invalid, the update will not be re-queued: %v", errorList)
		events := in.GetEventRecorder()
		events.Event(deployment, corev1.EventTypeWarning, reconciler.EventReasonUpdated, msg)
		return reconcile.Result{Requeue: false}, errors.New(msg)
	}

	// copy the Job, so we do not alter the passed in Job
	current := job.DeepCopy()

	// We NEVER patch finalizers
	original.ObjectMeta.Finalizers = current.ObjectMeta.Finalizers
	desired.ObjectMeta.Finalizers = current.ObjectMeta.Finalizers

	// We need to ensure we do not create a patch due to differences in
	// Job Status, so we blank out the status fields
	desired.Status = batchv1.JobStatus{}
	current.Status = batchv1.JobStatus{}
	original.Status = batchv1.JobStatus{}

	desiredPodSpec := desired.Spec.Template
	currentPodSpec := current.Spec.Template
	originalPodSpec := original.Spec.Template

	// ensure we do not patch any fields that may be set by a previous version of the Operator,
	// as this would cause a rolling update of the Pods; typically these are fields where
	// the Operator sets defaults, and we changed the default behaviour
	in.BlankContainerFields(deployment, &desiredPodSpec)
	in.BlankContainerFields(deployment, &currentPodSpec)
	in.BlankContainerFields(deployment, &originalPodSpec)

	// Sort the environment variables, so we do not patch on just a re-ordering of env vars
	in.SortEnvForAllContainers(&desiredPodSpec)
	in.SortEnvForAllContainers(&currentPodSpec)
	in.SortEnvForAllContainers(&originalPodSpec)

	// ensure the Coherence image is present so that we do not patch on a Coherence resource
	// from pre-3.1.x that does not have images set
	spec := deployment.GetSpec()
	if spec.Image == nil {
		cohImage := in.GetCoherenceImage(&desiredPodSpec)
		in.SetCoherenceImage(&originalPodSpec, cohImage)
		in.SetCoherenceImage(&currentPodSpec, cohImage)
	}

	// ensure the Operator image is present so that we do not patch on a Coherence resource
	// from pre-3.1.x that does not have images set
	if spec.CoherenceUtils == nil || spec.CoherenceUtils.Image == nil {
		operatorImage := in.GetOperatorImage(&desiredPodSpec)
		in.SetOperatorImage(&originalPodSpec, operatorImage)
		in.SetOperatorImage(&currentPodSpec, operatorImage)
	}

	// a callback function that the 3-way patch method will call just before it applies a patch;
	// if there is any patch to apply, this will check StatusHA if required and update the deployment status
	callback := func() {
		// ensure that the deployment has an "Upgrading" status
		if err := in.UpdateCoherenceJobStatusPhase(ctx, deployment.GetNamespacedName(), coh.ConditionTypeRollingUpgrade); err != nil {
			logger.Error(err, "Error updating deployment status to Upgrading")
		}
	}

	// fix the CreationTimestamp so that it is not in the patch
	desired.SetCreationTimestamp(current.GetCreationTimestamp())
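
	// The three-way patch below compares the state this Operator previously created
	// (original), the state it now wants (desired), and the state actually in the
	// cluster (current). Conceptually, only fields that changed between original and
	// desired end up in the patch, so fields modified in the cluster by someone else
	// are left alone. A hypothetical illustration (values are not from this codebase):
	//
	//	original: image=v1            desired: image=v2
	//	current:  image=v1 + label=x  => patch updates image to v2, label=x is kept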
	// create the patch to see whether there is anything to update
	patch, data, err := in.CreateThreeWayPatch(current.GetName(), original, desired, current, reconciler.PatchIgnore)
	if err != nil {
		return reconcile.Result{}, errors.Wrapf(err, "failed to create patch for Job/%s", current.GetName())
	}

	if patch == nil {
		// nothing to patch so just return
		return reconcile.Result{}, nil
	}

	// now apply the patch
	patched, err := in.ApplyThreeWayPatchWithCallback(ctx, current.GetName(), current, patch, data, callback)

	// log the result of patching
	switch {
	case err != nil:
		logger.Info("Error patching Job " + err.Error())
		return in.HandleErrAndRequeue(ctx, err, deployment, fmt.Sprintf(FailedToPatchMessage, deployment.GetName(), err.Error()), logger)
	case !patched:
		return reconcile.Result{Requeue: true}, nil
	}

	return reconcile.Result{}, nil
}
// updateDeploymentStatus updates the Coherence resource's status.
func (in *ReconcileJob) updateDeploymentStatus(ctx context.Context, request reconcile.Request, probeStatuses []coh.CoherenceJobProbeStatus) error {
	job, _, err := in.MaybeFindJob(ctx, request.Namespace, request.Name)
	if err != nil {
		// an error occurred
		return errors.Wrapf(err, "getting Job %s", request.Name)
	}

	cj := &coh.CoherenceJob{}
	err = in.GetClient().Get(ctx, request.NamespacedName, cj)
	switch {
	case err != nil && apierrors.IsNotFound(err):
		// deployment not found - possibly deleted
		err = nil
	case err != nil:
		// an error occurred
		err = errors.Wrapf(err, "getting deployment %s", request.Name)
	case cj.GetDeletionTimestamp() != nil:
		// deployment is being deleted
		err = nil
	default:
		updated := cj.DeepCopy()
		var jobStatus *batchv1.JobStatus
		if job != nil {
			jobStatus = &job.Status
		}
		if updated.Status.UpdateFromJob(cj, jobStatus, probeStatuses) {
			err = in.GetClient().Status().Update(ctx, updated)
		}
	}
	return err
}
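
// maybeExecuteProbe runs the configured ReadyAction probe against the Job's Pods
// once the required number of Pods are ready or succeeded, returning the probe
// statuses to be recorded on the CoherenceJob resource.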
func (in *ReconcileJob) maybeExecuteProbe(ctx context.Context, job *batchv1.Job, deployment coh.CoherenceResource, logger logr.Logger) ([]coh.CoherenceJobProbeStatus, error) {
	var statuses []coh.CoherenceJobProbeStatus

	spec, _ := deployment.GetJobResourceSpec()
	action := spec.ReadyAction
	if action == nil {
		return statuses, nil
	}

	// get the required ready count, defaulting to the deployment's replica count
	var readyCount int32
	if action.ReadyCount != nil {
		readyCount = *action.ReadyCount
	} else {
		readyCount = deployment.GetReplicas()
	}

	count := job.Status.Succeeded
	if job.Status.Ready != nil {
		count += *job.Status.Ready
	}
	if count < readyCount {
		// not enough Pods are ready or succeeded yet
		return statuses, nil
	}

	c := in.GetClient()
	labels := client.MatchingLabels{}
	for k, v := range job.Spec.Selector.MatchLabels {
		labels[k] = v
	}

	list := corev1.PodList{}
	err := c.List(ctx, &list, client.InNamespace(deployment.GetNamespace()), labels)
	if err != nil {
		return statuses, errors.Wrapf(err, "error getting list of Pods for Job %s", job.Name)
	}
	if len(list.Items) == 0 {
		return statuses, nil
	}

	p := probe.CoherenceProbe{
		Client: in.GetClient(),
		Config: in.GetManager().GetConfig(),
	}

	status := deployment.GetStatus()
	for _, pod := range list.Items {
		name := pod.Name
		probeStatus := status.FindJobProbeStatus(name)
		podCondition := in.findPodReadyCondition(pod)
		if in.shouldExecuteProbe(probeStatus, podCondition) {
			_, err := p.RunProbe(ctx, pod, &action.Probe)
			if err == nil {
				logger.Info(fmt.Sprintf("Executed probe using pod %s", name), "Error", "nil")
				probeStatus.Success = ptr.To(true)
			} else {
				logger.Info(fmt.Sprintf("Executed probe using pod %s", name), "Error", err)
				probeStatus.Success = ptr.To(false)
				probeStatus.Error = ptr.To(err.Error())
			}
			now := metav1.Now()
			probeStatus.LastProbeTime = &now
			probeStatus.LastReadyTime = &podCondition.LastTransitionTime
			statuses = append(statuses, probeStatus)
		}
	}
	return statuses, nil
}
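
// findPodReadyCondition returns the PodReady condition for the given Pod,
// or nil if the Pod has no PodReady condition.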
func (in *ReconcileJob) findPodReadyCondition(pod corev1.Pod) *corev1.PodCondition {
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodReady {
			return &c
		}
	}
	return nil
}
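
// shouldExecuteProbe determines whether the ready probe should be executed for a Pod.
// The probe is skipped if the Pod is not currently Ready, or if the Pod's ready
// transition happened before the ready time recorded by a previous probe execution.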
func (in *ReconcileJob) shouldExecuteProbe(probeStatus coh.CoherenceJobProbeStatus, podCondition *corev1.PodCondition) bool {
	if podCondition == nil || podCondition.Status != corev1.ConditionTrue {
		return false
	}
	if podCondition.LastTransitionTime.Before(probeStatus.LastReadyTime) {
		return false
	}
	return true
}