package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
khcheckv1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khcheck/v1"
khjobv1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khjob/v1"
khstatev1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khstate/v1"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/checks/external"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/checks/external/status"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/health"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/masterCalculation"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/metrics"
)
// Kuberhealthy represents the kuberhealthy server and its checks
type Kuberhealthy struct {
Checks []*external.Checker
ListenAddr string // the listen address, such as ":80"
MetricForwarder metrics.Client
overrideKubeClient *kubernetes.Clientset
cancelChecksFunc context.CancelFunc // invalidates the context of all running checks
cancelReaperFunc context.CancelFunc // invalidates the context of the reaper
wg sync.WaitGroup // used to track running checks
shutdownCtxFunc context.CancelFunc // used to shutdown the main control select
stateReflector *StateReflector // a reflector that can cache the current state of the khState resources
}
// NewKuberhealthy creates a new kuberhealthy checker instance
func NewKuberhealthy() *Kuberhealthy {
kh := &Kuberhealthy{}
kh.stateReflector = NewStateReflector()
return kh
}
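// A minimal lifecycle sketch (an assumption-laden example; the real wiring
// lives in main and relies on package globals configured at startup):
//
//	kh := NewKuberhealthy()
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	kh.Start(ctx) // blocks, running the control loop until ctx is canceled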
// setCheckExecutionError sets an execution error for a check name in
// its crd status
func (k *Kuberhealthy) setCheckExecutionError(checkName string, checkNamespace string, exErr error) error {
details := khstatev1.NewWorkloadDetails(khstatev1.KHCheck)
check, err := k.getCheck(checkName, checkNamespace)
if err != nil {
return err
}
if check.Namespace != "" {
details.Namespace = check.CheckNamespace()
}
details.OK = false
details.Errors = []string{"Check execution error: " + exErr.Error()}
// we need to maintain the current UUID, which means fetching it first
khc, err := k.getCheck(checkName, checkNamespace)
if err != nil {
return fmt.Errorf("Error when setting execution error on check %s %s %w", checkName, checkNamespace, err)
}
checkState, err := getCheckState(khc)
if err != nil {
return fmt.Errorf("Error when setting execution error on check (getting check state for current UUID) %s %s %w", checkName, checkNamespace, err)
}
details.CurrentUUID = checkState.CurrentUUID
log.Debugln("Setting execution state of check", checkName, "to", details.OK, details.Errors, details.CurrentUUID, details.GetKHWorkload())
// store the check state with the CRD
err = k.storeCheckState(checkName, checkNamespace, details)
if err != nil {
return fmt.Errorf("Was unable to write an execution error to the CRD status with error: %w", err)
}
return nil
}
// setJobExecutionError sets an execution error for a job name in its crd status
func (k *Kuberhealthy) setJobExecutionError(jobName string, jobNamespace string, exErr error) error {
details := khstatev1.NewWorkloadDetails(khstatev1.KHJob)
job, err := k.getJob(jobName, jobNamespace)
if err != nil {
return err
}
if job.Namespace != "" {
details.Namespace = job.CheckNamespace()
}
details.OK = false
details.Errors = []string{"Job execution error: " + exErr.Error()}
// we need to maintain the current UUID, which means fetching it first
khj, err := k.getJob(jobName, jobNamespace)
if err != nil {
return fmt.Errorf("Error when setting execution error on job %s %s %w", jobName, jobNamespace, err)
}
jobState, err := getJobState(khj)
if err != nil {
return fmt.Errorf("Error when setting execution error on job (getting job state for current UUID) %s %s %w", jobName, jobNamespace, err)
}
details.CurrentUUID = jobState.CurrentUUID
log.Debugln("Setting execution state of job", jobName, "to", details.OK, details.Errors, details.CurrentUUID, details.GetKHWorkload())
// store the check state with the CRD
err = k.storeCheckState(jobName, jobNamespace, details)
if err != nil {
return fmt.Errorf("Was unable to write an execution error to the CRD status with error: %w", err)
}
return nil
}
// AddCheck adds a check to Kuberhealthy. Must be done before Start or StartChecks
// are called.
func (k *Kuberhealthy) AddCheck(c *external.Checker) {
k.Checks = append(k.Checks, c)
}
// Shutdown causes the kuberhealthy check group to shut down gracefully
func (k *Kuberhealthy) Shutdown(doneChan chan struct{}) {
if k.shutdownCtxFunc != nil {
log.Infoln("shutdown: aborting control context")
k.shutdownCtxFunc() // stop the control system
}
time.Sleep(time.Second * 5) // brief pause to help prevent more checks from starting in a race before the control system stops
log.Infoln("shutdown: stopping checks")
k.StopChecks() // stop all checks
log.Infoln("shutdown: ready for main program shutdown")
doneChan <- struct{}{}
}
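// Example caller pattern for Shutdown (a sketch; the caller supplies the done
// channel and blocks on it until all checks have stopped):
//
//	done := make(chan struct{})
//	go kh.Shutdown(done)
//	<-done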
// StopChecks causes the kuberhealthy check group to shut down gracefully.
// All checks are sent a shutdown command at the same time.
func (k *Kuberhealthy) StopChecks() {
log.Infoln("control:", len(k.Checks), "checks stopping...")
if k.cancelChecksFunc != nil {
k.cancelChecksFunc()
}
// call a shutdown on all checks concurrently
for _, c := range k.Checks {
go func(c external.Checker) {
log.Infoln("control: check", c.Name(), "stopping...")
err := c.Shutdown()
if err != nil {
log.Errorln("control: ERROR stopping check", c.Name(), err)
}
k.wg.Done()
log.Infoln("control: check", c.Name(), "stopped")
}(*c)
}
// wait for all checks to stop cleanly
log.Infoln("control: waiting for all checks to stop")
k.wg.Wait()
log.Infoln("control: all checks stopped.")
}
// Start starts Kuberhealthy's checks and master monitoring, then runs the main control loop
func (k *Kuberhealthy) Start(ctx context.Context) {
// start the khState reflector
go k.stateReflector.Start()
// if influxdb is enabled, configure it
if cfg.EnableInflux {
k.configureInfluxForwarding()
}
// Start the web server and restart it if it crashes
go k.StartWebServer()
// find all the external checks from the khcheckcrd resources on the cluster and keep them in sync.
// use rate limiting to avoid reconfiguration spam
maxUpdateInterval := time.Second * 10
externalChecksUpdateChan := make(chan struct{}, 50)
externalChecksUpdateChanLimited := make(chan struct{}, 50)
go notifyChanLimiter(maxUpdateInterval, externalChecksUpdateChan, externalChecksUpdateChanLimited)
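// notifyChanLimiter (defined elsewhere in this package) is assumed to coalesce bursts:
// monitorExternalChecks writes to externalChecksUpdateChan on every khcheck change, and the
// limited channel sees at most one notification per maxUpdateInterval.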
go k.monitorExternalChecks(ctx, externalChecksUpdateChan)
// we use two channels to indicate when we gain or lose master status. use rate limiting to avoid
// reconfiguration spam
becameMasterChan := make(chan struct{}, 10)
lostMasterChan := make(chan struct{}, 10)
go k.masterMonitor(ctx, becameMasterChan, lostMasterChan)
// monitor for kuberhealthy jobs and trigger when a new job is added
go k.monitorKHJobs(ctx)
// get notified when kuberhealthy configuration is reloaded
configReloadChan := make(chan struct{})
go configReloadNotifier(ctx, configReloadChan)
// loop and select channels to do appropriate thing when:
// - master kuberhealthy pod changes
// - new khchecks are added or modified
// - kuberhealthy configuration changes
for {
select {
case <-ctx.Done(): // we are shutting down
log.Infoln("control: shutting down from context abort...")
return
case <-becameMasterChan: // we have become the current master instance and should run checks
// reset checks and re-add from configuration settings
log.Infoln("control: Became master. Reconfiguring and starting checks.")
k.StartChecks(ctx)
k.StartReaper(ctx)
case <-lostMasterChan: // we are no longer master
log.Infoln("control: Lost master. Stopping checks.")
k.StopChecks()
k.StopReaper()
case <-externalChecksUpdateChanLimited: // external check change detected
log.Infoln("control: Witnessed a khcheck resource change...")
// if we are master, stop, reconfigure our khchecks, and start again with the new configuration
if isMaster {
log.Infoln("control: Reloading external check configurations due to khcheck update")
k.RestartChecks(ctx)
k.RestartReaper(ctx)
}
case <-configReloadChan:
log.Infoln("control: Witnessed a kuberhealthy configuration change...")
// if we are master, stop, reconfigure our khchecks, and start again with the new configuration
if isMaster {
log.Infoln("control: Reloading external check configurations due to kuberhealthy configuration update")
k.RestartChecks(ctx)
k.RestartReaper(ctx)
}
}
}
}
// StartReaper starts the check reaper
func (k *Kuberhealthy) StartReaper(ctx context.Context) {
reaperCtx, reaperCtxCancel := context.WithCancel(ctx)
k.cancelReaperFunc = reaperCtxCancel
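// reaper is defined elsewhere in this package; it runs until reaperCtx is
// canceled, which StopReaper does via k.cancelReaperFunc.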
go reaper(reaperCtx)
}
// StopReaper stops the check reaper
func (k *Kuberhealthy) StopReaper() {
if k.cancelReaperFunc != nil {
k.cancelReaperFunc()
}
}
// RestartReaper restarts the check reaper
func (k *Kuberhealthy) RestartReaper(ctx context.Context) {
k.StopReaper()
k.StartReaper(ctx)
}
// RestartChecks does a stop and start on all kuberhealthy checks
func (k *Kuberhealthy) RestartChecks(ctx context.Context) {
k.StopChecks()
k.StartChecks(ctx)
}
// khStateResourceReaper runs reapKHStateResources on an interval until the context for it is canceled
func (k *Kuberhealthy) khStateResourceReaper(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
log.Infoln("khState reaper: starting up")
for {
select {
case <-ticker.C:
log.Infoln("khState reaper: starting to run an audit")
err := k.reapKHStateResources(ctx)
if err != nil {
log.Errorln("khState reaper: Error when reaping khState resources:", err)
}
case <-ctx.Done():
log.Infoln("khState reaper: stopping")
return
}
}
}
// reapKHStateResources runs a single audit on khState resources. Any that don't have a matching khCheck are
// deleted.
func (k *Kuberhealthy) reapKHStateResources(ctx context.Context) error {
// list all khStates in the cluster
khStates, err := khStateClient.KuberhealthyStates(listenNamespace).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("khState reaper: error listing khStates for reaping: %w", err)
}
khChecks, err := listUnstructuredKHChecks(ctx)
if err != nil {
return fmt.Errorf("khState reaper: error listing unstructured khChecks: %w", err)
}
khJobs, err := khJobClient.KuberhealthyJobs(listenNamespace).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("khState reaper: error listing khJobs for reaping: %w", err)
}
log.Infoln("khState reaper: analyzing", len(khStates.Items), "khState resources")
// any khState that does not have a matching khCheck should be deleted (ignore errors)
for _, khState := range khStates.Items {
log.Debugln("khState reaper: analyzing khState", khState.GetName(), "in", khState.GetName())
var foundKHCheck bool
for _, kc := range khChecks.Items {
khCheck, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}
log.Debugln("khState reaper:", khCheck.GetName(), "==", khState.GetName(), "&&", khCheck.GetNamespace(), "==", khState.GetNamespace())
if khCheck.GetName() == khState.GetName() && khCheck.GetNamespace() == khState.GetNamespace() {
log.Infoln("khState reaper:", khState.GetName(), "in", khState.GetNamespace(), "is still valid")
foundKHCheck = true
break
}
}
var foundKHJob bool
for _, kj := range khJobs.Items {
log.Debugln("khState reaper:", kj.GetName(), "==", khState.GetName(), "&&", kj.GetNamespace(), "==", khState.GetNamespace())
if kj.GetName() == khState.GetName() && kj.GetNamespace() == khState.GetNamespace() {
log.Infoln("khState reaper:", khState.GetName(), "in", khState.GetNamespace(), "is still valid")
foundKHJob = true
break
}
}
// if we didn't find a matching khCheck or khJob, delete the rogue khState
if !foundKHCheck && !foundKHJob {
log.Infoln("khState reaper: removing khState", khState.GetName(), "in", khState.GetNamespace())
err := khStateClient.KuberhealthyStates(khState.GetNamespace()).Delete(khState.GetName(), &metav1.DeleteOptions{})
if err != nil {
log.Errorln(fmt.Errorf("khState reaper: error when removing invalid khstate: %w", err))
}
}
}
return nil
}
// monitorKHJobs watches for newly added KHJobs and triggers them
func (k *Kuberhealthy) monitorKHJobs(ctx context.Context) {
log.Debugln("Spawned watcher for KH jobs")
for {
log.Debugln("Starting a watch for khcheck jobs")
// wait a second so we don't retry too quickly on error
time.Sleep(time.Second)
watcher, err := khJobClient.KuberhealthyJobs(listenNamespace).Watch(metav1.ListOptions{})
if err != nil {
log.Errorln("error watching for khjob objects:", err)
continue
}
// watch for the watcher context to end, or the parent context. If the parent context ends, we close the watcher.
// if the watcher context ends, we shut down this go routine to prevent a leak as it restarts
watcherCtx, watcherCtxCancel := context.WithCancel(context.Background())
go func(watchCtx context.Context, ctx context.Context, watcher watch.Interface) {
select {
case <-watchCtx.Done():
break
case <-ctx.Done():
watcher.Stop()
}
log.Debugln("khjob monitor watch stopping")
}(watcherCtx, ctx, watcher)
for khj := range watcher.ResultChan() {
switch khj.Type {
// Watch only for added events since we only care about khjobs that were just added / created.
// Ignore all other event types.
case watch.Added:
log.Debugln("khjob monitor saw an added event")
kj := khj.Object.(*khjobv1.KuberhealthyJob)
if verifyNewKHJob(kj.Name, kj.Namespace) {
log.Infoln("khJob is newly added, triggering khjob:", kj.Name)
k.triggerKHJob(ctx, *kj)
continue
}
log.Debugln("KHJob is not new, in phase:", kj.Spec.Phase, "Skipping added event")
continue
case watch.Error:
log.Debugln("khjob monitor saw an error event")
e := khj.Object.(*metav1.Status)
log.Errorln("Error when watching khjobs:", e.Reason)
continue
default:
log.Warningln("khjob monitor saw an unknown event type and ignored it:", khj.Type)
continue
}
}
// if the watcher breaks, shutdown the parent context monitor go routine
watcherCtxCancel()
select {
case <-ctx.Done():
log.Debugln("khjob monitor closing due to context cancellation")
return
default:
}
}
}
// watchForKHCheckChanges watches for changes to khcheck objects and returns them through the specified channel
func (k *Kuberhealthy) watchForKHCheckChanges(ctx context.Context, c chan struct{}) {
log.Debugln("Spawned watcher for KH check changes")
for {
log.Debugln("Starting a watch for khcheck object changes")
// wait a second so we don't retry too quickly on error
time.Sleep(time.Second)
// start a watch on khcheck resources
watcher, err := watchUnstructuredKHChecks(ctx)
if err != nil {
log.Errorln("error creating watcher for khcheck objects:", err)
continue
}
// watch for the watcher context to end, or the parent context. If the parent context ends, we close the watcher.
// if the watcher context ends, we shut down this go routine to prevent a leak as it restarts
watcherCtx, watcherCtxCancel := context.WithCancel(context.Background())
go func(watchCtx context.Context, ctx context.Context, watcher watch.Interface) {
select {
case <-watchCtx.Done():
break
case <-ctx.Done():
watcher.Stop()
}
log.Debugln("khcheck change monitor watch stopping")
}(watcherCtx, ctx, watcher)
// loop over results and return them to the calling channel until we hit an error, then close and restart
for khc := range watcher.ResultChan() {
switch khc.Type {
case watch.Added:
log.Debugln("khcheck monitor saw an added event")
c <- struct{}{}
case watch.Modified:
log.Debugln("khcheck monitor saw a modified event")
c <- struct{}{}
case watch.Deleted:
log.Debugln("khcheck monitor saw a deleted event")
c <- struct{}{}
case watch.Error:
log.Debugln("khcheck monitor saw an error event")
e := khc.Object.(*metav1.Status)
log.Errorln("Error when watching for khcheck changes:", e.Reason)
continue
default:
log.Warningln("khcheck monitor saw an unknown event type and ignored it:", khc.Type)
}
}
// if the watcher breaks, shutdown the parent context monitor go routine
watcherCtxCancel()
select {
case <-ctx.Done():
log.Debugln("khcheck monitor closing due to context cancellation")
return
default:
}
}
}
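// verifyNewKHJob returns true when the named khjob exists and has an empty
// phase, which is taken to mean it has not started running yet. Jobs already
// in a running or completed phase are skipped so that re-listed added events
// do not re-trigger them.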
func verifyNewKHJob(khJobName string, khJobNamespace string) bool {
kj, err := khJobClient.KuberhealthyJobs(khJobNamespace).Get(khJobName, metav1.GetOptions{})
if err != nil {
log.Debugln("Error getting khjob:", khJobName, err)
return false
}
log.Debugln("Found khjob:", kj.Name, "in job phase:", kj.Spec.Phase)
if kj.Spec.Phase == "" {
return true
}
return false
}
// monitorExternalChecks watches for changes to the external check CRDs
func (k *Kuberhealthy) monitorExternalChecks(ctx context.Context, notify chan struct{}) {
// make a map of resource versions so we know when things change
knownSettings := make(map[string]khcheckv1.CheckConfig)
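// map keys are "namespace/name" strings, so the same check name in two namespaces is tracked separately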
// start watching for events to changes in the background
c := make(chan struct{})
go k.watchForKHCheckChanges(ctx, c)
// each time we see a change in our khcheck structs, we should look at every object to see if something has changed
for {
// wait for the change channel to detect a change before scanning again
<-c
log.Debugln("Change notification received. Scanning for external check changes...")
khChecks, err := listUnstructuredKHChecks(ctx)
if err != nil {
log.Errorln("error listing unstructured khChecks: %w", err)
continue
}
// this bool indicates if we should send a change signal to the channel
var foundChange bool
// if a khcheck has been deleted, then we signal for change and purge it from the knownSettings map.
for mapName := range knownSettings {
var existsInItems bool // indicates the item exists in the item listing
for _, kc := range khChecks.Items {
khCheck, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}
itemMapName := khCheck.Namespace + "/" + khCheck.Name
if itemMapName == mapName {
existsInItems = true
break
}
}
if !existsInItems {
log.Debugln("Detected khcheck deletion for", mapName)
delete(knownSettings, mapName)
foundChange = true
}
}
for _, kc := range khChecks.Items {
i, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}
mapName := i.Namespace + "/" + i.Name
log.Debugln("Scanning khcheck CRD", mapName, "for changes since last seen...")
if len(i.Namespace) < 1 {
log.Warning("Got khcheck update from object with no namespace...")
continue
}
if len(i.Name) < 1 {
log.Warning("Got khcheck update from object with no name...")
continue
}
// if we don't know about this check yet, store its state and flag a change. The check is
// already loaded on the first check configuration run.
_, exists := knownSettings[mapName]
if !exists {
log.Debugln("First time seeing khcheck of name", mapName)
knownSettings[mapName] = i.Spec
foundChange = true
}
// check if run interval has changed
if knownSettings[mapName].RunInterval != i.Spec.RunInterval {
log.Debugln("The khcheck run interval for", mapName, "has changed.")
foundChange = true
}
// check if run timeout has changed
if knownSettings[mapName].Timeout != i.Spec.Timeout {
log.Debugln("The khcheck timeout for", mapName, "has changed.")
foundChange = true
}
// check if extraLabels has changed
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].ExtraLabels, i.Spec.ExtraLabels) {
log.Debugln("The khcheck extra labels for", mapName, "has changed.")
foundChange = true
}
// check if extraAnnotations has changed
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].ExtraAnnotations, i.Spec.ExtraAnnotations) {
log.Debugln("The khcheck extra annotations for", mapName, "has changed.")
foundChange = true
}
// check if CheckConfig has changed (PodSpec)
if !foundChange && !reflect.DeepEqual(knownSettings[mapName].PodSpec, i.Spec.PodSpec) {
log.Debugln("The khcheck for", mapName, "has changed.")
foundChange = true
}
// finally, update known settings before continuing to the next interval
knownSettings[mapName] = i.Spec
}
// if a change was detected, we signal the notify channel
if foundChange {
log.Debugln("Signaling that a change was found in external check configuration")
notify <- struct{}{}
}
}
}
// addExternalChecks syncs up the state of the external checks installed in this
// Kuberhealthy struct.
func (k *Kuberhealthy) addExternalChecks(ctx context.Context) error {
log.Debugln("Fetching khcheck configurations...")
khChecks, err := listUnstructuredKHChecks(ctx)
if err != nil {
return err
}
log.Debugln("Found", len(khChecks.Items), "external checks to load")
// iterate on each check CRD resource and add it as a check
for _, kc := range khChecks.Items {
r, err := convertUnstructuredKhCheck(kc)
if err != nil {
log.Errorln("Error converting unstructured object to khcheck:", err)
continue
}
log.Debugln("Loading check CRD:", r.Name)
log.Debugf("External check custom resource loaded: %v", r)
// create a new external checker for this khcheck resource
log.Infoln("Enabling external check:", r.Name)
c := external.New(kubernetesClient, &r, khCheckClient, khStateClient, cfg.ExternalCheckReportingURL)
// parse the run interval string from the custom resource and setup the run interval
c.RunInterval, err = time.ParseDuration(r.Spec.RunInterval)
if err != nil {
log.Errorln("Error parsing duration for check", c.CheckName, "in namespace", c.Namespace, err)
log.Errorln("Defaulting check to a runtime of ten minutes.")
c.RunInterval = DefaultRunInterval
}
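// the khcheck spec's RunInterval and Timeout fields use Go duration syntax as
// accepted by time.ParseDuration, e.g. "30s", "5m", or "1h30m".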
log.Debugln("RunInterval for check:", c.CheckName, "set to", c.RunInterval)
// parse the user specified timeout if present
c.RunTimeout = DefaultTimeout
if len(r.Spec.Timeout) > 0 {
c.RunTimeout, err = time.ParseDuration(r.Spec.Timeout)
if err != nil {
log.Errorln("Error parsing timeout for check", c.CheckName, "in namespace", c.Namespace, err)
log.Errorln("Defaulting check to a timeout of", DefaultTimeout)
}
}
log.Debugln("RunTimeout for check:", c.CheckName, "set to", c.RunTimeout)
// add on extra annotations and labels from the check spec
if r.Spec.ExtraAnnotations != nil {
log.Debugln("External check setting extra annotations:", r.Spec.ExtraAnnotations)
c.ExtraAnnotations = r.Spec.ExtraAnnotations
}
if r.Spec.ExtraLabels != nil {
log.Debugln("External check setting extra labels:", r.Spec.ExtraLabels)
c.ExtraLabels = r.Spec.ExtraLabels
}
log.Debugln("External check labels and annotations:", c.ExtraLabels, c.ExtraAnnotations)
// add the check into the checker
k.AddCheck(c)
}
return nil
}
// configureJob configures a single kuberhealthy job and returns it as an external checker.
func (k *Kuberhealthy) configureJob(job khjobv1.KuberhealthyJob) *external.Checker {
log.Debugln("Loading job CRD:", job.Name)
// create a new external checker for this khjob resource
log.Infoln("Enabling external job:", job.Name)
kj := external.NewJob(kubernetesClient, &job, khJobClient, khStateClient, cfg.ExternalCheckReportingURL)
var err error
// parse the user specified timeout if present
kj.RunTimeout = DefaultTimeout
if len(job.Spec.Timeout) > 0 {
kj.RunTimeout, err = time.ParseDuration(job.Spec.Timeout)
if err != nil {
log.Errorln("Error parsing timeout for check", kj.CheckName, "in namespace", kj.Namespace, err)
log.Errorln("Defaulting check to a timeout of", DefaultTimeout)
}
}
log.Debugln("RunTimeout for job:", kj.CheckName, "set to", kj.RunTimeout)
// add on extra annotations and labels from the job spec
if job.Spec.ExtraAnnotations != nil {
log.Debugln("External job setting extra annotations:", job.Spec.ExtraAnnotations)
kj.ExtraAnnotations = job.Spec.ExtraAnnotations
}
if job.Spec.ExtraLabels != nil {
log.Debugln("External job setting extra labels:", job.Spec.ExtraLabels)
kj.ExtraLabels = job.Spec.ExtraLabels
}
log.Debugln("External job labels and annotations:", kj.ExtraLabels, kj.ExtraAnnotations)
return kj
}
// triggerKHJob checks whether this instance is master and, if so, runs the khjob in a goroutine
func (k *Kuberhealthy) triggerKHJob(ctx context.Context, job khjobv1.KuberhealthyJob) {
log.Debugln("khjob trigger, isMaster:", isMaster)
// only the master pod should run khjobs, otherwise khjobs get duplicated
if isMaster {
go k.runJob(ctx, job)
}
}
// StartChecks starts all checks concurrently and ensures they stay running
func (k *Kuberhealthy) StartChecks(ctx context.Context) {
// wait for the check waitgroup to drain, just in case checks are still stopping
k.wg.Wait()
log.Infoln("control: Reloading check configuration...")
k.configureChecks(ctx)
log.Infoln("control:", len(k.Checks), "checks starting!")
// create a context for checks to abort with
checkGroupCtx, cancelFunc := context.WithCancel(ctx)
k.cancelChecksFunc = cancelFunc
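// checkGroupCtx is canceled by StopChecks via k.cancelChecksFunc, which aborts
// every check started in the loop below as a single group.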
// start each check with this check group's context
for _, c := range k.Checks {
k.wg.Add(1)
// start the check in its own routine
go k.runCheck(checkGroupCtx, c)
}
// spin up the khState reaper with a context after checks have been configured and started
log.Infoln("control: reaper starting!")
go k.khStateResourceReaper(ctx)
}
// masterStatusWatcher watches for master change events and updates the global upcomingMasterState along
// with the global lastMasterChangeTime
func (k *Kuberhealthy) masterStatusWatcher(ctx context.Context) {
// continue reconnecting to the api to resume the pod watch if it ends
for {
log.Debugln("master status watcher starting up...")
// don't retry our watch too fast
time.Sleep(time.Second * 5)
// setup a pod watching client for kuberhealthy pods
watcher, err := kubernetesClient.CoreV1().Pods(podNamespace).Watch(ctx, metav1.ListOptions{
LabelSelector: "app=kuberhealthy",
})
if err != nil {
log.Errorln("error when attempting to watch for kuberhealthy pod changes:", err)
continue
}
// watch for the parent context to expire as well as this watch context. if the parent context expires,
// then we stop the watcher. if the watcher context expires, we terminate the go routine to prevent a
// goroutine leak
watcherCtx, watcherCtxCancel := context.WithCancel(context.Background())
go func(watchCtx context.Context, ctx context.Context, watcher watch.Interface) {
select {
case <-watchCtx.Done():
break
case <-ctx.Done():
watcher.Stop()
}
log.Debugln("master status monitor watch stopping")
}(watcherCtx, ctx, watcher)
// on each update from the watch, we re-check our master status.
for range watcher.ResultChan() {
// update the time we last saw a master event
lastMasterChangeTime = time.Now()
// determine if we are becoming master or not
var err error
upcomingMasterState, err = masterCalculation.IAmMaster(kubernetesClient)
if err != nil {
log.Errorln(err)
}
log.Debugln("master status monitor saw a master event")
}
// cancel the watcher by revoking its context
watcherCtxCancel()
// if the context has expired, then shut down the master status watcher entirely
select {
case <-ctx.Done():
log.Debugln("master status monitor stopping due to context cancellation")
return
default:
}
}
}
// masterMonitor periodically evaluates the current and upcoming master state
// and makes it so when appropriate
func (k *Kuberhealthy) masterMonitor(ctx context.Context, becameMasterChan chan struct{}, lostMasterChan chan struct{}) {
// watch master pod event changes and recalculate the current master state of this pod with each event
go k.masterStatusWatcher(ctx)
interval := time.Second * 10
ticker := time.NewTicker(interval)
defer ticker.Stop()
// on each tick, we ensure that enough time has passed since the last master change
// event, then we calculate if we should become or lose master.
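// For example, with a 10 second interval, a master event seen 4 seconds ago skips this tick;
// checks only start or stop once no master event has arrived for a full interval.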
for range ticker.C {
if time.Since(lastMasterChangeTime) < interval {
log.Println("control: waiting for master changes to settle...")
continue
}
// dupe the global to prevent races
goingToBeMaster := upcomingMasterState
// signal that we have become master so that checks get started
if goingToBeMaster && !isMaster {
becameMasterChan <- struct{}{}
}
// signal that we have lost master so that checks get stopped
if !goingToBeMaster && isMaster {
lostMasterChan <- struct{}{}
}
// refresh global isMaster state
isMaster = goingToBeMaster
}
}
// runJob runs the job and sets its status
func (k *Kuberhealthy) runJob(ctx context.Context, job khjobv1.KuberhealthyJob) {
log.Infoln("control: Loading job configuration...")
j := k.configureJob(job)
log.Println("Starting kuberhealthy job:", j.CheckNamespace(), "/", j.Name())
// break out if context cancels
select {
case <-ctx.Done():
// we don't need to call a job shutdown here because the same func that cancels this context calls
// shutdown on all the jobs configured in the kuberhealthy struct.
log.Infoln("Shutting down job run due to context cancellation:", j.Name(), "in namespace", j.CheckNamespace())
return
default:
}
// Run the job
log.Infoln("Running job:", j.Name())
// Record job run start time
jobStartTime := time.Now()
// set KHJob phase to running
err := setJobPhase(job.Name, job.Namespace, khjobv1.JobRunning)
if err != nil {
log.Errorln("Error setting job phase:", err)
}
err = j.Run(ctx, kubernetesClient)
if err != nil {
log.Errorln("Error running job:", j.Name(), "in namespace", j.CheckNamespace()+":", err)
if strings.Contains(err.Error(), "pod deleted expectedly") {
log.Infoln("Skipping this job due to expected pod removal before completion")
}
// set any job run errors in the CRD
err = k.setJobExecutionError(j.Name(), j.CheckNamespace(), err)
if err != nil {
log.Errorln("Error setting job execution error:", err)
}
// exit out of this runJob
return
}
log.Debugln("Done running job:", j.Name(), "in namespace", j.CheckNamespace())
// Record job run end time
// Subtract 10 seconds from the run time since there are two 5 second sleeps during the job run, where
// kuberhealthy waits for all pods to clear before running the check and waits for all pods to exit once the
// check has finished running. One sleep occurs before and one after the kh job pod completes its run.
jobRunDuration := time.Since(jobStartTime) - time.Second*10
// make a new state for this job and fill it from the job's current status
jobDetails, err := getJobState(j)
if err != nil {
log.Errorln("Error setting check state after run:", j.Name(), "in namespace", j.CheckNamespace()+":", err)
}
details := khstatev1.NewWorkloadDetails(khstatev1.KHJob)
details.Namespace = j.CheckNamespace()
details.OK, details.Errors = j.CurrentStatus()
details.RunDuration = jobRunDuration.String()
details.CurrentUUID = jobDetails.CurrentUUID
// Fetch node information from running check pod using kh run uuid
selector := "kuberhealthy-run-id=" + details.CurrentUUID
pod, err := k.fetchPodBySelector(ctx, selector)
if err != nil {
log.Errorln(err)
}
details.Node = pod.Spec.NodeName
log.Debugln("node name:", details.Node, "nodeName", j.Node)
// send data to the metric forwarder if configured
if k.MetricForwarder != nil {
checkStatus := 0
if details.OK {
checkStatus = 1
}
runDuration, err := time.ParseDuration(details.RunDuration)
if err != nil {
log.Errorln("Error parsing run duration", err)
}
tags := map[string]string{
"KuberhealthyPod": details.AuthoritativePod,
"Namespace": j.CheckNamespace(),
"Name": j.Name(),
"Errors": strings.Join(details.Errors, ","),
}
metric := metrics.Metric{
{j.Name() + "." + j.CheckNamespace(): checkStatus},
{"RunDuration." + j.Name() + "." + j.CheckNamespace(): runDuration.Seconds()},
}
err = k.MetricForwarder.Push(metric, tags)
if err != nil {
log.Errorln("Error forwarding metrics", err)
}
}
log.Infoln("Setting state of job", j.Name(), "in namespace", j.CheckNamespace(), "to", details.OK, details.Errors, details.RunDuration, details.CurrentUUID, details.GetKHWorkload())
// store the job state with the CRD
err = k.storeCheckState(j.Name(), j.CheckNamespace(), details)
if err != nil {
log.Errorln("Error storing CRD state for job:", j.Name(), "in namespace", j.CheckNamespace(), err)
}
// set KHJob phase to completed
err = setJobPhase(j.Name(), j.CheckNamespace(), khjobv1.JobCompleted)
if err != nil {
log.Errorln("Error setting job phase:", err)
}
}
// runCheck runs a check on an interval and sets its status each run
func (k *Kuberhealthy) runCheck(ctx context.Context, c *external.Checker) {
log.Println("Starting check:", c.CheckNamespace(), "/", c.Name())
// run on an interval specified by the package
ticker := time.NewTicker(c.Interval())
// run the check forever and write its results to the kuberhealthy
// CRD resource for the check
for {
// break out if context cancels
select {