/
kubernetes-client.go
155 lines (136 loc) · 4.53 KB
/
kubernetes-client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package health
import (
"context"
"log"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/m-lab/go/rtx"
"github.com/m-lab/locate/metrics"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
)
// errKubernetesAPI is the log-message prefix used whenever a request to
// the Kubernetes API server fails.
var errKubernetesAPI = "error making request to Kubernetes API server"
// KubernetesClient manages requests to the Kubernetes API server.
type KubernetesClient struct {
	// pod is the name of the Pod whose status is checked.
	pod string
	// node is the name of the Node whose readiness and taints are checked.
	node string
	// namespace is the Kubernetes namespace the Pod lives in.
	namespace string
	// clientset issues the actual requests to the API server.
	clientset kubernetes.Interface
}
// MustNewKubernetesClient creates a new KubernetesClient instance.
// If the client cannot be instantiated, the function will exit.
func MustNewKubernetesClient(url *url.URL, pod, node, namespace, auth string) *KubernetesClient {
	restConfig, err := getDefaultClientConfig(url, auth).ClientConfig()
	rtx.Must(err, "failed to create kubernetes config")
	cs, err := kubernetes.NewForConfig(restConfig)
	rtx.Must(err, "failed to create kubernetes clientset")
	return &KubernetesClient{
		pod:       pod,
		node:      node,
		namespace: namespace,
		clientset: cs,
	}
}
// getDefaultClientConfig builds a client-go ClientConfig directly in
// memory. This is the low-level structure normally produced by parsing a
// kubeconfig file; since every value is already known, no file is needed.
//
// The cluster and user names serve only to define a context that
// associates the login credentials with the target cluster.
func getDefaultClientConfig(url *url.URL, auth string) clientcmd.ClientConfig {
	// Cluster address and CA Certificate.
	cluster := &api.Cluster{
		Server:                url.String(),
		InsecureSkipTLSVerify: false, // Require a valid CA Certificate.
		CertificateAuthority:  path.Join(auth, "ca.crt"),
	}
	// User credentials for access to the API.
	user := &api.AuthInfo{
		TokenFile: path.Join(auth, "token"),
	}
	cfg := api.Config{
		Clusters:  map[string]*api.Cluster{"cluster": cluster},
		AuthInfos: map[string]*api.AuthInfo{"user": user},
		Contexts: map[string]*api.Context{
			// Context referring to the above cluster and user.
			"cluster-user": {
				Cluster:  "cluster",
				AuthInfo: "user",
			},
		},
		CurrentContext: "cluster-user",
	}
	return clientcmd.NewDefaultClientConfig(cfg, &clientcmd.ConfigOverrides{
		ClusterInfo: api.Cluster{Server: ""},
	})
}
// isHealthy returns true if it can determine the following conditions are true:
//   - The Pod's status is "Running"
//   - The Node's Ready condition is "True"
//   - The Node does not have a "lame-duck" taint
//
// OR if it cannot contact the API Server to make a determination.
func (c *KubernetesClient) isHealthy(ctx context.Context) bool {
	begin := time.Now()
	// Short-circuit: the node is only queried when the pod check passes.
	healthy := c.isPodRunning(ctx) && c.isNodeReady(ctx)
	label := strconv.FormatBool(healthy)
	metrics.KubernetesRequestTimeHistogram.WithLabelValues(label).Observe(time.Since(begin).Seconds())
	return healthy
}
// isPodRunning returns true if the Pod's status phase is "Running",
// OR if it cannot contact the API Server to make a determination (so a
// transient API outage does not mark the service unhealthy).
func (c *KubernetesClient) isPodRunning(ctx context.Context) bool {
	pod, err := c.clientset.CoreV1().Pods(c.namespace).Get(ctx, c.pod, metav1.GetOptions{})
	if err != nil {
		log.Printf("%s: %v", errKubernetesAPI, err)
		metrics.KubernetesRequestsTotal.WithLabelValues("pod", extractError(err)).Inc()
		return true
	}
	metrics.KubernetesRequestsTotal.WithLabelValues("pod", "OK").Inc()
	// Use the typed constant from k8s.io/api instead of a raw string
	// literal; same value, but typo-safe and idiomatic.
	return pod.Status.Phase == v1.PodRunning
}
// isNodeReady returns true if it can determine the following conditions are true:
//   - The Node's Ready condition is "True"
//   - The Node does not have a "lame-duck" taint
//
// OR if it cannot contact the API Server to make a determination.
func (c *KubernetesClient) isNodeReady(ctx context.Context) bool {
	node, err := c.clientset.CoreV1().Nodes().Get(ctx, c.node, metav1.GetOptions{})
	if err != nil {
		log.Printf("%s: %v", errKubernetesAPI, err)
		metrics.KubernetesRequestsTotal.WithLabelValues("node", extractError(err)).Inc()
		return true
	}
	metrics.KubernetesRequestsTotal.WithLabelValues("node", "OK").Inc()
	for _, condition := range node.Status.Conditions {
		// Use the typed constants from k8s.io/api instead of the raw
		// "Ready"/"True" string literals; same values, but typo-safe.
		if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
			return !isInMaintenance(node)
		}
	}
	// No Ready condition reported at all: treat the node as not ready.
	return false
}
// isInMaintenance reports whether the node carries a "lame-duck" taint,
// which marks it as undergoing maintenance.
func isInMaintenance(node *v1.Node) bool {
	taints := node.Spec.Taints
	for i := range taints {
		if taints[i].Key == "lame-duck" {
			return true
		}
	}
	return false
}
// extractError extracts the base error string (the final, innermost
// message) from an error returned by the Kubernetes API, whose messages
// are chained with ": " separators. If no separator is present, the
// whole message is returned unchanged.
func extractError(err error) string {
	msg := err.Error()
	if i := strings.LastIndex(msg, ": "); i >= 0 {
		return msg[i+2:]
	}
	return msg
}