-
Notifications
You must be signed in to change notification settings - Fork 114
/
controller.go
263 lines (239 loc) · 8.96 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package tekton
import (
"bytes"
"context"
"fmt"
"os"
"text/template"
lighthousev1alpha1 "github.com/jenkins-x/lighthouse/pkg/apis/lighthouse/v1alpha1"
configjob "github.com/jenkins-x/lighthouse/pkg/config/job"
"github.com/jenkins-x/lighthouse/pkg/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)
const jobOwnerKey = ".metadata.controller"
var apiGVStr = lighthousev1alpha1.SchemeGroupVersion.String()
// LighthouseJobReconciler reconciles a LighthouseJob object
type LighthouseJobReconciler struct {
client client.Client
apiReader client.Reader
logger *logrus.Entry
scheme *runtime.Scheme
idGenerator buildIDGenerator
dashboardURL string
dashboardTemplate string
namespace string
disableLogging bool
}
// NewLighthouseJobReconciler creates a LighthouseJob reconciler
func NewLighthouseJobReconciler(client client.Client, apiReader client.Reader, scheme *runtime.Scheme, dashboardURL string, dashboardTemplate string, namespace string) *LighthouseJobReconciler {
if dashboardTemplate == "" {
dashboardTemplate = os.Getenv("LIGHTHOUSE_DASHBOARD_TEMPLATE")
}
return &LighthouseJobReconciler{
client: client,
apiReader: apiReader,
logger: logrus.NewEntry(logrus.StandardLogger()).WithField("controller", controllerName),
scheme: scheme,
dashboardURL: dashboardURL,
dashboardTemplate: dashboardTemplate,
namespace: namespace,
idGenerator: &epochBuildIDGenerator{},
}
}
// SetupWithManager sets up the reconcilier with it's manager
func (r *LighthouseJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
indexFunc := func(rawObj client.Object) []string {
obj := rawObj.(*pipelinev1beta1.PipelineRun)
owner := metav1.GetControllerOf(obj)
// TODO: would be nice to get kind from the type rather than a hard coded string
if owner == nil || owner.APIVersion != apiGVStr || owner.Kind != "LighthouseJob" {
return nil
}
return []string{owner.Name}
}
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &pipelinev1beta1.PipelineRun{}, jobOwnerKey, indexFunc); err != nil {
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&lighthousev1alpha1.LighthouseJob{}).
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
Owns(&pipelinev1beta1.PipelineRun{}).
Complete(r)
}
// Reconcile represents an iteration of the reconciliation loop
func (r *LighthouseJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if ctx == nil {
ctx = context.Background()
}
r.logger.Infof("Reconcile LighthouseJob %+v", req)
// get lighthouse job
var job lighthousev1alpha1.LighthouseJob
if err := r.client.Get(ctx, req.NamespacedName, &job); err != nil {
r.logger.Warningf("Unable to get LighthouseJob: %s", err)
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// filter on job agent
if job.Spec.Agent != configjob.TektonPipelineAgent {
return ctrl.Result{}, nil
}
// get job's pipeline runs
var pipelineRunList pipelinev1beta1.PipelineRunList
if err := r.client.List(ctx, &pipelineRunList, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
r.logger.Errorf("Failed list pipeline runs: %s", err)
return ctrl.Result{}, err
}
// if pipeline run does not exist, create it
if len(pipelineRunList.Items) == 0 {
if job.Status.State == lighthousev1alpha1.TriggeredState {
// construct a pipeline run
pipelineRun, err := makePipelineRun(ctx, job, r.namespace, r.logger, r.idGenerator, r.apiReader)
if err != nil {
r.logger.Errorf("Failed to make pipeline run: %s", err)
return ctrl.Result{}, err
}
// link it to the current lighthouse job
if err := ctrl.SetControllerReference(&job, pipelineRun, r.scheme); err != nil {
r.logger.Errorf("Failed to set owner reference: %s", err)
return ctrl.Result{}, err
}
// lets disable the blockOwnerDeletion as it fails on OpenShift
for i := range pipelineRun.OwnerReferences {
ref := &pipelineRun.OwnerReferences[i]
if ref.Kind == "LighthouseJob" && ref.BlockOwnerDeletion != nil {
ref.BlockOwnerDeletion = nil
}
}
// TODO: changing the status should be a consequence of a pipeline run being created
// update status
status := lighthousev1alpha1.LighthouseJobStatus{
State: lighthousev1alpha1.PendingState,
StartTime: metav1.Now(),
}
f := func(job *lighthousev1alpha1.LighthouseJob) error {
job.Status = status
if err := r.client.Status().Update(ctx, job); err != nil {
r.logger.Errorf("Failed to update LighthouseJob status: %s", err)
return err
}
return nil
}
err = r.retryModifyJob(ctx, req.NamespacedName, &job, f)
if err != nil {
return ctrl.Result{}, err
}
// create pipeline run
if err := r.client.Create(ctx, pipelineRun); err != nil {
r.logger.Errorf("Failed to create pipeline run: %s", err)
return ctrl.Result{}, err
}
}
} else if len(pipelineRunList.Items) == 1 {
// if pipeline run exists, create it and update status
pipelineRun := pipelineRunList.Items[0]
if !r.disableLogging {
r.logger.Infof("Reconcile PipelineRun %+v", pipelineRun)
}
// update build id
if job.Labels[util.BuildNumLabel] != pipelineRun.Labels[util.BuildNumLabel] {
f := func(job *lighthousev1alpha1.LighthouseJob) error {
job.Labels[util.BuildNumLabel] = pipelineRun.Labels[util.BuildNumLabel]
if err := r.client.Update(ctx, job); err != nil {
return errors.Wrapf(err, "failed to add build label Project status")
}
return nil
}
err := r.retryModifyJob(ctx, req.NamespacedName, &job, f)
if err != nil {
return ctrl.Result{}, err
}
}
f := func(job *lighthousev1alpha1.LighthouseJob) error {
if r.dashboardURL != "" {
job.Status.ReportURL = r.getPipelingetPipelineTargetURLeTargetURL(pipelineRun)
}
job.Status.Activity = ConvertPipelineRun(&pipelineRun)
if err := r.client.Status().Update(ctx, job); err != nil {
return errors.Wrapf(err, "failed to update LighthouseJob status")
}
return nil
}
err := r.retryModifyJob(ctx, req.NamespacedName, &job, f)
if err != nil {
return ctrl.Result{}, err
}
} else {
r.logger.Errorf("A lighthouse job should never have more than 1 pipeline run")
}
return ctrl.Result{}, nil
}
func (r *LighthouseJobReconciler) getPipelingetPipelineTargetURLeTargetURL(pipelineRun pipelinev1beta1.PipelineRun) string {
if r.dashboardTemplate == "" {
return fmt.Sprintf("%s/#/namespaces/%s/pipelineruns/%s", trimDashboardURL(r.dashboardURL), r.namespace, pipelineRun.Name)
}
funcMap := map[string]interface{}{}
tmpl, err := template.New("value.gotmpl").Option("missingkey=error").Funcs(funcMap).Parse(r.dashboardTemplate)
if err != nil {
r.logger.WithError(err).Warnf("failed to parse template: %s", r.dashboardTemplate)
return ""
}
labels := pipelineRun.Labels
if labels == nil {
labels = map[string]string{}
}
namespace := pipelineRun.Namespace
if namespace == "" {
namespace = r.namespace
}
templateData := map[string]interface{}{
"Branch": labels[util.BranchLabel],
"BuildNum": labels[util.BuildNumLabel],
"Context": labels[util.ContextLabel],
"Namespace": namespace,
"Org": labels[util.OrgLabel],
"PipelineRun": pipelineRun.Name,
"Pull": labels[util.PullLabel],
"Repo": labels[util.RepoLabel],
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, templateData)
if err != nil {
r.logger.WithError(err).Warnf("failed to parse template: %s for PipelineRun %s", r.dashboardTemplate, pipelineRun.Name)
return ""
}
return fmt.Sprintf("%s/%s", trimDashboardURL(r.dashboardURL), buf.String())
}
const retryCount = 5
// retryModifyJob tries to modify the Job retrying if it fails
func (r *LighthouseJobReconciler) retryModifyJob(ctx context.Context, ns client.ObjectKey, job *lighthousev1alpha1.LighthouseJob, f func(job *lighthousev1alpha1.LighthouseJob) error) error {
i := 0
for {
i++
err := f(job)
if err == nil {
if i > 1 {
r.logger.Infof("took %d attempts to update Job %s", i, job.Name)
}
return nil
}
if i >= retryCount {
return errors.Wrapf(err, "failed to update Job %s after %d attempts", job.Name, retryCount)
}
if err := r.client.Get(ctx, ns, job); err != nil {
r.logger.Warningf("Unable to get LighthouseJob %s due to: %s", job.Name, err)
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return client.IgnoreNotFound(err)
}
}
}