-
Notifications
You must be signed in to change notification settings - Fork 21
/
k8s_metrics_api_client.go
127 lines (111 loc) · 3.44 KB
/
k8s_metrics_api_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package runners
import (
"bytes"
"context"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"golang.org/x/sync/errgroup"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// NewK8sMetricsApiClient constructs a MetricsClient backed by an empty
// k8sMetricsApiClient value. The returned error is always nil.
func NewK8sMetricsApiClient() (MetricsClient, error) {
	var client MetricsClient = new(k8sMetricsApiClient)
	return client, nil
}
// k8sMetricsApiClient is a field-less type; it carries no state of its own
// and exists to hang metric-collection methods on.
type k8sMetricsApiClient struct{}
// GetMetrics gathers volume statistics for the PVCs reported by every node in
// the cluster.
//
// It builds a clientset from the in-cluster configuration, lists all nodes,
// and calls getPVCUsage for each node concurrently through an errgroup. The
// per-node results are merged into a single map keyed by PVC namespace/name.
// When the same PVC is reported by more than one node, the entry from the node
// that appears later in the node list wins.
//
// ctx governs both the node listing and the per-node requests. The first
// error from any step is returned and no partial result is exposed.
func (c *k8sMetricsApiClient) GetMetrics(ctx context.Context) (map[types.NamespacedName]*VolumeStats, error) {
	// create a Kubernetes client using in-cluster configuration
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	// List nodes with the caller's context so cancellation and deadlines are
	// honoured instead of being silently dropped.
	nodes, err := clientset.CoreV1().Nodes().List(ctx, v1.ListOptions{})
	if err != nil {
		return nil, err
	}

	// Go maps must not be written from several goroutines at once, so every
	// node gets a private map that only its own goroutine touches.
	perNode := make([]map[types.NamespacedName]*VolumeStats, 0, len(nodes.Items))

	// use an errgroup to query each node for PVC usage in parallel
	eg, ctx := errgroup.WithContext(ctx)
	for i := range nodes.Items {
		nodeName := nodes.Items[i].Name
		usage := make(map[types.NamespacedName]*VolumeStats)
		perNode = append(perNode, usage)
		eg.Go(func() error {
			return getPVCUsage(clientset, nodeName, usage, ctx)
		})
	}

	// wait for all queries to complete and handle any errors
	if err := eg.Wait(); err != nil {
		return nil, err
	}

	// Merge sequentially, after all goroutines have finished.
	pvcUsage := make(map[types.NamespacedName]*VolumeStats)
	for _, usage := range perNode {
		for name, stats := range usage {
			pvcUsage[name] = stats
		}
	}
	return pvcUsage, nil
}
// getPVCUsage fetches the "metrics" endpoint of node nodeName through the API
// server's node proxy sub-resource, parses the response as Prometheus text
// format, and records the per-PVC values into pvcUsage.
//
// Four metric families are read (volumeAvailableQuery, volumeCapacityQuery,
// inodesAvailableQuery, inodesCapacityQuery). A sample from the available-bytes
// family starts a fresh VolumeStats entry for its PVC; samples from the other
// families update the existing entry, creating one if the PVC has not been
// seen yet.
//
// pvcUsage is mutated in place and is not protected by any lock here, so the
// caller must not share it between concurrent calls.
//
// It returns a wrapped error when the request fails or the body cannot be
// parsed; otherwise nil.
func getPVCUsage(clientset *kubernetes.Clientset, nodeName string, pvcUsage map[types.NamespacedName]*VolumeStats, ctx context.Context) error {
	// make the request to the api /metrics endpoint and handle the response
	req := clientset.
		CoreV1().
		RESTClient().
		Get().
		Resource("nodes").
		Name(nodeName).
		SubResource("proxy").
		Suffix("metrics")
	respBody, err := req.DoRaw(ctx)
	if err != nil {
		// Wrap rather than format with %s so the underlying cause stays
		// inspectable, matching the parse-error path below.
		return errors.Wrapf(err, "failed to get stats from kubelet on node %s", nodeName)
	}

	parser := expfmt.TextParser{}
	metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(respBody))
	if err != nil {
		return errors.Wrapf(err, "failed to read response body from kubelet on node %s", nodeName)
	}

	// statsFor returns the entry for name, allocating it when missing so that
	// a PVC lacking an available-bytes sample cannot cause a nil dereference.
	statsFor := func(name types.NamespacedName) *VolumeStats {
		stats := pvcUsage[name]
		if stats == nil {
			stats = new(VolumeStats)
			pvcUsage[name] = stats
		}
		return stats
	}

	// volumeAvailableQuery
	if gauge, ok := metricFamilies[volumeAvailableQuery]; ok {
		for _, m := range gauge.Metric {
			pvcName, value := parseMetric(m)
			// Start a fresh entry, discarding anything previously stored
			// under this PVC.
			stats := new(VolumeStats)
			stats.AvailableBytes = int64(value)
			pvcUsage[pvcName] = stats
		}
	}

	// volumeCapacityQuery
	if gauge, ok := metricFamilies[volumeCapacityQuery]; ok {
		for _, m := range gauge.Metric {
			pvcName, value := parseMetric(m)
			statsFor(pvcName).CapacityBytes = int64(value)
		}
	}

	// inodesAvailableQuery
	if gauge, ok := metricFamilies[inodesAvailableQuery]; ok {
		for _, m := range gauge.Metric {
			pvcName, value := parseMetric(m)
			statsFor(pvcName).AvailableInodeSize = int64(value)
		}
	}

	// inodesCapacityQuery
	if gauge, ok := metricFamilies[inodesCapacityQuery]; ok {
		for _, m := range gauge.Metric {
			pvcName, value := parseMetric(m)
			statsFor(pvcName).CapacityInodeSize = int64(value)
		}
	}
	return nil
}
// parseMetric pulls the PVC identity and gauge reading out of one metric
// sample. The "namespace" label fills pvcName.Namespace and the
// "persistentvolumeclaim" label fills pvcName.Name; all other labels are
// ignored. The gauge's float value is converted to uint64.
func parseMetric(m *dto.Metric) (pvcName types.NamespacedName, value uint64) {
	for _, pair := range m.GetLabel() {
		switch pair.GetName() {
		case "namespace":
			pvcName.Namespace = pair.GetValue()
		case "persistentvolumeclaim":
			pvcName.Name = pair.GetValue()
		}
	}
	reading := m.GetGauge().GetValue()
	value = uint64(reading)
	return pvcName, value
}