/
reflector.go
137 lines (115 loc) · 5.23 KB
/
reflector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package main
import (
"strings"
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
log "github.com/sirupsen/logrus"
khstatev1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khstate/v1"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/health"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
)
// StateReflector mirrors khstate resources into a local cache so that
// CurrentStatus can report the state of every check without querying the
// Kubernetes API for the khstates on each request. Start runs the sync loop and
// blocks while it runs, so it is normally launched in its own goroutine. Stop
// signals that loop to halt.
type StateReflector struct {
	reflector        *cache.Reflector // client-go reflector that list/watches khstates into store
	reflectorSigChan chan struct{}    // stop channel handed to reflector.Run; signaled by Stop
	resyncPeriod     time.Duration    // resync period passed to cache.NewReflector
	store            cache.Store      // local cache filled by reflector; items are *khstatev1.KuberhealthyState
}
// NewStateReflector builds, but does not start, a StateReflector for khstate
// resources. It list/watches stateCRDResource in listenNamespace through
// khStateClient's REST client. It stores the results in a cache keyed by
// "namespace/name" (cache.MetaNamespaceKeyFunc), with a five-minute reflector
// resync period. Call Start to begin syncing.
func NewStateReflector() *StateReflector {
	const resyncEvery = 5 * time.Minute

	listWatch := cache.NewListWatchFromClient(khStateClient.RESTClient(), stateCRDResource, listenNamespace, fields.Everything())
	localStore := cache.NewStore(cache.MetaNamespaceKeyFunc)

	return &StateReflector{
		reflectorSigChan: make(chan struct{}),
		resyncPeriod:     resyncEvery,
		store:            localStore,
		reflector:        cache.NewReflector(listWatch, &khstatev1.KuberhealthyState{}, localStore, resyncEvery),
	}
}
// Stop halts cache sync operations by closing the reflector's stop channel.
//
// The channel is closed rather than sent on for two reasons:
//   - client-go's Reflector.Run only exits once its stop channel is closed.
//   - A single send can be consumed by one of the reflector's internal select
//     loops without ending Run, and it blocks forever if Run is not active.
//
// A close is observed by every receiver and never blocks.
//
// Shutdown is asynchronous: Run may still be winding down when Stop returns.
// Calling Stop again after the channel is closed is a no-op. Concurrent Stop
// calls are not synchronized with each other.
//
// The channel stays closed, so a later Start on the same StateReflector returns
// immediately. Use NewStateReflector to resume syncing.
func (sr *StateReflector) Stop() {
	log.Infoln("khState reflector stopping")
	if sr.reflectorSigChan == nil {
		return
	}
	select {
	case <-sr.reflectorSigChan:
		// Nothing ever sends on this channel, so a successful receive means it is
		// already closed; closing it again would panic.
	default:
		close(sr.reflectorSigChan)
	}
}
// Start runs the reflector's list-and-watch loop, which keeps the local khstate
// store up to date. The call blocks: client-go's Reflector.Run returns only
// when its stop channel is closed. Callers should therefore run Start in its
// own goroutine and end it with Stop.
func (sr *StateReflector) Start() {
	log.Infoln("khState reflector starting")
	stopCh := sr.reflectorSigChan
	sr.reflector.Run(stopCh)
}
// CurrentStatus builds a health.State summarizing every khstate currently held
// in the local cache. The khstates come from the store, not from a fresh API
// listing. However, determineKHWorkload is called once per reported khstate and
// makes API lookups of its own.
//
// For each cached khstate:
//   - it is skipped when AuthoritativePod is blank, i.e. the check has never run;
//   - every non-blank error in its spec is added to the state and sets OK to false;
//   - its spec is stored under CheckDetails or JobDetails, keyed by
//     "namespace/name", according to determineKHWorkload.
//
// A khstate whose workload cannot be determined still contributes its errors.
// It is omitted from both detail maps, and a warning is logged so the
// discrepancy is visible.
//
// If the store is nil, a blank health.NewState() is returned.
func (sr *StateReflector) CurrentStatus() health.State {
	log.Infoln("khState reflector fetching current status")
	state := health.NewState()

	// if the store is nil, then we just return a blank slate
	if sr.store == nil {
		log.Warningln("attempted to fetch CurrentStatus from khStateReflector, but the store was nil")
		return state
	}

	// list all objects from the storage cache
	khStateList := sr.store.List()
	for i, khStateUndefined := range khStateList {
		log.Debugln("state reflector store item from listing:", i, khStateUndefined)
		khState, ok := khStateUndefined.(*khstatev1.KuberhealthyState)
		if !ok {
			log.Warningln("attempted to convert item from state cache reflector to a khstatev1.KuberhealthyState, but the type was invalid")
			continue
		}
		log.Debugln("Getting status of check for web request to status page:", khState.GetName(), khState.GetNamespace())

		// skip the check if it has never been run before. This prevents checks that have not yet
		// run from showing in the status page.
		if len(khState.Spec.AuthoritativePod) == 0 {
			log.Debugln("Output for", khState.GetName(), khState.GetNamespace(), "hidden from status page due to blank authoritative pod")
			continue
		}

		// parse check status from CRD and add it to the global status of errors. Skip blank errors
		for _, e := range khState.Spec.Errors {
			if len(strings.TrimSpace(e)) == 0 {
				log.Warningln("Skipped an error that was blank when adding check details to current state.")
				continue
			}
			state.AddError(e)
			log.Debugln("Status page: Setting global OK state to false due to check details not being OK")
			state.OK = false
		}

		key := khState.GetNamespace() + "/" + khState.GetName()
		switch determineKHWorkload(khState.Name, khState.Namespace) {
		case khstatev1.KHCheck:
			state.CheckDetails[key] = khState.Spec
		case khstatev1.KHJob:
			state.JobDetails[key] = khState.Spec
		default:
			// Neither a khcheck nor a khjob could be fetched, e.g. the resource was
			// deleted or the API lookup failed. Any errors above were still counted
			// against the global OK flag, so record why the entry is missing.
			log.Warningln("khState reflector could not determine whether", key, "is a khcheck or a khjob; omitting it from check and job details")
		}
	}

	log.Infoln("khState reflector returning current status on", len(state.CheckDetails), "check khStates and", len(state.JobDetails), "job khStates.")
	return state
}
// determineKHWorkload reports whether the kuberhealthy resource with the given
// name and namespace is a khcheck or a khjob. It fetches the resource from the
// API server, trying the khcheck client first and then the khjob client. It
// returns khstatev1.KHCheck or khstatev1.KHJob for the first lookup that
// succeeds, and the zero KHWorkload when neither does.
//
// CurrentStatus relies on this because, per the original author, the workload
// read from a khstate spec comes back blank.
//
// NotFound results are logged at debug level. Any other lookup error (e.g.
// permissions or connectivity) is logged as a warning rather than ignored.
//
// NOTE(review): this makes up to two API calls per khstate on every status
// request; consider caching the result if the status endpoint is hot.
func determineKHWorkload(name string, namespace string) khstatev1.KHWorkload {
	log.Debugln("determineKHWorkload: determining workload:", name)

	// isNotFound accepts typed NotFound API errors and, as a fallback, any error
	// whose message says "not found".
	isNotFound := func(err error) bool {
		return k8sErrors.IsNotFound(err) || strings.Contains(err.Error(), "not found")
	}

	_, err := khCheckClient.KuberhealthyChecks(namespace).Get(name, v1.GetOptions{})
	if err == nil {
		log.Debugln("determineKHWorkload: Found khcheck:", name)
		return khstatev1.KHCheck
	}
	if isNotFound(err) {
		log.Debugln("determineKHWorkload: Not a khcheck.")
	} else {
		log.Warningln("determineKHWorkload: error looking up khcheck", namespace+"/"+name+":", err)
	}

	_, err = khJobClient.KuberhealthyJobs(namespace).Get(name, v1.GetOptions{})
	if err == nil {
		// Log the name we looked up; the khcheck lookup's result is unrelated here.
		log.Debugln("determineKHWorkload: Found khjob:", name)
		return khstatev1.KHJob
	}
	if isNotFound(err) {
		log.Debugln("determineKHWorkload: Not a khjob.")
	} else {
		log.Warningln("determineKHWorkload: error looking up khjob", namespace+"/"+name+":", err)
	}

	var unknown khstatev1.KHWorkload
	return unknown
}