This repository has been archived by the owner on Jan 14, 2024. It is now read-only.
/
scheduledbackup_jobs_observer.go
108 lines (94 loc) · 4.26 KB
/
scheduledbackup_jobs_observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package controllers
import (
"context"
"github.com/pkg/errors"
riotkitorgv1alpha1 "github.com/riotkit-org/backup-maker-controller/pkg/apis/riotkit/v1alpha1"
"github.com/riotkit-org/backup-maker-controller/pkg/client/clientset/versioned/typed/riotkit/v1alpha1"
"github.com/riotkit-org/backup-maker-controller/pkg/domain"
"github.com/riotkit-org/backup-maker-controller/pkg/factory"
"github.com/riotkit-org/backup-maker-controller/pkg/integration"
"github.com/riotkit-org/backup-maker-controller/pkg/locking"
"github.com/sirupsen/logrus"
batchv1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"time"
)
// -----------------------------------------------------------------------------------------------------------------------------------------------------------------
// JobsManagedByScheduledBackupObserver reconciles ScheduledBackup objects by inspecting the resources they own
// (the controller is registered for owned batch/v1 Jobs and CronJobs). For every owned reference a health status
// is collected, and the aggregated result is written back to the ScheduledBackup's .status
// as .ChildrenResourcesHealth and .Healthy.
// -----------------------------------------------------------------------------------------------------------------------------------------------------------------
type JobsManagedByScheduledBackupObserver struct {
// Client is handed to factory.FetchSBAggregate when the ScheduledBackup aggregate is loaded.
Client client.Client
// BRClient is the typed clientset used to Get a ScheduledBackup and to UpdateStatus on it.
BRClient v1alpha1.RiotkitV1alpha1Interface
// Integrations is passed to the health report builder together with the owned references.
Integrations *integration.AllSupportedJobResourceTypes
// Fetcher is handed to factory.FetchSBAggregate together with Client.
Fetcher factory.CachedFetcher
// Locker guards Reconcile so the same request is not processed multiple times at the same moment.
Locker locking.Locker
}
// Reconcile collects the health of all resources owned by a ScheduledBackup and stores
// the aggregated result in the ScheduledBackup's .status.
//
// Flow:
//  1. Obtain a lock for the request. If it is already locked, requeue after 10 seconds;
//     if obtaining the lock failed, return the lock's error.
//  2. Fetch the ScheduledBackup aggregate.
//  3. Build a health report for every owned reference. A failure here is returned
//     to the caller instead of being silently dropped.
//  4. If any owned resource is still running, requeue after 10 seconds without touching the status.
//  5. Otherwise write the report and the overall "healthy" flag to .status.
func (r *JobsManagedByScheduledBackupObserver) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := logrus.WithContext(ctx).WithFields(map[string]interface{}{
		"name":       req.NamespacedName,
		"controller": "JobsManagedByScheduledBackupObserver",
	})

	//
	// 0. Do not allow doing same action multiple times at the same moment
	//
	lock := r.Locker.Obtain(ctx, req)
	if lock.AlreadyLocked() {
		logger.Infoln("Already processed, requeuing")
		return ctrl.Result{RequeueAfter: time.Second * 10}, nil
	}
	if lock.HasFailure() {
		return ctrl.Result{}, lock.GetError()
	}
	defer r.Locker.Done(ctx, lock)

	logger.Info("Reconciling children")

	// Fetch and populate the context
	aggregate, err := factory.FetchSBAggregate(ctx, r.Fetcher, r.Client, logger, req)
	if err != nil {
		return ctrl.Result{}, errors.Wrap(err, "cannot fetch ScheduledBackup from cache")
	}

	// Collect the report about all managed resources in our context
	ownedReferences := aggregate.GetReferencesOfOwnedObjects()
	report, healthy, err := createOwnedReferencesHealthReport(ctx, ownedReferences, r.Integrations, logger, req.Namespace)
	if err != nil {
		// Do not publish a status built from a failed health check; return the error
		// so the request is retried by the controller runtime.
		return ctrl.Result{}, errors.Wrap(err, "cannot create a health report of owned resources")
	}

	// The Jobs are still running, wait for them to be finished
	for _, healthStatus := range report {
		if healthStatus.Running {
			return ctrl.Result{RequeueAfter: time.Second * 10}, nil
		}
	}

	// Update the status
	r.updateStatus(ctx, logger, aggregate, report, healthy)
	return ctrl.Result{}, nil
}
// updateStatus writes the collected health report and the overall healthy flag into
// the .status of the ScheduledBackup described by aggregate.
//
// The object is re-fetched through BRClient on every attempt and the update is retried
// on conflict using retry.DefaultRetry. The method has no return value; when all attempts
// fail the error is logged so that the failure does not go unnoticed.
func (r *JobsManagedByScheduledBackupObserver) updateStatus(ctx context.Context, logger *logrus.Entry, aggregate *domain.ScheduledBackupAggregate, report []riotkitorgv1alpha1.JobHealthStatus, healthy bool) {
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		res, getErr := r.BRClient.ScheduledBackups(aggregate.Namespace).Get(ctx, aggregate.Name, metav1.GetOptions{})
		if getErr != nil {
			return getErr
		}

		// Start from the status carried by the aggregate, then overlay the health fields.
		// NOTE(review): this replaces the freshly fetched .status with the aggregate's copy - confirm that is intended.
		res.Status = aggregate.ScheduledBackup.Status
		res.Status.ChildrenResourcesHealth = report
		res.Status.Healthy = healthy

		if _, updateErr := r.BRClient.ScheduledBackups(aggregate.Namespace).UpdateStatus(ctx, res, metav1.UpdateOptions{}); updateErr != nil {
			// Returning the error lets RetryOnConflict decide whether to try again.
			return updateErr
		}

		// Only report success once the update really went through.
		logger.Debugf(".status field updated with .ChildrenResourcesHealth and .Healthy")
		return nil
	})
	if err != nil {
		logger.WithError(err).Errorln("Cannot update .status with .ChildrenResourcesHealth and .Healthy")
	}
}
// SetupWithManager registers this reconciler with the given Manager. The controller is
// built for ScheduledBackup objects and for the Jobs and CronJobs they own; delete events
// are filtered out and never trigger a reconciliation.
func (r *JobsManagedByScheduledBackupObserver) SetupWithManager(mgr ctrl.Manager) error {
	// Event filter that rejects every delete event.
	skipDeletes := predicate.Funcs{
		DeleteFunc: func(event.DeleteEvent) bool {
			return false
		},
	}

	b := ctrl.NewControllerManagedBy(mgr)
	b = b.For(&riotkitorgv1alpha1.ScheduledBackup{})
	b = b.Owns(&batchv1.Job{})
	b = b.Owns(&batchv1.CronJob{})
	b = b.WithEventFilter(skipDeletes)

	return b.Complete(r)
}