diff --git a/cmd/kuberhealthy/config.go b/cmd/kuberhealthy/config.go index 9d116169b..63038d150 100644 --- a/cmd/kuberhealthy/config.go +++ b/cmd/kuberhealthy/config.go @@ -7,7 +7,7 @@ import ( "github.com/codingsince1985/checksum" log "github.com/sirupsen/logrus" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) // Config holds all configurable options @@ -175,11 +175,15 @@ func configReloadNotifier(ctx context.Context, notifyChan chan struct{}) { default: } - err := cfg.Load(configPath) + log.Debugln("configReloader: loading new configuration") + + // setup config + err := setUpConfig() if err != nil { - log.Errorln("configReloader: Error reloading config:", err) + log.Errorln("configReloader: Error reloading and setting up config:", err) continue } + log.Debugln("configReloader: loaded new configuration:", cfg) // reparse and set logging level parsedLogLevel, err := log.ParseLevel(cfg.LogLevel) diff --git a/cmd/kuberhealthy/main.go b/cmd/kuberhealthy/main.go index 367c5cf26..bb20f753a 100644 --- a/cmd/kuberhealthy/main.go +++ b/cmd/kuberhealthy/main.go @@ -86,17 +86,9 @@ var kubernetesClient *kubernetes.Clientset // Set dynamicClient that represents the client used to watch and list unstructured khchecks var dynamicClient dynamic.Interface -// setUp loads, parses, and sets various Kuberhealthy configurations -- from flags, config values and env vars. -func setUp() error { - - var useDebugMode bool - - // setup flaggy - flaggy.SetDescription("Kuberhealthy is an in-cluster synthetic health checker for Kubernetes.") - flaggy.String(&configPath, "c", "config", "(optional) absolute path to the kuberhealthy config file") - flaggy.Bool(&useDebugMode, "d", "debug", "Set to true to enable debug.") - flaggy.Parse() - +// setUpConfig loads and sets default Kuberhealthy configurations +// Everytime kuberhealthy sees a configuration change, configurations should reload and reset +func setUpConfig() error { cfg = &Config{ kubeConfigFile: filepath.Join(os.Getenv("HOME"), ".kube", "config"), LogLevel: "info", @@ -119,6 +111,24 @@ func setUp() error { } cfg.ExternalCheckReportingURL = externalCheckURL log.Infoln("External check reporting URL set to:", cfg.ExternalCheckReportingURL) + return nil +} + +// setUp loads, parses, and sets various Kuberhealthy configurations -- from flags, config values and env vars. +func setUp() error { + + var useDebugMode bool + + // setup flaggy + flaggy.SetDescription("Kuberhealthy is an in-cluster synthetic health checker for Kubernetes.") + flaggy.String(&configPath, "c", "config", "(optional) absolute path to the kuberhealthy config file") + flaggy.Bool(&useDebugMode, "d", "debug", "Set to true to enable debug.") + flaggy.Parse() + + err := setUpConfig() + if err != nil { + return err + } // parse and set logging level parsedLogLevel, err := log.ParseLevel(cfg.LogLevel) diff --git a/cmd/kuberhealthy/reaper.go b/cmd/kuberhealthy/reaper.go index eda96d3d9..e0850ad2e 100644 --- a/cmd/kuberhealthy/reaper.go +++ b/cmd/kuberhealthy/reaper.go @@ -29,8 +29,8 @@ import ( var ReapCheckerPods map[string]v1.Pod // Default values for reaper configurations -const maxKHJobAgeDefault = time.Minute * 15 -const maxCheckPodAgeDefault = time.Hour * 72 +const minKHJobAge = time.Minute * 5 +const minCheckPodAge = time.Second * 30 const checkReaperRunIntervalDefault = time.Second * 30 type KubernetesAPI struct { @@ -77,6 +77,18 @@ func reaper(ctx context.Context) { log.Infoln("checkReaper: max completed check pod count:", cfg.MaxCompletedPodCount) log.Infoln("checkReaper: max error check pod count:", cfg.MaxErrorPodCount) + // set MaxCheckPodAge to minCheckPodAge before getting reaped if no maxCheckPodAge is set + // Want to make sure the completed pod is around for at least 30s before getting reaped + if cfg.MaxCheckPodAge < minCheckPodAge { + cfg.MaxCheckPodAge = minCheckPodAge + } + + // set MaxKHJobAge to minKHJobAge before getting reaped if no maxCheckPodAge is set + // Want to make sure the completed job is around for at least 5m before getting reaped + if cfg.MaxKHJobAge < minKHJobAge { + cfg.MaxKHJobAge = minKHJobAge + } + // start a new ticker t := time.NewTicker(reaperRunInterval) defer t.Stop() @@ -113,7 +125,7 @@ func runCheckReap(ctx context.Context) { Client: kubernetesClient, } // list checker pods in all namespaces - podList, err := kubeApi.listCheckerPods(ctx, listenNamespace) + podList, err := kubeApi.listCompletedCheckerPods(ctx, listenNamespace) if err != nil { log.Errorln("checkReaper: Failed to list and delete old checker pods", err) } @@ -148,8 +160,8 @@ func runJobReap(ctx context.Context) { log.Infoln("checkReaper: Finished reaping khjobs.") } -// listCheckerPods returns a list of pods with the khcheck name label -func (k *KubernetesAPI) listCheckerPods(ctx context.Context, namespace string) (map[string]v1.Pod, error) { +// listCompletedCheckerPods returns a list of completed (Failed of Succeeded) pods with the khcheck name label +func (k *KubernetesAPI) listCompletedCheckerPods(ctx context.Context, namespace string) (map[string]v1.Pod, error) { log.Infoln("checkReaper: Listing checker pods") ReapCheckerPods = make(map[string]v1.Pod) @@ -164,7 +176,6 @@ func (k *KubernetesAPI) listCheckerPods(ctx context.Context, namespace string) ( for _, p := range pods.Items { if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed { - //log.Infoln("Checker pod: ", p.Name, "found in namespace: ", p.Namespace) ReapCheckerPods[p.Name] = p } } @@ -180,8 +191,8 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k for n, v := range reapCheckerPods { // Delete pods older than maxCheckPodAge and is in status Succeeded - if time.Now().Sub(v.CreationTimestamp.Time) > cfg.MaxCheckPodAge && v.Status.Phase == v1.PodSucceeded { - log.Infoln("checkReaper: Found pod older than:", cfg.MaxCheckPodAge, "in status `Succeeded`. Deleting pod:", n) + if v.Status.Phase == v1.PodSucceeded && time.Now().Sub(getPodCompletedTime(v)) > cfg.MaxCheckPodAge { + log.Infoln("checkReaper: Found completed pod older than:", cfg.MaxCheckPodAge, "in status `Succeeded`. Deleting pod:", n) err = k.deletePod(ctx, v) if err != nil { @@ -192,8 +203,8 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k } // Delete failed pods (status Failed) older than maxCheckPodAge - if time.Now().Sub(v.CreationTimestamp.Time) > cfg.MaxCheckPodAge && v.Status.Phase == v1.PodFailed { - log.Infoln("checkReaper: Found pod older than:", cfg.MaxCheckPodAge, "in status `Failed`. Deleting pod:", n) + if v.Status.Phase == v1.PodFailed && time.Now().Sub(getPodCompletedTime(v)) > cfg.MaxCheckPodAge{ + log.Infoln("checkReaper: Found completed pod older than:", cfg.MaxCheckPodAge, "in status `Failed`. Deleting pod:", n) err = k.deletePod(ctx, v) if err != nil { @@ -205,7 +216,7 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k // Delete if there are more than MaxCompletedPodCount checker pods with the same name in status Succeeded that were created more recently // Delete if the checker pod is Failed and there are more than MaxErrorPodCount checker pods of the same type which were created more recently - allCheckPods := getAllPodsWithCheckName(reapCheckerPods, v) + allCheckPods := getAllCompletedPodsWithCheckName(reapCheckerPods, v) if len(allCheckPods) > cfg.MaxCompletedPodCount { failOldCount := 0 @@ -255,8 +266,8 @@ func (k *KubernetesAPI) deleteFilteredCheckerPods(ctx context.Context, client *k return err } -// getAllPodsWithCheckName finds all checker pods for a given khcheck -func getAllPodsWithCheckName(reapCheckerPods map[string]v1.Pod, pod v1.Pod) []v1.Pod { +// getAllCompletedPodsWithCheckName finds all completed checker pods for a given khcheck that are older than minCheckPodAge +func getAllCompletedPodsWithCheckName(reapCheckerPods map[string]v1.Pod, pod v1.Pod) []v1.Pod { var allCheckPods []v1.Pod @@ -264,7 +275,9 @@ func getAllPodsWithCheckName(reapCheckerPods map[string]v1.Pod, pod v1.Pod) []v1 for _, v := range reapCheckerPods { if v.Labels["kuberhealthy-check-name"] == checkName { - allCheckPods = append(allCheckPods, v) + if time.Now().Sub(getPodCompletedTime(v)) > minCheckPodAge { + allCheckPods = append(allCheckPods, v) + } } } @@ -318,3 +331,18 @@ func khJobDelete(client *khjobcrd.KHJobV1Client) error { } return nil } + +// getPodCompletedTime returns the container with the latest finished time. +func getPodCompletedTime(pod v1.Pod) time.Time { + + var podCompletedTime time.Time + for _, cs := range pod.Status.ContainerStatuses { + + finishedTime := cs.State.Terminated.FinishedAt + if finishedTime.After(podCompletedTime) { + podCompletedTime = finishedTime.Time + } + } + + return podCompletedTime +} diff --git a/cmd/kuberhealthy/reaper_test.go b/cmd/kuberhealthy/reaper_test.go index 9967575f4..52201d1d0 100644 --- a/cmd/kuberhealthy/reaper_test.go +++ b/cmd/kuberhealthy/reaper_test.go @@ -64,7 +64,7 @@ func TestParseDurationOrUseDefault(t *testing.T) { } // TestListCheckerPods ensures that only completed (Successful or Failed) kuberhealthy checker pods are listed / returned -func TestListCheckerPods(t *testing.T) { +func TestListCompletedCheckerPods(t *testing.T) { validKHPod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "valid-kh-pod", @@ -129,24 +129,24 @@ func TestListCheckerPods(t *testing.T) { } } - results, err := api.listCheckerPods(ctx, "") + results, err := api.listCompletedCheckerPods(ctx, "") if err != nil { - t.Fatalf("Error listCheckerPods: %s", err) + t.Fatalf("Error listCompletedCheckerPods: %s", err) } if len(results) != 2 { - t.Fatalf("listCheckerPods failed to list only completed Kuberhealthy pods") + t.Fatalf("listCompletedCheckerPods failed to list only completed Kuberhealthy pods") } if _, exists := results[validKHPod.Name]; !exists { - t.Fatalf("listCheckerPods failed to list %s", validKHPod.Name) + t.Fatalf("listCompletedCheckerPods failed to list %s", validKHPod.Name) } if _, exists := results[anotherValidKHPod.Name]; !exists { - t.Fatalf("listCheckerPods failed to list %s", anotherValidKHPod.Name) + t.Fatalf("listCompletedCheckerPods failed to list %s", anotherValidKHPod.Name) } - t.Logf("listCheckerPods successfully listed only completed Kuberhealthy pods") + t.Logf("listCompletedCheckerPods successfully listed only completed Kuberhealthy pods") } // TestGetAllPodsWithCheckName tests that only pods from the same khcheck get listed @@ -209,7 +209,7 @@ func TestGetAllPodsWithCheckName(t *testing.T) { khCheckerPods[khCheck2.Name] = khCheck2 khCheckerPods[khCheck3.Name] = khCheck3 - results := getAllPodsWithCheckName(khCheckerPods, khCheck) + results := getAllCompletedPodsWithCheckName(khCheckerPods, khCheck) if len(results) != 2 { t.Fatalf("getAllPodsWithCheckName failed to get all pods of the same khcheck")