Skip to content

Commit

Permalink
feat: Implement new metrics endpoint for k8s 1.24+ (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
fnuarnav committed Mar 28, 2023
1 parent b70ee9b commit a457d44
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 1 deletion.
124 changes: 124 additions & 0 deletions cmd/virtual-kubelet/internal/provider/mock/mock.go
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
"github.com/virtual-kubelet/virtual-kubelet/log"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
Expand Down Expand Up @@ -508,6 +509,129 @@ func (p *MockProvider) GetStatsSummary(ctx context.Context) (*stats.Summary, err
return res, nil
}

func (p *MockProvider) generateMockMetrics(metricsMap map[string][]*dto.Metric, resourceType string, label []*dto.LabelPair) map[string][]*dto.Metric {
var (
cpuMetricSuffix = "_cpu_usage_seconds_total"
memoryMetricSuffix = "_memory_working_set_bytes"
dummyValue = float64(100)
)

if metricsMap == nil {
metricsMap = map[string][]*dto.Metric{}
}

finalCpuMetricName := resourceType + cpuMetricSuffix
finalMemoryMetricName := resourceType + memoryMetricSuffix

newCPUMetric := dto.Metric{
Label: label,
Counter: &dto.Counter{
Value: &dummyValue,
},
}
newMemoryMetric := dto.Metric{
Label: label,
Gauge: &dto.Gauge{
Value: &dummyValue,
},
}
// if metric family exists add to metric array
if cpuMetrics, ok := metricsMap[finalCpuMetricName]; ok {
metricsMap[finalCpuMetricName] = append(cpuMetrics, &newCPUMetric)
} else {
metricsMap[finalCpuMetricName] = []*dto.Metric{&newCPUMetric}
}
if memoryMetrics, ok := metricsMap[finalMemoryMetricName]; ok {
metricsMap[finalMemoryMetricName] = append(memoryMetrics, &newMemoryMetric)
} else {
metricsMap[finalMemoryMetricName] = []*dto.Metric{&newMemoryMetric}
}

return metricsMap
}

func (p *MockProvider) getMetricType(metricName string) *dto.MetricType {
var (
dtoCounterMetricType = dto.MetricType_COUNTER
dtoGaugeMetricType = dto.MetricType_GAUGE
cpuMetricSuffix = "_cpu_usage_seconds_total"
memoryMetricSuffix = "_memory_working_set_bytes"
)
if strings.HasSuffix(metricName, cpuMetricSuffix) {
return &dtoCounterMetricType
}
if strings.HasSuffix(metricName, memoryMetricSuffix) {
return &dtoGaugeMetricType
}

return nil
}

func (p *MockProvider) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
var span trace.Span
ctx, span = trace.StartSpan(ctx, "GetMetricsResource") //nolint: ineffassign,staticcheck
defer span.End()

var (
nodeNameStr = "NodeName"
podNameStr = "PodName"
containerNameStr = "containerName"
)
nodeLabels := []*dto.LabelPair{
{
Name: &nodeNameStr,
Value: &p.nodeName,
},
}

metricsMap := p.generateMockMetrics(nil, "node", nodeLabels)
for _, pod := range p.pods {
podLabels := []*dto.LabelPair{
{
Name: &nodeNameStr,
Value: &p.nodeName,
},
{
Name: &podNameStr,
Value: &pod.Name,
},
}
metricsMap = p.generateMockMetrics(metricsMap, "pod", podLabels)
for _, container := range pod.Spec.Containers {
containerLabels := []*dto.LabelPair{
{
Name: &nodeNameStr,
Value: &p.nodeName,
},
{
Name: &podNameStr,
Value: &pod.Name,
},
{
Name: &containerNameStr,
Value: &container.Name,
},
}
metricsMap = p.generateMockMetrics(metricsMap, "container", containerLabels)
}
}

res := []*dto.MetricFamily{}
for metricName := range metricsMap {
tempName := metricName
tempMetrics := metricsMap[tempName]

metricFamily := dto.MetricFamily{
Name: &tempName,
Type: p.getMetricType(tempName),
Metric: tempMetrics,
}
res = append(res, &metricFamily)
}

return res, nil
}

// NotifyPods is called to set a pod notifier callback function. This should be called before any operations are done
// within the provider.
func (p *MockProvider) NotifyPods(ctx context.Context, notifier func(*v1.Pod)) {
Expand Down
25 changes: 25 additions & 0 deletions internal/test/e2e/framework/stats.go
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"

dto "github.com/prometheus/client_model/go"
api "github.com/virtual-kubelet/virtual-kubelet/node/api"
stats "github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
"k8s.io/apimachinery/pkg/util/net"
)
Expand All @@ -29,3 +31,26 @@ func (f *Framework) GetStatsSummary(ctx context.Context) (*stats.Summary, error)
}
return res, nil
}

// GetStatsSummary queries the /metrics/resource endpoint of the virtual-kubelet and returns the Summary object obtained as a response.
func (f *Framework) GetMetricsResource(ctx context.Context) ([]*dto.MetricFamily, error) {
// Query the /stats/summary endpoint.
b, err := f.KubeClient.CoreV1().
RESTClient().
Get().
Namespace(f.Namespace).
Resource("pods").
SubResource("proxy").
Name(net.JoinSchemeNamePort("https", f.NodeName, "10250")).
Suffix(api.MetricsResourceRouteSuffix).DoRaw(ctx)
if err != nil {
return nil, err
}
// Unmarshal the response as a MetricFamily object and return it.
res := []*dto.MetricFamily{}
err = json.Unmarshal(b, &res)
if err != nil {
return nil, err
}
return res, nil
}
53 changes: 53 additions & 0 deletions node/api/metrics.go
@@ -0,0 +1,53 @@
// Copyright © 2017 The virtual-kubelet authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"encoding/json"
"net/http"

"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"
)

// PodMetricsResourceHandlerFunc defines the handler for getting pod metrics
type PodMetricsResourceHandlerFunc func(context.Context) ([]*dto.MetricFamily, error)

// HandlePodMetricsResource makes an HTTP handler for implementing the kubelet /metrics/resource endpoint
func HandlePodMetricsResource(h PodMetricsResourceHandlerFunc) http.HandlerFunc {
if h == nil {
return NotImplemented
}
return handleError(func(w http.ResponseWriter, req *http.Request) error {
metrics, err := h(req.Context())
if err != nil {
if isCancelled(err) {
return err
}
return errors.Wrap(err, "error getting status from provider")
}

b, err := json.Marshal(metrics)
if err != nil {
return errors.Wrap(err, "error marshalling metrics")
}

if _, err := w.Write(b); err != nil {
return errors.Wrap(err, "could not write to client")
}
return nil
})
}
32 changes: 31 additions & 1 deletion node/api/server.go
Expand Up @@ -41,10 +41,13 @@ type PodHandlerConfig struct { //nolint:golint
// GetPodsFromKubernetes is meant to enumerate the pods that the node is meant to be running
GetPodsFromKubernetes PodListerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
GetMetricsResource PodMetricsResourceHandlerFunc
StreamIdleTimeout time.Duration
StreamCreationTimeout time.Duration
}

const MetricsResourceRouteSuffix = "/metrics/resource"

// PodHandler creates an http handler for interacting with pods/containers.
func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
r := mux.NewRouter()
Expand Down Expand Up @@ -72,6 +75,11 @@ func PodHandler(p PodHandlerConfig, debug bool) http.Handler {
r.HandleFunc("/stats/summary/", f).Methods("GET")
}

if p.GetMetricsResource != nil {
f := HandlePodMetricsResource(p.GetMetricsResource)
r.HandleFunc(MetricsResourceRouteSuffix, f).Methods("GET")
r.HandleFunc(MetricsResourceRouteSuffix+"/", f).Methods("GET")
}
r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}
Expand All @@ -97,6 +105,26 @@ func PodStatsSummaryHandler(f PodStatsSummaryHandlerFunc) http.Handler {
return r
}

// PodMetricsResourceHandler creates an http handler for serving pod metrics.
//
// If the passed in handler func is nil this will create handlers which only
// serves http.StatusNotImplemented
func PodMetricsResourceHandler(f PodMetricsResourceHandlerFunc) http.Handler {
if f == nil {
return http.HandlerFunc(NotImplemented)
}

r := mux.NewRouter()

h := HandlePodMetricsResource(f)

r.Handle(MetricsResourceRouteSuffix, ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET")
r.Handle(MetricsResourceRouteSuffix+"/", ochttp.WithRouteTag(h, "PodMetricsResourceHandler")).Methods("GET")

r.NotFoundHandler = http.HandlerFunc(NotFound)
return r
}

// AttachPodRoutes adds the http routes for pod stuff to the passed in serve mux.
//
// Callers should take care to namespace the serve mux as they see fit, however
Expand All @@ -111,7 +139,8 @@ func AttachPodRoutes(p PodHandlerConfig, mux ServeMux, debug bool) {
// The main reason for this struct is in case of expansion we do not need to break
// the package level API.
type PodMetricsConfig struct {
GetStatsSummary PodStatsSummaryHandlerFunc
GetStatsSummary PodStatsSummaryHandlerFunc
GetMetricsResource PodMetricsResourceHandlerFunc
}

// AttachPodMetricsRoutes adds the http routes for pod/node metrics to the passed in serve mux.
Expand All @@ -120,6 +149,7 @@ type PodMetricsConfig struct {
// these routes get called by the Kubernetes API server.
func AttachPodMetricsRoutes(p PodMetricsConfig, mux ServeMux) {
mux.Handle("/", InstrumentHandler(HandlePodStatsSummary(p.GetStatsSummary)))
mux.Handle("/", InstrumentHandler(HandlePodMetricsResource(p.GetMetricsResource)))
}

func instrumentRequest(r *http.Request) *http.Request {
Expand Down
5 changes: 5 additions & 0 deletions node/nodeutil/provider.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"io"

dto "github.com/prometheus/client_model/go"
"github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/api"
"github.com/virtual-kubelet/virtual-kubelet/node/api/statsv1alpha1"
Expand All @@ -29,6 +30,9 @@ type Provider interface {

// GetStatsSummary gets the stats for the node, including running pods
GetStatsSummary(context.Context) (*statsv1alpha1.Summary, error)

// GetMetricsResource gets the metrics for the node, including running pods
GetMetricsResource(context.Context) ([]*dto.MetricFamily, error)
}

// ProviderConfig holds objects created by NewNodeFromClient that a provider may need to bootstrap itself.
Expand Down Expand Up @@ -61,6 +65,7 @@ func AttachProviderRoutes(mux api.ServeMux) NodeOpt {
return pods.List(labels.Everything())
},
GetStatsSummary: p.GetStatsSummary,
GetMetricsResource: p.GetMetricsResource,
StreamIdleTimeout: cfg.StreamIdleTimeout,
StreamCreationTimeout: cfg.StreamCreationTimeout,
}, true))
Expand Down
58 changes: 58 additions & 0 deletions test/e2e/basic.go
Expand Up @@ -111,6 +111,64 @@ func (ts *EndToEndTestSuite) TestGetStatsSummary(t *testing.T) {
}
}

// TestGetMetricsResource creates a pod having two containers and queries the /metrics/resource endpoint of the virtual-kubelet.
// It expects this endpoint to return stats for the current node, as well as for the aforementioned pod and each of its two containers.
func (ts *EndToEndTestSuite) TestGetMetricsResource(t *testing.T) {
ctx := context.Background()

// Create a pod with prefix "nginx-" having three containers.
pod, err := f.CreatePod(ctx, f.CreateDummyPodObjectWithPrefix(t.Name(), "nginx", "foo", "bar", "baz"))
if err != nil {
t.Fatal(err)
}
// Delete the "nginx-0-X" pod after the test finishes.
defer func() {
if err := f.DeletePodImmediately(ctx, pod.Namespace, pod.Name); err != nil && !apierrors.IsNotFound(err) {
t.Error(err)
}
}()

// Wait for the "nginx-" pod to be reported as running and ready.
if _, err := f.WaitUntilPodReady(pod.Namespace, pod.Name); err != nil {
t.Fatal(err)
}

// Grab the stats from the provider.
metricsResourceResponse, err := f.GetMetricsResource(ctx)
if err != nil {
t.Fatal(err)
}

// Make sure the "nginx-" pod exists in the metrics returned.
currentContainerStatsCount := 0
found := false
for _, metricFamily := range metricsResourceResponse {
if *metricFamily.Name == "pod_cpu_usage_seconds_total" {
for _, metric := range metricFamily.Metric {
if *metric.Label[1].Value == pod.Name {
found = true
}
}
}
if *metricFamily.Name == "container_cpu_usage_seconds_total" {
for _, metric := range metricFamily.Metric {
if *metric.Label[1].Value == pod.Name {
currentContainerStatsCount += 1
}
}
}
}
if !found {
t.Fatalf("Pod %s not found in metrics", pod.Name)
}

// Make sure that we've got stats for all the containers in the "nginx-" pod.
desiredContainerStatsCount := len(pod.Spec.Containers)
if currentContainerStatsCount != desiredContainerStatsCount {
t.Fatalf("expected stats for %d containers, got stats for %d containers", desiredContainerStatsCount, currentContainerStatsCount)
}
}

// TestPodLifecycleGracefulDelete creates a pod and verifies that the provider has been asked to create it.
// Then, it deletes the pods and verifies that the provider has been asked to delete it.
// These verifications are made using the /stats/summary endpoint of the virtual-kubelet, by checking for the presence or absence of the pods.
Expand Down

0 comments on commit a457d44

Please sign in to comment.