Skip to content

Commit

Permalink
Merge pull request #955 from kuberhealthy/reaper-bug
Browse files Browse the repository at this point in the history
Address reaper and config reloader bug
  • Loading branch information
Eric Greer committed May 27, 2021
2 parents b13c637 + f25cf6b commit 5e27b32
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 36 deletions.
10 changes: 7 additions & 3 deletions cmd/kuberhealthy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/codingsince1985/checksum"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
)

// Config holds all configurable options
Expand Down Expand Up @@ -175,11 +175,15 @@ func configReloadNotifier(ctx context.Context, notifyChan chan struct{}) {
default:
}

err := cfg.Load(configPath)
log.Debugln("configReloader: loading new configuration")

// setup config
err := setUpConfig()
if err != nil {
log.Errorln("configReloader: Error reloading config:", err)
log.Errorln("configReloader: Error reloading and setting up config:", err)
continue
}
log.Debugln("configReloader: loaded new configuration:", cfg)

// reparse and set logging level
parsedLogLevel, err := log.ParseLevel(cfg.LogLevel)
Expand Down
32 changes: 21 additions & 11 deletions cmd/kuberhealthy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,9 @@ var kubernetesClient *kubernetes.Clientset
// Set dynamicClient that represents the client used to watch and list unstructured khchecks
var dynamicClient dynamic.Interface

// setUp loads, parses, and sets various Kuberhealthy configurations -- from flags, config values and env vars.
func setUp() error {

var useDebugMode bool

// setup flaggy
flaggy.SetDescription("Kuberhealthy is an in-cluster synthetic health checker for Kubernetes.")
flaggy.String(&configPath, "c", "config", "(optional) absolute path to the kuberhealthy config file")
flaggy.Bool(&useDebugMode, "d", "debug", "Set to true to enable debug.")
flaggy.Parse()

// setUpConfig loads and sets default Kuberhealthy configurations
// Every time kuberhealthy sees a configuration change, the configuration should be reloaded and reset
func setUpConfig() error {
cfg = &Config{
kubeConfigFile: filepath.Join(os.Getenv("HOME"), ".kube", "config"),
LogLevel: "info",
Expand All @@ -119,6 +111,24 @@ func setUp() error {
}
cfg.ExternalCheckReportingURL = externalCheckURL
log.Infoln("External check reporting URL set to:", cfg.ExternalCheckReportingURL)
return nil
}

// setUp loads, parses, and sets various Kuberhealthy configurations -- from flags, config values and env vars.
func setUp() error {

var useDebugMode bool

// setup flaggy
flaggy.SetDescription("Kuberhealthy is an in-cluster synthetic health checker for Kubernetes.")
flaggy.String(&configPath, "c", "config", "(optional) absolute path to the kuberhealthy config file")
flaggy.Bool(&useDebugMode, "d", "debug", "Set to true to enable debug.")
flaggy.Parse()

err := setUpConfig()
if err != nil {
return err
}

// parse and set logging level
parsedLogLevel, err := log.ParseLevel(cfg.LogLevel)
Expand Down
56 changes: 42 additions & 14 deletions cmd/kuberhealthy/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
var ReapCheckerPods map[string]v1.Pod

// Default values for reaper configurations
const maxKHJobAgeDefault = time.Minute * 15
const maxCheckPodAgeDefault = time.Hour * 72
const minKHJobAge = time.Minute * 5
const minCheckPodAge = time.Second * 30
const checkReaperRunIntervalDefault = time.Second * 30

type KubernetesAPI struct {
Expand Down Expand Up @@ -77,6 +77,18 @@ func reaper(ctx context.Context) {
log.Infoln("checkReaper: max completed check pod count:", cfg.MaxCompletedPodCount)
log.Infoln("checkReaper: max error check pod count:", cfg.MaxErrorPodCount)

// set MaxCheckPodAge to minCheckPodAge before getting reaped if no maxCheckPodAge is set
// Want to make sure the completed pod is around for at least 30s before getting reaped
if cfg.MaxCheckPodAge < minCheckPodAge {
cfg.MaxCheckPodAge = minCheckPodAge
}

// set MaxKHJobAge to minKHJobAge before getting reaped if no maxKHJobAge is set
// Want to make sure the completed job is around for at least 5m before getting reaped
if cfg.MaxKHJobAge < minKHJobAge {
cfg.MaxKHJobAge = minKHJobAge
}

// start a new ticker
t := time.NewTicker(reaperRunInterval)
defer t.Stop()
Expand Down Expand Up @@ -113,7 +125,7 @@ func runCheckReap(ctx context.Context) {
Client: kubernetesClient,
}
// list checker pods in all namespaces
podList, err := kubeApi.listCheckerPods(ctx, listenNamespace)
podList, err := kubeApi.listCompletedCheckerPods(ctx, listenNamespace)
if err != nil {
log.Errorln("checkReaper: Failed to list and delete old checker pods", err)
}
Expand Down Expand Up @@ -148,8 +160,8 @@ func runJobReap(ctx context.Context) {
log.Infoln("checkReaper: Finished reaping khjobs.")
}

// listCheckerPods returns a list of pods with the khcheck name label
func (k *KubernetesAPI) listCheckerPods(ctx context.Context, namespace string) (map[string]v1.Pod, error) {
// listCompletedCheckerPods returns a list of completed (Failed or Succeeded) pods with the khcheck name label
func (k *KubernetesAPI) listCompletedCheckerPods(ctx context.Context, namespace string) (map[string]v1.Pod, error) {
log.Infoln("checkReaper: Listing checker pods")

ReapCheckerPods = make(map[string]v1.Pod)
Expand All @@ -164,7 +176,6 @@ func (k *KubernetesAPI) listCheckerPods(ctx context.Context, namespace string) (

for _, p := range pods.Items {
if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
//log.Infoln("Checker pod: ", p.Name, "found in namespace: ", p.Namespace)
ReapCheckerPods[p.Name] = p
}
}
Expand All @@ -180,8 +191,8 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k
for n, v := range reapCheckerPods {

// Delete pods older than maxCheckPodAge and is in status Succeeded
if time.Now().Sub(v.CreationTimestamp.Time) > cfg.MaxCheckPodAge && v.Status.Phase == v1.PodSucceeded {
log.Infoln("checkReaper: Found pod older than:", cfg.MaxCheckPodAge, "in status `Succeeded`. Deleting pod:", n)
if v.Status.Phase == v1.PodSucceeded && time.Now().Sub(getPodCompletedTime(v)) > cfg.MaxCheckPodAge {
log.Infoln("checkReaper: Found completed pod older than:", cfg.MaxCheckPodAge, "in status `Succeeded`. Deleting pod:", n)

err = k.deletePod(ctx, v)
if err != nil {
Expand All @@ -192,8 +203,8 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k
}

// Delete failed pods (status Failed) older than maxCheckPodAge
if time.Now().Sub(v.CreationTimestamp.Time) > cfg.MaxCheckPodAge && v.Status.Phase == v1.PodFailed {
log.Infoln("checkReaper: Found pod older than:", cfg.MaxCheckPodAge, "in status `Failed`. Deleting pod:", n)
if v.Status.Phase == v1.PodFailed && time.Now().Sub(getPodCompletedTime(v)) > cfg.MaxCheckPodAge{
log.Infoln("checkReaper: Found completed pod older than:", cfg.MaxCheckPodAge, "in status `Failed`. Deleting pod:", n)

err = k.deletePod(ctx, v)
if err != nil {
Expand All @@ -205,7 +216,7 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k

// Delete if there are more than MaxCompletedPodCount checker pods with the same name in status Succeeded that were created more recently
// Delete if the checker pod is Failed and there are more than MaxErrorPodCount checker pods of the same type which were created more recently
allCheckPods := getAllPodsWithCheckName(reapCheckerPods, v)
allCheckPods := getAllCompletedPodsWithCheckName(reapCheckerPods, v)
if len(allCheckPods) > cfg.MaxCompletedPodCount {

failOldCount := 0
Expand Down Expand Up @@ -255,16 +266,18 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k
return err
}

// getAllPodsWithCheckName finds all checker pods for a given khcheck
func getAllPodsWithCheckName(reapCheckerPods map[string]v1.Pod, pod v1.Pod) []v1.Pod {
// getAllCompletedPodsWithCheckName finds all completed checker pods for a given khcheck that are older than minCheckPodAge
func getAllCompletedPodsWithCheckName(reapCheckerPods map[string]v1.Pod, pod v1.Pod) []v1.Pod {

var allCheckPods []v1.Pod

checkName := pod.Annotations["comcast.github.io/check-name"]

for _, v := range reapCheckerPods {
if v.Labels["kuberhealthy-check-name"] == checkName {
allCheckPods = append(allCheckPods, v)
if time.Now().Sub(getPodCompletedTime(v)) > minCheckPodAge {
allCheckPods = append(allCheckPods, v)
}
}
}

Expand Down Expand Up @@ -318,3 +331,18 @@ func khJobDelete(client *khjobcrd.KHJobV1Client) error {
}
return nil
}

// getPodCompletedTime returns the most recent container finish time recorded
// in the pod's container statuses.
//
// Container statuses that carry no terminated state are skipped:
// State.Terminated is a pointer and dereferencing it while nil would panic.
// If the pod has no container statuses, or none of them has a terminated
// state, the zero time.Time is returned, so callers that compute an age from
// the result will see a very large duration.
func getPodCompletedTime(pod v1.Pod) time.Time {
	var podCompletedTime time.Time
	for _, cs := range pod.Status.ContainerStatuses {
		// Guard against containers that have no terminated state recorded.
		terminated := cs.State.Terminated
		if terminated == nil {
			continue
		}

		// Keep the latest finish time seen across all containers.
		if finishedAt := terminated.FinishedAt.Time; finishedAt.After(podCompletedTime) {
			podCompletedTime = finishedAt
		}
	}

	return podCompletedTime
}
16 changes: 8 additions & 8 deletions cmd/kuberhealthy/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestParseDurationOrUseDefault(t *testing.T) {
}

// TestListCompletedCheckerPods ensures that only completed (Successful or Failed) kuberhealthy checker pods are listed / returned
func TestListCheckerPods(t *testing.T) {
func TestListCompletedCheckerPods(t *testing.T) {
validKHPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "valid-kh-pod",
Expand Down Expand Up @@ -129,24 +129,24 @@ func TestListCheckerPods(t *testing.T) {
}
}

results, err := api.listCheckerPods(ctx, "")
results, err := api.listCompletedCheckerPods(ctx, "")
if err != nil {
t.Fatalf("Error listCheckerPods: %s", err)
t.Fatalf("Error listCompletedCheckerPods: %s", err)
}

if len(results) != 2 {
t.Fatalf("listCheckerPods failed to list only completed Kuberhealthy pods")
t.Fatalf("listCompletedCheckerPods failed to list only completed Kuberhealthy pods")
}

if _, exists := results[validKHPod.Name]; !exists {
t.Fatalf("listCheckerPods failed to list %s", validKHPod.Name)
t.Fatalf("listCompletedCheckerPods failed to list %s", validKHPod.Name)
}

if _, exists := results[anotherValidKHPod.Name]; !exists {
t.Fatalf("listCheckerPods failed to list %s", anotherValidKHPod.Name)
t.Fatalf("listCompletedCheckerPods failed to list %s", anotherValidKHPod.Name)
}

t.Logf("listCheckerPods successfully listed only completed Kuberhealthy pods")
t.Logf("listCompletedCheckerPods successfully listed only completed Kuberhealthy pods")
}

// TestGetAllPodsWithCheckName tests that only pods from the same khcheck get listed
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestGetAllPodsWithCheckName(t *testing.T) {
khCheckerPods[khCheck2.Name] = khCheck2
khCheckerPods[khCheck3.Name] = khCheck3

results := getAllPodsWithCheckName(khCheckerPods, khCheck)
results := getAllCompletedPodsWithCheckName(khCheckerPods, khCheck)

if len(results) != 2 {
t.Fatalf("getAllPodsWithCheckName failed to get all pods of the same khcheck")
Expand Down

0 comments on commit 5e27b32

Please sign in to comment.