Skip to content

Commit

Permalink
Merge pull request #1180 from p0lyn0mial/sno-readiness-checks
Browse files Browse the repository at this point in the history
introduces KubeAPIReadinessChecker used by startup monitor to assess Kube API server readiness/health condition
  • Loading branch information
openshift-merge-robot committed Jul 22, 2021
2 parents 0763c5d + abb0257 commit e1b7cc3
Show file tree
Hide file tree
Showing 18 changed files with 1,856 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/openshift/api v0.0.0-20210706092853-b63d499a70ce
github.com/openshift/build-machinery-go v0.0.0-20210423112049-9415d7ebd33e
github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142
github.com/openshift/library-go v0.0.0-20210715155611-70a39c8ba7a1
github.com/openshift/library-go v0.0.0-20210720093535-f8ed43828870
github.com/pkg/profile v1.5.0 // indirect
github.com/prometheus-operator/prometheus-operator/pkg/client v0.45.0
github.com/prometheus/client_golang v1.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142 h1:ZHRIMCFIJN1
github.com/openshift/client-go v0.0.0-20210521082421-73d9475a9142/go.mod h1:fjS8r9mqDVsPb5td3NehsNOAWa4uiFkYEfVZioQ2gH0=
github.com/openshift/kubernetes-apiserver v0.0.0-20210419140141-620426e63a99 h1:KrCYRAJcgZYzMCB1PjJHJMYPu/d+dEkelq5eYyi0fDw=
github.com/openshift/kubernetes-apiserver v0.0.0-20210419140141-620426e63a99/go.mod h1:w2YSn4/WIwYuxG5zJmcqtRdtqgW/J2JRgFAqps3bBpg=
github.com/openshift/library-go v0.0.0-20210715155611-70a39c8ba7a1 h1:jclNeKbYKeXiq05pt1PdnjcgKV76oqQIcp8h7uqO3V0=
github.com/openshift/library-go v0.0.0-20210715155611-70a39c8ba7a1/go.mod h1:rln3LbFNOpENSvhmsfH7g/hqc58IF78+o96yAAp5mq0=
github.com/openshift/library-go v0.0.0-20210720093535-f8ed43828870 h1:xhtl3hJFfICWRPhLfu0xvX44rhR2Gf91LPRND2TdPPY=
github.com/openshift/library-go v0.0.0-20210720093535-f8ed43828870/go.mod h1:rln3LbFNOpENSvhmsfH7g/hqc58IF78+o96yAAp5mq0=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
324 changes: 324 additions & 0 deletions pkg/operator/startupmonitorreadiness/readiness_checks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
package startupmonitorreadiness

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/operatorclient"
"github.com/openshift/library-go/pkg/operator/staticpod/startupmonitor"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

// KubeAPIReadinessChecker is a struct that holds necessary data
// to perform a set of checks against a Kube API server to assess its health condition
type KubeAPIReadinessChecker struct {
// configuration for authN/authZ against the server
// populated from kubeconfig and set by the startup monitor pod
restConfig *rest.Config

// client we use to perform HTTP checks
client *http.Client

// defined here for easier testing
baseRawURL string

kubeClient *kubernetes.Clientset
}

var _ startupmonitor.ReadinessChecker = &KubeAPIReadinessChecker{}
var _ startupmonitor.WantsRestConfig = &KubeAPIReadinessChecker{}

// New creates a new Kube API readiness checker
func New() *KubeAPIReadinessChecker {
return &KubeAPIReadinessChecker{
baseRawURL: "https://localhost:6443",
}
}

// SetRestConfig called by startup monitor to provide a valid configuration for authN/authZ against Kube API server
func (ch *KubeAPIReadinessChecker) SetRestConfig(config *rest.Config) {
ch.restConfig = config

// note that we will be talking to Kube API over localhost and in case of an error/timeout requests will be retired for 5 min.
// setting the global timeout to a short value seems to be fine
ch.restConfig.Timeout = 4 * time.Second

ch.restConfig.Burst = 15
ch.restConfig.QPS = 10
}

// IsReady performs a series of checks for assessing Kube API server readiness condition
func (ch *KubeAPIReadinessChecker) IsReady(ctx context.Context, revision int) ( /*ready*/ bool /*reason*/, string /*message*/, string /*err*/, error) {
if ch.restConfig == nil {
return false, "", "", fmt.Errorf("missing restConfig, use SetRestConfig() metod to set one")

}
if ch.client == nil {
client, err := createHTTPClient(ch.restConfig)
if err != nil {
return false, "", "", fmt.Errorf("failed to create an HTTP client due to %v", err)
}
ch.client = client
}

if ch.kubeClient == nil {
kubeClient, err := kubernetes.NewForConfig(ch.restConfig)
if err != nil {
return false, "", "", fmt.Errorf("failed to create kubernetes clientset due to %v", err)
}
ch.kubeClient = kubeClient
}

// loop through a list of ordered checks for assessing Kube API readiness condition
for _, checkFn := range []func(context.Context) (bool, string, string){
// TODO: watch /var/log/kube-apiserver/termination.log for the first start-up attempt (beware of the race of startup-monitor startup and kube-apiserver startup). Set Reason=NeverStartedUp when this times out.
// TODO: watch /var/log/kube-apiserver/termination.log for more than one start-up attempt. Set Reason=CrashLooping if more than one is found and the monitor times out.

// checks if we are not dealing with the old kas
noOldRevisionPodExists(ch.kubeClient.CoreV1().Pods(operatorclient.TargetNamespace), revision),

// check kube-apiserver /healthz/etcd endpoint
goodHealthzEtcdEndpoint(ch.client, ch.baseRawURL),

// check kube-apiserver /healthz endpoint
goodHealthzEndpoint(ch.client, ch.baseRawURL),

// check kube-apiserver /readyz endpoint
goodReadyzEndpoint(ch.client, ch.baseRawURL, 3, 5*time.Second),

// check if the kas pod is running at the expected revision
newRevisionPodExists(ch.kubeClient.CoreV1().Pods(operatorclient.TargetNamespace), revision),

// check that kubelet has reporting readiness for the new pod
newPodRunning(ch.kubeClient.CoreV1().Pods(operatorclient.TargetNamespace), revision),
} {
select {
case <-ctx.Done():
return false, "", "", ctx.Err()
default:
}

if ready, reason, message := checkFn(ctx); !ready {
return ready, reason, message, nil
}
}

// at this point Kube API is ready!
return true, "", "", nil
}

// newPodRunning checks if kas pod is in PodRunning phase and has PodReady condition set to true
func newPodRunning(podClient corev1client.PodInterface, monitorRevision int) func(context.Context) (bool, string, string) {
return func(ctx context.Context) (bool, string, string) {
apiServerPods, err := podClient.List(ctx, metav1.ListOptions{LabelSelector: "apiserver=true"})
if err != nil {
return false, "PodListError", fmt.Sprintf("unable to check the pod's status, falied to get Kube API server pod due to %v", err)
}
if len(apiServerPods.Items) == 0 {
return false, "PodNotRunning", "unable to check the pod's status, waiting for Kube API server pod to show up"
}
if len(apiServerPods.Items) != 1 {
return false, "PodListError", fmt.Sprintf("unable to check the pod's status, unexpected number of Kube API server pods %d, expected only one pod", len(apiServerPods.Items))
}

kasPod := apiServerPods.Items[0]
if kasPod.Status.Phase != corev1.PodRunning {
return false, "PodNodReady", fmt.Sprintf("waiting for Kube API server pod to be in PodRunning phase, the current phase is %v", kasPod.Status.Phase)
}

if kasPod.Status.Phase == corev1.PodRunning && !func(pod corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
return true
}
}
return false
}(kasPod) {
return false, "PodNodReady", "waiting for Kube API server pod to have PodReady state set to true"
}

return checkRevision(&kasPod, monitorRevision)
}
}

// newRevisionPodExists check if the kas pod is running at the expected revision
func newRevisionPodExists(podClient corev1client.PodInterface, monitorRevision int) func(context.Context) (bool, string, string) {
return func(ctx context.Context) (bool, string, string) {
return checkRevisionOnPod(ctx, podClient, monitorRevision, true)
}
}

// noOldRevisionPodExists checks if we are not dealing with the old kas
// it is useful when you want to avoid false positive - failing readyz check when the previous instance is still running
//
// note that:
// it won't fail when getting the pod from the api server fails as that might mean the new instance is not ready/healthy
func noOldRevisionPodExists(podClient corev1client.PodInterface, monitorRevision int) func(context.Context) (bool, string, string) {
return func(ctx context.Context) (bool, string, string) {
return checkRevisionOnPod(ctx, podClient, monitorRevision, false)
}
}

// checkRevisionOnPod checks if the kas pod is running at the expected revision
//
// strictMode controls whether a certain errors like: failing to get the pod or absence of the pod should be fatal
// it is useful when you want to avoid false positive - failing readyz check when the previous instance is still running
func checkRevisionOnPod(ctx context.Context, podClient corev1client.PodInterface, monitorRevision int, strictMode bool) (bool, string, string) {
apiServerPods, err := podClient.List(ctx, metav1.ListOptions{LabelSelector: "apiserver=true"})
if err != nil {
return !strictMode, "PodListError", fmt.Sprintf("unable to check a revison, failed to get Kube API server pod due to %v", err)
}
if len(apiServerPods.Items) == 0 {
return !strictMode, "PodNotRunning", "unable to check a revision, waiting for Kube API server pod to show up"
}
if len(apiServerPods.Items) != 1 {
return !strictMode, "PodListError", fmt.Sprintf("unable to check a revision, unexpected number of Kube API server pods %d, expected only one pod", len(apiServerPods.Items))
}

return checkRevision(&apiServerPods.Items[0], monitorRevision)
}

func checkRevision(kasPod *corev1.Pod, monitorRevision int) (bool, string, string) {
revisionString, found := kasPod.Labels["revision"]
if !found {
return false, "InvalidPod", fmt.Sprintf("pod %s doesn't have revision label", kasPod.Name)
}
if len(revisionString) == 0 {
return false, "InvalidRevision", fmt.Sprintf("empty revision label on %s pod", kasPod.Name)
}
revision, err := strconv.Atoi(revisionString)
if err != nil || revision < 0 {
return false, "InvalidRevision", fmt.Sprintf("invalid revision label on pod %s: %q", kasPod.Name, revisionString)
}

if revision != monitorRevision {
return false, "UnexpectedRevision", fmt.Sprintf("the running Kube API (%s) is at unexpected revision %d, expected %d", kasPod.Name, revision, monitorRevision)
}

return true, "", ""
}

// goodReadyzEndpoint performs HTTP checks against readyz?verbose=true endpoint
// returns true, "", "", when we got HTTP 200 "successThreshold" times
// returns false, "NotReady", EntireResponseBody (if any) on HTTP != 200
// returns false, "NotReadyError", EntireResponseBody (if any) in case of any error or timeout
func goodReadyzEndpoint(client *http.Client, rawURL string, successThreshold int, interval time.Duration) func(ctx context.Context) (bool, string, string) {
return func(ctx context.Context) (bool, string, string) {
return doHTTPCheckAndTransform(ctx, client, fmt.Sprintf("%s/readyz?verbose=true", rawURL), "NotReady", doHTTPCheckMultipleTimes(successThreshold, interval))
}
}

// goodHealthzEndpoint performs an HTTP check against healthz?verbose=true endpoint
// returns true, "", "", on HTTP 200
// returns false, "Unhealthy", EntireResponseBody (if any) on HTTP != 200
// returns false, "UnhealthyError", EntireResponseBody (if any) in case of any error or timeout
func goodHealthzEndpoint(client *http.Client, rawURL string) func(context.Context) (bool, string, string) {
return func(ctx context.Context) (bool, string, string) {
return doHTTPCheckAndTransform(ctx, client, fmt.Sprintf("%s/healthz?verbose=true", rawURL), "Unhealthy", doHTTPCheck)
}
}

// goodHealthzEtcdEndpoint performs an HTTP check against healthz/etcd endpoint
// returns true, "", "", on HTTP 200
// returns false, "EtcdUnhealthy", EntireResponseBody (if any) on HTTP != 200
// returns false, "EtcdUnhealthyError", EntireResponseBody (if any) in case of any error or timeout
func goodHealthzEtcdEndpoint(client *http.Client, rawURL string) func(context.Context) (bool, string, string) {
return func(ctx context.Context) (bool, string, string) {
return doHTTPCheckAndTransform(ctx, client, fmt.Sprintf("%s/healthz/etcd", rawURL), "EtcdUnhealthy", doHTTPCheck)
}
}

func doHTTPCheckAndTransform(ctx context.Context, client *http.Client, rawURL string, checkName string, httpCheckFn func(ctx context.Context, client *http.Client, rawURL string) (int, string, error)) (bool, string, string) {
statusCode, response, err := httpCheckFn(ctx, client, rawURL)
if err != nil {
errMsg := err.Error()
if len(response) > 0 {
errMsg = fmt.Sprintf("%v, a response from the server was %v", errMsg, response)
}
return false, fmt.Sprintf("%vError", checkName), errMsg
}
if statusCode != http.StatusOK {
return false, checkName, response
}

return true, "", ""
}

func doHTTPCheck(ctx context.Context, client *http.Client, rawURL string) (int, string, error) {
targetURL, err := url.Parse(rawURL)
if err != nil {
return 0, "", err
}
newReq, err := http.NewRequestWithContext(ctx, "GET", targetURL.String(), nil)
if err != nil {
return 0, "", err
}

resp, err := client.Do(newReq)
if err != nil {
return 0, "", err
}
defer resp.Body.Close()

// we expect small responses from the server
// so it is okay to read the entire body
rawResponse, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", fmt.Errorf("error while reading body from %v, err %v", targetURL.String(), err)
}

return resp.StatusCode, string(rawResponse), nil
}

func createHTTPClient(restConfig *rest.Config) (*http.Client, error) {
transportConfig, err := restConfig.TransportConfig()
if err != nil {
return nil, err
}

tlsConfig, err := transport.TLSConfigFor(transportConfig)
if err != nil {
return nil, err
}

client := &http.Client{
Transport: utilnet.SetTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
}),
Timeout: restConfig.Timeout,
}

return client, nil
}

// doHTTPCheckMultipleTimes calls doHTTPCheck "n" times with an "interval" between each invocation
// it stops on a non 200 HTTP status code or when an error is returned from doHTTPCheck method
func doHTTPCheckMultipleTimes(n int, interval time.Duration) func(ctx context.Context, client *http.Client, rawURL string) (int, string, error) {
return func(ctx context.Context, client *http.Client, rawURL string) (int, string, error) {
var lastResponse string
var lastError error
var lastStatusCode int
for i := 1; i <= n; i++ {
lastStatusCode, lastResponse, lastError = doHTTPCheck(ctx, client, rawURL)
if lastError != nil || lastStatusCode != http.StatusOK {
return lastStatusCode, lastResponse, lastError
}
if i != n {
time.Sleep(interval)
}
}
return lastStatusCode, lastResponse, lastError
}
}

0 comments on commit e1b7cc3

Please sign in to comment.