Add status to khcheck and khjob
Fixes kuberhealthy#1120

Signed-off-by: Ugur Zongur <104643878+ugurzongur@users.noreply.github.com>
ugurzongur committed May 4, 2023
1 parent b489fb6 commit 421bccf
Showing 15 changed files with 459 additions and 33 deletions.
88 changes: 88 additions & 0 deletions .ci/_job_test.sh
@@ -0,0 +1,88 @@
#!/bin/bash

# ========== Utility functions ==========
function get_khjob_status_ok {
  kubectl get khjob -n $NS kh-test-job -o jsonpath='{.status.ok}'
}

function get_khstate_ok {
  kubectl get khstate -n $NS kh-test-job -o jsonpath='{.spec.OK}'
}

function job_phase {
  kubectl get khjob -n $NS kh-test-job -o jsonpath='{.spec.phase}'
}

function fail_test {
  # Print debug information before exiting
  echo ---
  kubectl get khjob -n $NS kh-test-job -o yaml
  echo ---
  kubectl get khstate -n $NS kh-test-job -o yaml
  exit 1
}

echo ========== Job E2E test - Job successful case ==========
sed s/REPORT_FAILURE_VALUE/false/ .ci/khjob.yaml | kubectl apply -n $NS -f -

if [ "$(get_khjob_status_ok)" != "" ]; then
  echo "There should not be any OK field initially"; fail_test
fi

if [ "$(job_phase)" != "Running" ]; then
  echo "Job should be in the Running phase"; fail_test
fi

# Wait until the job finishes and the status field becomes available
TIMEOUT=30
while [ "$(job_phase)" == "Running" ] && [ $TIMEOUT -gt 0 ]; do sleep 1; echo Job phase: $(job_phase), timeout: ${TIMEOUT}; let TIMEOUT-=1; done

# Check the result
if [ "$(get_khjob_status_ok)" != "true" ]; then
  echo "khjob status should have returned OK"; fail_test
fi

if [ "$(get_khstate_ok)" != "true" ]; then
  echo "khstate should have returned OK"; fail_test
fi

if [ "$(job_phase)" != "Completed" ]; then
  echo "Job phase should be Completed."; fail_test
fi

# Delete the job
kubectl delete khjob -n $NS kh-test-job



echo ========== Job E2E test - Job fail case ==========

sed s/REPORT_FAILURE_VALUE/true/ .ci/khjob.yaml | kubectl apply -n $NS -f -

if [ "$(get_khjob_status_ok)" != "" ]; then
  echo "There should not be any OK field initially"; fail_test
fi

if [ "$(job_phase)" != "Running" ]; then
  echo "Job should be in the Running phase"; fail_test
fi

# Wait until the job finishes and the status field becomes available
TIMEOUT=30
while [ "$(job_phase)" == "Running" ] && [ $TIMEOUT -gt 0 ]; do sleep 1; echo Job phase: $(job_phase), timeout: ${TIMEOUT}; let TIMEOUT-=1; done

# Check the result
if [ "$(get_khjob_status_ok)" != "false" ]; then
  echo "khjob status should have NOT returned OK"; fail_test
fi

if [ "$(get_khstate_ok)" != "false" ]; then
  echo "khstate should have NOT returned OK"; fail_test
fi

if [ "$(job_phase)" != "Completed" ]; then
  echo "Job phase should be Completed."; fail_test
fi

# Delete the job
kubectl delete khjob -n $NS kh-test-job
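
The script above is sourced from .ci/e2e.sh at the end of the E2E run (see the e2e.sh change below). It can also be run by hand against a cluster that already has Kuberhealthy installed; a minimal sketch, assuming the target namespace is exported as NS and kubectl already points at that cluster:

# Hypothetical manual invocation; in CI the file is sourced from e2e.sh instead
export NS=kuberhealthy
bash .ci/_job_test.sh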
5 changes: 4 additions & 1 deletion .ci/e2e.sh
@@ -58,14 +58,15 @@ kubectl logs -n $NS --selector $selector
for i in {1..60}
do
khsCount=$(kubectl get -n $NS khs -o yaml |grep "OK: true" |wc -l)
kcStatusCount=$(kubectl get -n $NS khcheck -o yaml |grep "ok: true" |wc -l)
cDeploy=$(kubectl -n $NS get pods -l app=kuberhealthy-check |grep deployment |grep Completed |wc -l)
cDNS=$(kubectl -n $NS get pods -l app=kuberhealthy-check |grep dns-status-internal |grep Completed |wc -l)
cDS=$(kubectl -n $NS get pods -l app=kuberhealthy-check |grep daemonset |grep Completed |wc -l)
cPR=$(kubectl -n $NS get pods -l app=kuberhealthy-check |grep pod-restarts |grep Completed |wc -l)
cPS=$(kubectl -n $NS get pods -l app=kuberhealthy-check |grep pod-status |grep Completed |wc -l)
failCount=$(kubectl get -n $NS khs -o yaml |grep "OK: false" |wc -l)

if [ $khsCount -ge 5 ] && [ $cDeploy -ge 1 ] && [ $cDS -ge 1 ] && [ $cDNS -ge 1 ] && [ $cPR -ge 1 ] && [ $cPS -ge 1 ]
if [ $khsCount -ge 5 ] && [ $khsCount -eq $kcStatusCount ] && [ $cDeploy -ge 1 ] && [ $cDS -ge 1 ] && [ $cDNS -ge 1 ] && [ $cPR -ge 1 ] && [ $cPS -ge 1 ]
then
echo "Kuberhealthy is working like it should and all tests passed"
break
@@ -113,3 +114,5 @@ then
else
echo "No Error deployment pods found"
fi

. .ci/_job_test.sh
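
The new gate asserts that every khstate reporting OK is mirrored by a khcheck whose status also reports ok, which is what this commit starts writing. Outside CI the same comparison can be made by hand; a sketch, assuming the kuberhealthy namespace:

# The two counts should match once all checks have reported in (grep -c counts matching lines)
kubectl get -n kuberhealthy khs -o yaml | grep -c "OK: true"
kubectl get -n kuberhealthy khcheck -o yaml | grep -c "ok: true"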
20 changes: 20 additions & 0 deletions .ci/khjob.yaml
@@ -0,0 +1,20 @@
apiVersion: comcast.github.io/v1
kind: KuberhealthyJob
metadata:
  name: kh-test-job
spec:
  timeout: 2m
  podSpec:
    containers:
      - env:
          - name: REPORT_FAILURE
            value: "REPORT_FAILURE_VALUE"
          - name: REPORT_DELAY
            value: 5s
        image: kuberhealthy/test-check:latest
        imagePullPolicy: Always
        name: main
        resources:
          requests:
            cpu: 10m
            memory: 50Mi
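
For a manual run of this manifest, the REPORT_FAILURE_VALUE placeholder has to be substituted first, as the sed calls in .ci/_job_test.sh do. A sketch, assuming Kuberhealthy is installed in the kuberhealthy namespace; the content of .status follows the JobStatus fields populated in kuberhealthy.go below:

# Render the placeholder and create the job
sed s/REPORT_FAILURE_VALUE/false/ .ci/khjob.yaml | kubectl apply -n kuberhealthy -f -

# After the run completes, the result is visible directly on the khjob resource
kubectl get khjob -n kuberhealthy kh-test-job -o jsonpath='{.status}'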
3 changes: 2 additions & 1 deletion CONTRIBUTORS.md
@@ -25,4 +25,5 @@
- [Joel Kulesa](https://github.com/jkulesa)
- [McKenna Jones](https://github.com/mckennajones)
- [Allan Ramirez](https://github.com/ramirezag)
- [Erich Stoekl](https://github.com/erichstoekl)
- [Erich Stoekl](https://github.com/erichstoekl)
- [Ugur Zongur](https://github.com/ugurzongur)
14 changes: 2 additions & 12 deletions cmd/kuberhealthy/crd.go
@@ -28,11 +28,6 @@ func setCheckStateResource(checkName string, checkNamespace string, state khstat
}
resourceVersion := existingState.GetResourceVersion()

// set the pod name that wrote the khstate
state.AuthoritativePod = podHostname
now := metav1.Now() // set the time the khstate was last
state.LastRun = &now

khState := khstatev1.NewKuberhealthyState(name, state)
khState.SetResourceVersion(resourceVersion)
// TODO - if "try again" message found in error, then try again
@@ -127,18 +122,13 @@ func getJobState(j *external.Checker) (khstatev1.WorkloadDetails, error) {

// setJobPhase updates the kuberhealthy job phase depending on the state of its run.
func setJobPhase(jobName string, jobNamespace string, jobPhase khjobv1.JobPhase) error {

kj, err := khJobClient.KuberhealthyJobs(jobNamespace).Get(jobName, metav1.GetOptions{})
if err != nil {
log.Errorln("error getting khjob:", jobName, err)
return err
}
resourceVersion := kj.GetResourceVersion()
updatedJob := khjobv1.NewKuberhealthyJob(jobName, jobNamespace, kj.Spec)
updatedJob.SetResourceVersion(resourceVersion)
log.Infoln("Setting khjob phase to:", jobPhase)
updatedJob.Spec.Phase = jobPhase

_, err = khJobClient.KuberhealthyJobs(jobNamespace).Update(&updatedJob)
kj.Spec.Phase = jobPhase
_, err = khJobClient.KuberhealthyJobs(jobNamespace).Update(&kj)
return err
}
103 changes: 88 additions & 15 deletions cmd/kuberhealthy/kuberhealthy.go
@@ -82,7 +82,7 @@ func (k *Kuberhealthy) setCheckExecutionError(checkName string, checkNamespace s
log.Debugln("Setting execution state of check", checkName, "to", details.OK, details.Errors, details.CurrentUUID, details.GetKHWorkload())

// store the check state with the CRD
err = k.storeCheckState(checkName, checkNamespace, details)
err = k.updateWorkloadStatus(checkName, checkNamespace, details)
if err != nil {
return fmt.Errorf("Was unable to write an execution error to the CRD status with error: %w", err)
}
@@ -116,7 +116,7 @@ func (k *Kuberhealthy) setJobExecutionError(jobName string, jobNamespace string,
log.Debugln("Setting execution state of job", jobName, "to", details.OK, details.Errors, details.CurrentUUID, details.GetKHWorkload())

// store the check state with the CRD
err = k.storeCheckState(jobName, jobNamespace, details)
err = k.updateWorkloadStatus(jobName, jobNamespace, details)
if err != nil {
return fmt.Errorf("Was unable to write an execution error to the CRD status with error: %w", err)
}
@@ -135,7 +135,7 @@ func (k *Kuberhealthy) Shutdown(doneChan chan struct{}) {
log.Infoln("shutdown: aborting control context")
k.shutdownCtxFunc() // stop the control system
}
time.Sleep(5) // help prevent more checks from starting in a race before control system stop happens
time.Sleep(time.Second * 5) // help prevent more checks from starting in a race before control system stop happens
log.Infoln("shutdown: stopping checks")
k.StopChecks() // stop all checks
log.Infoln("shutdown: ready for main program shutdown")
@@ -848,7 +848,7 @@ func (k *Kuberhealthy) masterMonitor(ctx context.Context, becameMasterChan chan
// event, then we calculate if we should become or lose master.
for range ticker.C {

if time.Now().Sub(lastMasterChangeTime) < interval {
if time.Since(lastMasterChangeTime) < interval {
log.Println("control: waiting for master changes to settle...")
continue
}
@@ -918,7 +918,7 @@ func (k *Kuberhealthy) runJob(ctx context.Context, job khjobv1.KuberhealthyJob)
// Subtract 10 seconds from run time since there are two 5 second sleeps during the job run where kuberhealthy
// waits for all pods to clear before running the check and waits for all pods to exit once the check has finished
// running. Both occur before and after the kh job pod completes its run.
jobRunDuration := time.Now().Sub(jobStartTime) - time.Second*10
jobRunDuration := time.Since(jobStartTime) - time.Second*10

// make a new state for this job and fill it from the job's current status
jobDetails, err := getJobState(j)
@@ -930,6 +930,10 @@ func (k *Kuberhealthy) runJob(ctx context.Context, job khjobv1.KuberhealthyJob)
details.OK, details.Errors = j.CurrentStatus()
details.RunDuration = jobRunDuration.String()
details.CurrentUUID = jobDetails.CurrentUUID
details.AuthoritativePod = podHostname

lastRun := metav1.Now()
details.LastRun = &lastRun

// Fetch node information from running check pod using kh run uuid
selector := "kuberhealthy-run-id=" + details.CurrentUUID
@@ -972,12 +976,12 @@ func (k *Kuberhealthy) runJob(ctx context.Context, job khjobv1.KuberhealthyJob)
log.Infoln("Setting state of job", j.Name(), "in namespace", j.CheckNamespace(), "to", details.OK, details.Errors, details.RunDuration, details.CurrentUUID, details.GetKHWorkload())

// store the job state with the CRD
err = k.storeCheckState(j.Name(), j.CheckNamespace(), details)
err = k.updateWorkloadStatus(j.Name(), j.CheckNamespace(), details)
if err != nil {
log.Errorln("Error storing CRD state for job:", j.Name(), "in namespace", j.CheckNamespace(), err)
}

// set KHJob phase to running:
// set KHJob phase to completed:
err = setJobPhase(j.Name(), j.CheckNamespace(), khjobv1.JobCompleted)
if err != nil {
log.Errorln("Error setting job phase:", err)
@@ -1031,7 +1035,7 @@ func (k *Kuberhealthy) runCheck(ctx context.Context, c *external.Checker) {
// Subtract 10 seconds from run time since there are two 5 second sleeps during the check run where kuberhealthy
// waits for all pods to clear before running the check and waits for all pods to exit once the check has finished
// running. Both occur before and after the checker pod completes its run.
checkRunDuration := time.Now().Sub(checkStartTime) - time.Second*10
checkRunDuration := time.Since(checkStartTime) - time.Second*10

// make a new state for this check and fill it from the check's current status
checkDetails, err := getCheckState(c)
@@ -1043,6 +1047,10 @@ func (k *Kuberhealthy) runCheck(ctx context.Context, c *external.Checker) {
details.OK, details.Errors = c.CurrentStatus()
details.RunDuration = checkRunDuration.String()
details.CurrentUUID = checkDetails.CurrentUUID
details.AuthoritativePod = podHostname

lastRun := metav1.Now()
details.LastRun = &lastRun

// Fetch node information from running check pod using kh run uuid
selector := "kuberhealthy-run-id=" + details.CurrentUUID
@@ -1085,7 +1093,7 @@ func (k *Kuberhealthy) runCheck(ctx context.Context, c *external.Checker) {
log.Infoln("Setting state of check", c.Name(), "in namespace", c.CheckNamespace(), "to", details.OK, details.Errors, details.RunDuration, details.CurrentUUID, details.GetKHWorkload())

// store the check state with the CRD
err = k.storeCheckState(c.Name(), c.CheckNamespace(), details)
err = k.updateWorkloadStatus(c.Name(), c.CheckNamespace(), details)
if err != nil {
log.Errorln("Error storing CRD state for check:", c.Name(), "in namespace", c.CheckNamespace(), err)
}
@@ -1095,17 +1103,78 @@ func (k *Kuberhealthy) runCheck(ctx context.Context, c *external.Checker) {
}
}

// storeCheckState stores the check state in its cluster CRD
func (k *Kuberhealthy) storeCheckState(checkName string, checkNamespace string, details khstatev1.WorkloadDetails) error {
func (k *Kuberhealthy) updateWorkloadStatus(name string, namespace string, details khstatev1.WorkloadDetails) error {
var updateFunction func() error
khWorkload := details.GetKHWorkload()
switch khWorkload {
case khstatev1.KHCheck:
status := khcheckv1.CheckStatus{}
status.OK = details.OK
status.Errors = details.Errors
status.RunDuration = details.RunDuration
status.Node = details.Node
status.LastRun = details.LastRun
status.AuthoritativePod = details.AuthoritativePod
status.CurrentUUID = details.CurrentUUID
updateFunction = func() error { return k.updateCheckStatus(name, namespace, status) }
case khstatev1.KHJob:
status := khjobv1.JobStatus{}
status.OK = details.OK
status.Errors = details.Errors
status.RunDuration = details.RunDuration
status.Node = details.Node
status.LastRun = details.LastRun
status.AuthoritativePod = details.AuthoritativePod
status.CurrentUUID = details.CurrentUUID
updateFunction = func() error { return k.updateJobStatus(name, namespace, status) }
}
resourceDescription := fmt.Sprintf("%s %s/%s", khWorkload, namespace, name)
err := updateRetryingObjectModifiedError(resourceDescription, updateFunction)
if err != nil {
log.Errorln("error updating state:", khWorkload, namespace, name, err)
return err
}

// Now store the state custom resource
return k.storeKHState(name, namespace, details)
}

func (k *Kuberhealthy) updateCheckStatus(name string, namespace string, status khcheckv1.CheckStatus) error {
check, err := khCheckClient.KuberhealthyChecks(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
check.Status = status
_, err = khCheckClient.KuberhealthyChecks(namespace).UpdateStatus(&check)
return err
}

func (k *Kuberhealthy) updateJobStatus(name string, namespace string, status khjobv1.JobStatus) error {
job, err := khJobClient.KuberhealthyJobs(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return err
}
job.Status = status
_, err = khJobClient.KuberhealthyJobs(namespace).UpdateStatus(&job)
return err
}

// storeKHState stores the check state in its cluster CRD
func (k *Kuberhealthy) storeKHState(checkName string, checkNamespace string, details khstatev1.WorkloadDetails) error {

// ensure the CRD resource exits
err := ensureStateResourceExists(checkName, checkNamespace, details.GetKHWorkload())
if err != nil {
return err
}

resourceDescription := fmt.Sprintf("khstate %s/%s", checkNamespace, checkName)
return updateRetryingObjectModifiedError(resourceDescription, func() error { return setCheckStateResource(checkName, checkNamespace, details) })
}

func updateRetryingObjectModifiedError(resourceDescription string, updateFunction func() error) error {
// put the status on the CRD from the check
err = setCheckStateResource(checkName, checkNamespace, details)
err := updateFunction()

//TODO: Make this retry of updating custom resources repeatable
//
@@ -1121,7 +1190,7 @@ func (k *Kuberhealthy) storeCheckState(checkName string, checkNamespace string,

// if too many retries have occurred, we fail up the stack further
if tries > maxTries {
return fmt.Errorf("failed to update khstate for check %s in namespace %s after %d with error %w", checkName, checkNamespace, maxTries, err)
return fmt.Errorf("failed to update %s after %d with error %w", resourceDescription, maxTries, err)
}
log.Infoln("Failed to update khstate for check because object was modified by another process. Retrying in " + delay.String() + ". Try " + strconv.Itoa(tries) + " of " + strconv.Itoa(maxTries) + ".")

@@ -1130,7 +1199,7 @@ func (k *Kuberhealthy) storeCheckState(checkName string, checkNamespace string,
delay = delay + delay

// try setting the check state again
err = setCheckStateResource(checkName, checkNamespace, details)
err = updateFunction()

// count how many times we've retried
tries++
@@ -1459,10 +1528,14 @@ func (k *Kuberhealthy) externalCheckReportHandler(w http.ResponseWriter, r *http
details.RunDuration = checkRunDuration
details.Namespace = podReport.Namespace
details.CurrentUUID = podReport.UUID
details.AuthoritativePod = podHostname

lastRun := metav1.Now()
details.LastRun = &lastRun

// since the check is validated, we can proceed to update the status now
k.externalCheckReportHandlerLog(requestID, "Setting check with name", podReport.Name, "in namespace", podReport.Namespace, "to 'OK' state:", details.OK, "uuid", details.CurrentUUID, details.GetKHWorkload())
err = k.storeCheckState(podReport.Name, podReport.Namespace, details)
err = k.updateWorkloadStatus(podReport.Name, podReport.Namespace, details)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
k.externalCheckReportHandlerLog(requestID, "failed to store check state for %s: %w", podReport.Name, err)
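
With the status subresource now populated for checks as well as jobs, check health can be read straight off the khcheck objects instead of only from khstate; a sketch, assuming the usual kuberhealthy namespace and the lowercase ok field that the CI grep above relies on:

# List each check with the ok flag from its new status block
kubectl get khcheck -n kuberhealthy -o jsonpath='{range .items[*]}{.metadata.name}{": "}{.status.ok}{"\n"}{end}'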