Automated cherry pick of #75750: Improve volume operation metrics #76222

Merged
21 changes: 20 additions & 1 deletion pkg/volume/util/metrics.go
@@ -24,11 +24,16 @@ import (
"k8s.io/kubernetes/pkg/volume"
)

const (
statusSuccess = "success"
statusFailUnknown = "fail-unknown"
)

var storageOperationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "storage_operation_duration_seconds",
Help: "Storage operation duration",
-Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50},
+Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
},
[]string{"volume_plugin", "operation_name"},
)
@@ -41,13 +46,22 @@ var storageOperationErrorMetric = prometheus.NewCounterVec(
[]string{"volume_plugin", "operation_name"},
)

var storageOperationStatusMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "storage_operation_status_count",
Help: "Storage operation return statuses count",
},
[]string{"volume_plugin", "operation_name", "status"},
)

func init() {
registerMetrics()
}

func registerMetrics() {
prometheus.MustRegister(storageOperationMetric)
prometheus.MustRegister(storageOperationErrorMetric)
prometheus.MustRegister(storageOperationStatusMetric)
}

// OperationCompleteHook returns a hook to call when an operation is completed
@@ -56,11 +70,16 @@ func OperationCompleteHook(plugin, operationName string) func(*error) {
opComplete := func(err *error) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
status := statusSuccess
if *err != nil {
// TODO: Establish well-known error codes to be able to distinguish
// user configuration errors from system errors.
status = statusFailUnknown
storageOperationErrorMetric.WithLabelValues(plugin, operationName).Inc()
} else {
storageOperationMetric.WithLabelValues(plugin, operationName).Observe(timeTaken)
}
storageOperationStatusMetric.WithLabelValues(plugin, operationName, status).Inc()
}
return opComplete
}
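For context, the hook above is created when a volume operation starts and is invoked, typically via defer, with a pointer to the operation's error once it finishes. A minimal sketch of that calling pattern, assuming a hypothetical doAttach callback and illustrative plugin/operation names (in-tree callers go through the operation executor, not this helper):

```go
package main

import (
	"errors"

	"k8s.io/kubernetes/pkg/volume/util"
)

// attachWithMetrics sketches the calling pattern: the hook captures the
// start time when it is created, and the deferred call records the
// duration histogram on success, the error counter on failure, and the
// new status counter ("success" or "fail-unknown") in either case.
// It must be deferred with a pointer to the named return value so the
// hook observes the final error.
func attachWithMetrics(doAttach func() error) (err error) {
	opComplete := util.OperationCompleteHook("kubernetes.io/gce-pd", "volume_attach")
	defer opComplete(&err)
	return doAttach()
}

func main() {
	_ = attachWithMetrics(func() error { return nil })                         // counted as "success"
	_ = attachWithMetrics(func() error { return errors.New("attach failed") }) // counted as "fail-unknown"
}
```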
182 changes: 158 additions & 24 deletions test/e2e/storage/volume_metrics.go
@@ -24,7 +24,8 @@ import (
. "github.com/onsi/gomega"
"github.com/prometheus/common/model"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
@@ -43,14 +44,16 @@ var _ = utils.SIGDescribe("[Serial] Volume metrics", func() {
ns string
pvc *v1.PersistentVolumeClaim
metricsGrabber *metrics.Grabber
invalidSc *storagev1.StorageClass
defaultScName string
)
f := framework.NewDefaultFramework("pv")

BeforeEach(func() {
c = f.ClientSet
ns = f.Namespace.Name
framework.SkipUnlessProviderIs("gce", "gke", "aws")
-defaultScName := getDefaultStorageClassName(c)
+defaultScName = getDefaultStorageClassName(c)
verifyDefaultStorageClass(c, defaultScName, true)

test := testsuites.StorageClassTest{
@@ -68,7 +71,22 @@
})

AfterEach(func() {
-framework.DeletePersistentVolumeClaim(c, pvc.Name, pvc.Namespace)
newPvc, err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(pvc.Name, metav1.GetOptions{})
if err != nil {
framework.Logf("Failed to get pvc %s/%s: %v", pvc.Namespace, pvc.Name, err)
} else {
framework.DeletePersistentVolumeClaim(c, newPvc.Name, newPvc.Namespace)
if newPvc.Spec.VolumeName != "" {
err = framework.WaitForPersistentVolumeDeleted(c, newPvc.Spec.VolumeName, 5*time.Second, 5*time.Minute)
framework.ExpectNoError(err, "Persistent Volume %v not deleted by dynamic provisioner", newPvc.Spec.VolumeName)
}
}

if invalidSc != nil {
err := c.StorageV1().StorageClasses().Delete(invalidSc.Name, nil)
framework.ExpectNoError(err, "Error deleting storageclass %v: %v", invalidSc.Name, err)
invalidSc = nil
}
})

It("should create prometheus metrics for volume provisioning and attach/detach", func() {
@@ -102,15 +120,72 @@

updatedStorageMetrics := waitForDetachAndGrabMetrics(storageOpMetrics, metricsGrabber)

-Expect(len(updatedStorageMetrics)).ToNot(Equal(0), "Error fetching c-m updated storage metrics")
+Expect(len(updatedStorageMetrics.latencyMetrics)).ToNot(Equal(0), "Error fetching c-m updated storage metrics")
+Expect(len(updatedStorageMetrics.statusMetrics)).ToNot(Equal(0), "Error fetching c-m updated storage metrics")

volumeOperations := []string{"volume_provision", "volume_detach", "volume_attach"}

for _, volumeOp := range volumeOperations {
-verifyMetricCount(storageOpMetrics, updatedStorageMetrics, volumeOp)
+verifyMetricCount(storageOpMetrics, updatedStorageMetrics, volumeOp, false)
}
})

It("should create prometheus metrics for volume provisioning errors [Slow]", func() {
var err error

if !metricsGrabber.HasRegisteredMaster() {
framework.Skipf("Environment does not support getting controller-manager metrics - skipping")
}

controllerMetrics, err := metricsGrabber.GrabFromControllerManager()

framework.ExpectNoError(err, "Error getting c-m metrics : %v", err)

storageOpMetrics := getControllerStorageMetrics(controllerMetrics)

By("Creating an invalid storageclass")
defaultClass, err := c.StorageV1().StorageClasses().Get(defaultScName, metav1.GetOptions{})
framework.ExpectNoError(err, "Error getting default storageclass: %v", err)

invalidSc = &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("fail-metrics-invalid-sc-%s", pvc.Namespace),
},
Provisioner: defaultClass.Provisioner,
Parameters: map[string]string{
"invalidparam": "invalidvalue",
},
}
_, err = c.StorageV1().StorageClasses().Create(invalidSc)
framework.ExpectNoError(err, "Error creating new storageclass: %v", err)

pvc.Spec.StorageClassName = &invalidSc.Name
pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc)
framework.ExpectNoError(err, "failed to create PVC %s/%s", pvc.Namespace, pvc.Name)
Expect(pvc).ToNot(Equal(nil))

claims := []*v1.PersistentVolumeClaim{pvc}

By("Creating a pod and expecting it to fail")
pod := framework.MakePod(ns, nil, claims, false, "")
pod, err = c.CoreV1().Pods(ns).Create(pod)
framework.ExpectNoError(err, "failed to create Pod %s/%s", pod.Namespace, pod.Name)

err = framework.WaitTimeoutForPodRunningInNamespace(c, pod.Name, pod.Namespace, framework.PodStartShortTimeout)
Expect(err).To(HaveOccurred())

framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod))

By("Checking failure metrics")
updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager()
framework.ExpectNoError(err, "failed to get controller manager metrics")
updatedStorageMetrics := getControllerStorageMetrics(updatedControllerMetrics)

Expect(len(updatedStorageMetrics.statusMetrics)).ToNot(Equal(0), "Error fetching c-m updated storage metrics")
verifyMetricCount(storageOpMetrics, updatedStorageMetrics, "volume_provision", true)
})

It("should create volume metrics with the correct PVC ref", func() {
var err error
pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(pvc)
@@ -422,15 +497,33 @@
})
})

-func waitForDetachAndGrabMetrics(oldMetrics map[string]int64, metricsGrabber *metrics.Grabber) map[string]int64 {
type storageControllerMetrics struct {
latencyMetrics map[string]int64
statusMetrics map[string]statusMetricCounts
}

type statusMetricCounts struct {
successCount int64
failCount int64
otherCount int64
}

func newStorageControllerMetrics() *storageControllerMetrics {
return &storageControllerMetrics{
latencyMetrics: make(map[string]int64),
statusMetrics: make(map[string]statusMetricCounts),
}
}

func waitForDetachAndGrabMetrics(oldMetrics *storageControllerMetrics, metricsGrabber *metrics.Grabber) *storageControllerMetrics {
backoff := wait.Backoff{
Duration: 10 * time.Second,
Factor: 1.2,
Steps: 21,
}

-updatedStorageMetrics := make(map[string]int64)
-oldDetachCount, ok := oldMetrics["volume_detach"]
+updatedStorageMetrics := newStorageControllerMetrics()
+oldDetachCount, ok := oldMetrics.latencyMetrics["volume_detach"]
if !ok {
oldDetachCount = 0
}
@@ -444,7 +537,7 @@ func waitForDetachAndGrabMetrics(oldMetrics map[string]int64, metricsGrabber *me
}

updatedStorageMetrics = getControllerStorageMetrics(updatedMetrics)
-newDetachCount, ok := updatedStorageMetrics["volume_detach"]
+newDetachCount, ok := updatedStorageMetrics.latencyMetrics["volume_detach"]

// if detach metrics are not yet there, we need to retry
if !ok {
@@ -464,33 +557,74 @@ func waitForDetachAndGrabMetrics(oldMetrics map[string]int64, metricsGrabber *me
return updatedStorageMetrics
}

-func verifyMetricCount(oldMetrics map[string]int64, newMetrics map[string]int64, metricName string) {
-oldCount, ok := oldMetrics[metricName]
+func verifyMetricCount(oldMetrics, newMetrics *storageControllerMetrics, metricName string, expectFailure bool) {
+oldLatencyCount, ok := oldMetrics.latencyMetrics[metricName]
// if metric does not exist in oldMap, it probably hasn't been emitted yet.
if !ok {
-oldCount = 0
+oldLatencyCount = 0
}

oldStatusCount := int64(0)
if oldStatusCounts, ok := oldMetrics.statusMetrics[metricName]; ok {
if expectFailure {
oldStatusCount = oldStatusCounts.failCount
} else {
oldStatusCount = oldStatusCounts.successCount
}
}

newLatencyCount, ok := newMetrics.latencyMetrics[metricName]
if !expectFailure {
Expect(ok).To(BeTrue(), "Error getting updated latency metrics for %s", metricName)
}
newStatusCounts, ok := newMetrics.statusMetrics[metricName]
Expect(ok).To(BeTrue(), "Error getting updated status metrics for %s", metricName)

newStatusCount := int64(0)
if expectFailure {
newStatusCount = newStatusCounts.failCount
} else {
newStatusCount = newStatusCounts.successCount
}

-newCount, ok := newMetrics[metricName]
-Expect(ok).To(BeTrue(), "Error getting updated metrics for %s", metricName)
// It appears that in a busy cluster some spurious detaches are unavoidable
// even if the test is run serially. We really just verify if new count
// is greater than old count
-Expect(newCount).To(BeNumerically(">", oldCount), "New count %d should be more than old count %d for action %s", newCount, oldCount, metricName)
+if !expectFailure {
+Expect(newLatencyCount).To(BeNumerically(">", oldLatencyCount), "New latency count %d should be more than old count %d for action %s", newLatencyCount, oldLatencyCount, metricName)
+}
+Expect(newStatusCount).To(BeNumerically(">", oldStatusCount), "New status count %d should be more than old count %d for action %s", newStatusCount, oldStatusCount, metricName)
}

-func getControllerStorageMetrics(ms metrics.ControllerManagerMetrics) map[string]int64 {
-result := make(map[string]int64)
+func getControllerStorageMetrics(ms metrics.ControllerManagerMetrics) *storageControllerMetrics {
+result := newStorageControllerMetrics()

for method, samples := range ms {
if method != "storage_operation_duration_seconds_count" {
continue
}
switch method {

case "storage_operation_duration_seconds_count":
for _, sample := range samples {
count := int64(sample.Value)
operation := string(sample.Metric["operation_name"])
result.latencyMetrics[operation] = count
}
case "storage_operation_status_count":
for _, sample := range samples {
count := int64(sample.Value)
operation := string(sample.Metric["operation_name"])
status := string(sample.Metric["status"])
statusCounts := result.statusMetrics[operation]
switch status {
case "success":
statusCounts.successCount = count
case "fail-unknown":
statusCounts.failCount = count
default:
statusCounts.otherCount = count
}
result.statusMetrics[operation] = statusCounts
}

-for _, sample := range samples {
-count := int64(sample.Value)
-operation := string(sample.Metric["operation_name"])
-result[operation] = count
-}
}
return result
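For reference, getControllerStorageMetrics consumes the grabber's map from metric-family name to model.Samples (types from github.com/prometheus/common/model). A minimal sketch, with invented plugin name and value, of the sample shape the new "storage_operation_status_count" case parses:

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// One synthetic sample of the status-count family, in the shape the
	// e2e metrics grabber hands to getControllerStorageMetrics. The label
	// values and count are illustrative, not taken from a real cluster.
	sample := &model.Sample{
		Metric: model.Metric{
			model.MetricNameLabel: "storage_operation_status_count",
			"operation_name":      "volume_provision",
			"status":              "fail-unknown",
			"volume_plugin":       "kubernetes.io/gce-pd",
		},
		Value: 3,
	}
	// The parser reads the operation name and status labels, then files the
	// count under successCount, failCount, or otherCount accordingly.
	fmt.Println(sample.Metric["operation_name"], sample.Metric["status"], int64(sample.Value))
	// Output: volume_provision fail-unknown 3
}
```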