/
main.go
268 lines (217 loc) · 8.75 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Kuberhealthy is an enhanced health check for Kubernetes clusters.
package main
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/integrii/flaggy"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
khcheckv1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khcheck/v1"
khjobv1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khjob/v1"
khstatev1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khstate/v1"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/kubeClient"
"github.com/kuberhealthy/kuberhealthy/v2/pkg/masterCalculation"
)
// Process-wide configuration and runtime state.
var (
	// cfg is the active Kuberhealthy configuration. setUpConfig replaces it with a
	// freshly defaulted *Config and then loads the config file into it.
	cfg *Config
	// configPath is the Kuberhealthy config file path; setUp binds it to the -c/--config flag.
	configPath = "/etc/config/kuberhealthy.yaml"
	// podNamespace is the value of the POD_NAMESPACE environment variable, read once at package initialization.
	podNamespace = os.Getenv("POD_NAMESPACE")
	// isMaster indicates this instance is the master and should be running checks.
	isMaster bool
	// upcomingMasterState is the upcoming master state on the next interval.
	upcomingMasterState bool
	// lastMasterChangeTime indicates the last time a master change was seen.
	lastMasterChangeTime time.Time
	// listenNamespace is the namespace to listen (watch/get) on for `khcheck` resources.
	// If blank, all namespaces will be monitored.
	listenNamespace string
	// checkReaperRunInterval is how often check pods should get reaped, read from the
	// CHECK_REAPER_RUN_INTERVAL environment variable at package initialization.
	// NOTE(review): the default used when this is empty (previously documented as 30s)
	// is applied outside this section — confirm.
	checkReaperRunInterval = os.Getenv("CHECK_REAPER_RUN_INTERVAL")
	// terminationGracePeriod bounds how long a graceful shutdown may take.
	// Keep it calibrated with the Kubernetes terminationGracePeriodSeconds of the pod.
	terminationGracePeriod = time.Minute * 5
	// podHostname is the hostname of this pod; setUp fills it from the POD_NAME environment variable.
	podHostname string
)

// KHExternalReportingURL is the environment variable key used to override the URL checks will be asked to report in to.
const KHExternalReportingURL = "KH_EXTERNAL_REPORTING_URL"

// DefaultRunInterval is the default run interval for checks set by kuberhealthy.
const DefaultRunInterval = time.Minute * 10

// DefaultTimeout is the default timeout for external checks.
var DefaultTimeout = time.Minute * 5

// KHCheckNameAnnotationKey is the key used in the annotation that holds the check's short name.
const KHCheckNameAnnotationKey = "comcast.github.io/check-name"

// Clients for Kuberhealthy's custom resources; initKubernetesClients assigns them.
var (
	// khStateClient is a client for khstate custom resources.
	khStateClient *khstatev1.KHStateV1Client
	// khCheckClient is a client for khcheck custom resources.
	khCheckClient *khcheckv1.KHCheckV1Client
	// khJobClient is a client for khjob custom resources.
	khJobClient *khjobv1.KHJobV1Client
)

// Constants for using the kuberhealthy state CRD.
const (
	stateCRDGroup    = "comcast.github.io"
	stateCRDVersion  = "v1"
	stateCRDResource = "khstates"
)

// Constants for using the kuberhealthy check CRD.
const (
	checkCRDGroup    = "comcast.github.io"
	checkCRDVersion  = "v1"
	checkCRDResource = "khchecks"
)

// Kubernetes API clients; initKubernetesClients assigns them.
var (
	// kubernetesClient is the global kubernetes client.
	kubernetesClient *kubernetes.Clientset
	// dynamicClient is the client used to watch and list unstructured khchecks.
	dynamicClient dynamic.Interface
)
// setUpConfig loads and sets default Kuberhealthy configurations
// Everytime kuberhealthy sees a configuration change, configurations should reload and reset
// setUpConfig rebuilds cfg from scratch and then fills it in:
//   - defaults: kubeconfig at $HOME/.kube/config and log level "info";
//   - values from the config file at configPath (a read failure is only logged);
//   - the external check reporting URL, taken from KH_EXTERNAL_REPORTING_URL or,
//     when that is unavailable, derived from POD_NAMESPACE.
//
// It returns an error only when neither KH_EXTERNAL_REPORTING_URL nor POD_NAMESPACE
// can supply a reporting URL. It is meant to be re-run whenever Kuberhealthy sees a
// configuration change, so configuration reloads and resets.
//
// NOTE(review): the reporting URL is assigned unconditionally after cfg.Load, so it
// overrides any value the config file may have set — confirm this is intended.
func setUpConfig() error {
	cfg = &Config{
		kubeConfigFile: filepath.Join(os.Getenv("HOME"), ".kube", "config"),
		LogLevel:       "info",
	}

	// A missing or unreadable config file is not fatal; the defaults above remain.
	if loadErr := cfg.Load(configPath); loadErr != nil {
		log.Println("WARNING: Failed to read configuration file from disk:", loadErr)
	}

	reportingURL, envErr := getEnvVar(KHExternalReportingURL)
	if envErr != nil {
		// Without an explicit URL we need the namespace to build the in-cluster service address.
		if podNamespace == "" {
			return errors.New("KH_EXTERNAL_REPORTING_URL environment variable not set and POD_NAMESPACE environment variable was blank. Could not determine Kuberhealthy callback URL.")
		}
		log.Infoln("KH_EXTERNAL_REPORTING_URL environment variable not set, using default value")
		reportingURL = "http://kuberhealthy." + podNamespace + ".svc.cluster.local/externalCheckStatus"
	}

	cfg.ExternalCheckReportingURL = reportingURL
	log.Infoln("External check reporting URL set to:", cfg.ExternalCheckReportingURL)
	return nil
}
// setUp loads, parses, and sets various Kuberhealthy configurations -- from flags, config values and env vars.
// setUp loads, parses, and sets various Kuberhealthy configurations -- from flags,
// config values and env vars.
//
// Order of operations:
//  1. register and parse the -c/--config and -d/--debug flags;
//  2. build cfg via setUpConfig;
//  3. configure logrus (stdout, level from cfg.LogLevel, debug override);
//  4. enable forced master mode if cfg.EnableForceMaster is set;
//  5. read the pod hostname from POD_NAME;
//  6. create all Kubernetes clients.
//
// The first failure is returned. Underlying errors are wrapped with %w so callers
// can still inspect them with errors.Is / errors.As.
func setUp() error {
	var useDebugMode bool

	// setup flaggy
	flaggy.SetDescription("Kuberhealthy is an in-cluster synthetic health checker for Kubernetes.")
	flaggy.String(&configPath, "c", "config", "(optional) absolute path to the kuberhealthy config file")
	flaggy.Bool(&useDebugMode, "d", "debug", "Set to true to enable debug.")
	flaggy.Parse()

	err := setUpConfig()
	if err != nil {
		return err
	}

	// parse and set logging level
	parsedLogLevel, err := log.ParseLevel(cfg.LogLevel)
	if err != nil {
		return fmt.Errorf("unable to parse log-level flag: %w", err)
	}

	// log to stdout and set the level to the configured value
	log.SetOutput(os.Stdout)
	log.SetLevel(parsedLogLevel)
	log.Infoln("Startup Arguments:", os.Args)

	// no matter what, if the user has specified debug leveling, use debug leveling
	if useDebugMode {
		log.Infoln("Setting debug output on because user specified flag")
		log.SetLevel(log.DebugLevel)
	}

	// Handle force master mode
	if cfg.EnableForceMaster {
		log.Infoln("Enabling forced master mode")
		masterCalculation.DebugAlwaysMasterOn()
	}

	// determine the name of this pod from the POD_NAME environment variable
	podHostname, err = getEnvVar("POD_NAME")
	if err != nil {
		return fmt.Errorf("failed to determine my hostname: %w", err)
	}

	// setup all clients
	err = initKubernetesClients()
	if err != nil {
		return fmt.Errorf("failed to bootstrap kubernetes clients: %w", err)
	}

	return nil
}
// main bootstraps configuration and clients, starts Kuberhealthy, and relies on
// listenForInterrupts (running in its own goroutine) to exit the process when an
// interrupt or termination signal arrives.
//
// If kuberhealthy.Start returns, main waits 90s and then a further
// terminationGracePeriod+10s to give the interrupt handler time to call os.Exit.
// If the process is still alive after that, main logs an error and exits with status 1.
func main() {
	// Loading, parsing, and setting flags, config values and environment vars must happen first.
	if err := setUp(); err != nil {
		log.Fatalln("Error setting up Kuberhealthy:", err)
	}

	kh := NewKuberhealthy()
	kh.ListenAddr = cfg.ListenAddress

	// The cancel func is stored on the struct so its shutdown path can stop the control system.
	runCtx, cancelRun := context.WithCancel(context.Background())
	kh.shutdownCtxFunc = cancelRun

	go listenForInterrupts(kh)

	// tell Kuberhealthy to start all checks and master change monitoring
	kh.Start(runCtx)

	// Give the interrupt handler time to exit the process before we force it.
	// NOTE(review): these two waits total 90s + terminationGracePeriod + 10s; the first
	// may be redundant — confirm against Kuberhealthy.Start/Shutdown semantics.
	time.Sleep(time.Second * 90)
	time.Sleep(terminationGracePeriod + time.Second*10)

	log.Errorln("shutdown: main loop was ready for shutdown for too long. exiting.")
	os.Exit(1)
}
// listenForInterrupts watches for termination signals and acts on them
// listenForInterrupts watches for termination signals (SIGINT, SIGTERM) and acts on them.
//
// After the first signal it starts k.Shutdown in a goroutine and then exits the process:
//   - with status 0 once doneChan receives (presumably closed/sent by k.Shutdown when it finishes);
//   - with status 1 if a second signal arrives first (shutdown forced by the operator);
//   - with status 1 if neither happens within terminationGracePeriod.
//
// It never returns: it blocks until a signal arrives, and every branch afterwards calls os.Exit.
func listenForInterrupts(k *Kuberhealthy) {
	// signal.Notify never blocks when sending, so the channel needs buffer space
	// or a signal arriving before we reach the receive could be dropped.
	sigChan := make(chan os.Signal, 1)

	// Register for shutdown events on sigChan. SIGKILL is intentionally absent: it
	// cannot be caught, so registering it with signal.Notify has no effect.
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	log.Infoln("shutdown: waiting for sigChan notification...")
	<-sigChan
	log.Infoln("shutdown: Shutting down due to sigChan signal...")

	// wait for checks to fully shutdown before exiting
	doneChan := make(chan struct{})
	go k.Shutdown(doneChan)

	select {
	case <-doneChan:
		log.Infoln("shutdown: Shutdown gracefully completed!")
		log.Infoln("shutdown: exiting 0")
		os.Exit(0)
	case <-sigChan:
		log.Warningln("shutdown: Shutdown forced from multiple interrupts!")
		log.Infoln("shutdown: exiting 1")
		os.Exit(1)
	case <-time.After(terminationGracePeriod):
		log.Errorln("shutdown: Shutdown took too long. Shutting down forcefully!")
		log.Infoln("shutdown: exiting 1")
		os.Exit(1)
	}
}
// initKubernetesClients creates the appropriate CRD clients and kubernetes client to be used in all cases. Issue #181
// initKubernetesClients creates the appropriate CRD clients and kubernetes client to be used in all cases. Issue #181
//
// It assigns the package-level kubernetesClient, khCheckClient, khStateClient,
// khJobClient and dynamicClient, all derived from cfg.kubeConfigFile. The first
// failure is returned with context rather than terminating the process, so the
// caller (setUp) decides how to handle it.
func initKubernetesClients() error {
	// make a new kuberhealthy client
	kc, err := kubeClient.Create(cfg.kubeConfigFile)
	if err != nil {
		return fmt.Errorf("failed to create kubernetes client: %w", err)
	}
	kubernetesClient = kc

	// make a new crd check client
	checkClient, err := khcheckv1.Client(cfg.kubeConfigFile)
	if err != nil {
		return fmt.Errorf("failed to create khcheck client: %w", err)
	}
	khCheckClient = checkClient

	// make a new crd state client
	stateClient, err := khstatev1.Client(cfg.kubeConfigFile)
	if err != nil {
		return fmt.Errorf("failed to create khstate client: %w", err)
	}
	khStateClient = stateClient

	// make a new crd job client
	jobClient, err := khjobv1.Client(cfg.kubeConfigFile)
	if err != nil {
		return fmt.Errorf("failed to create khjob client: %w", err)
	}
	khJobClient = jobClient

	// make a dynamicClient for kubernetes unstructured checks.
	// This must use the kubeconfig file like the clients above, not configPath
	// (which is the Kuberhealthy YAML config, not a kubeconfig). When the kubeconfig
	// file does not exist, pass an empty path: clientcmd then falls back to the
	// in-cluster configuration.
	// NOTE(review): keep this rule in sync with how kubeClient.Create picks
	// in-cluster vs. kubeconfig — confirm.
	kubeConfigPath := cfg.kubeConfigFile
	if _, statErr := os.Stat(kubeConfigPath); errors.Is(statErr, os.ErrNotExist) {
		kubeConfigPath = ""
	}
	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		return fmt.Errorf("failed to build kubernetes configuration from configuration flags: %w", err)
	}

	dynamicClient, err = dynamic.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("failed to create kubernetes dynamic client configuration: %w", err)
	}

	return nil
}