Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add migrated field to storage_operation_duration_seconds metric #99050

Merged
merged 4 commits into from Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/controller/volume/expand/expand_controller_test.go
Expand Up @@ -23,7 +23,7 @@ import (
"regexp"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -121,9 +121,9 @@ func TestSyncHandler(t *testing.T) {
var expController *expandController
expController, _ = expc.(*expandController)
var expansionCalled bool
expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() (error, error) {
expController.operationGenerator = operationexecutor.NewFakeOGCounter(func() volumetypes.OperationContext {
expansionCalled = true
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
})

if test.pv != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/volume/persistentvolume/BUILD
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/recyclerclient:go_default_library",
"//pkg/volume/util/subpath:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/volume/persistentvolume/pv_controller.go
Expand Up @@ -51,6 +51,7 @@ import (
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -1412,7 +1413,7 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolu

opComplete := util.OperationCompleteHook(pluginName, "volume_delete")
err = deleter.Delete()
opComplete(&err)
opComplete(volumetypes.CompleteFuncParam{Err: &err})
if err != nil {
// Deleter failed
return pluginName, false, err
Expand Down Expand Up @@ -1558,7 +1559,7 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(

opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
volume, err = provisioner.Provision(selectedNode, allowedTopologies)
opComplete(&err)
opComplete(volumetypes.CompleteFuncParam{Err: &err})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the migrated field be part of provision and delete calls as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For CSI Migration, if a plugin has been migrated, then provision and delete will be handled by the csi-provisioner. So any call that happens here is a non-CSI-migration call.

if err != nil {
// Other places of failure have nothing to do with VolumeScheduling,
// so just let controller retry in the next sync. We'll only call func
Expand Down
41 changes: 41 additions & 0 deletions pkg/volume/BUILD
Expand Up @@ -41,14 +41,55 @@ go_library(
"//vendor/k8s.io/klog/v2:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:aix": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:android": [
"//pkg/features:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
"@io_bazel_rules_go//go/platform:darwin": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:freebsd": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:illumos": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:ios": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:js": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:linux": [
"//pkg/features:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
"@io_bazel_rules_go//go/platform:nacl": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:netbsd": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:openbsd": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:plan9": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:solaris": [
"//pkg/volume/util/types:go_default_library",
],
"@io_bazel_rules_go//go/platform:windows": [
"//pkg/volume/util/types:go_default_library",
],
"//conditions:default": [],
}),
)
Expand Down
1 change: 1 addition & 0 deletions pkg/volume/csimigration/plugin_manager.go
Expand Up @@ -142,6 +142,7 @@ func TranslateInTreeSpecToCSI(spec *volume.Spec, translator InTreeToCSITranslato
return nil, fmt.Errorf("failed to translate in-tree pv to CSI: %v", err)
}
return &volume.Spec{
Migrated: true,
PersistentVolume: csiPV,
ReadOnly: spec.ReadOnly,
InlineVolumeSpecForCSIMigration: inlineVolume,
Expand Down
1 change: 1 addition & 0 deletions pkg/volume/plugins.go
Expand Up @@ -473,6 +473,7 @@ type Spec struct {
PersistentVolume *v1.PersistentVolume
ReadOnly bool
InlineVolumeSpecForCSIMigration bool
Migrated bool
}

// Name returns the name of either Volume or PersistentVolume, one of which must not be nil.
Expand Down
18 changes: 12 additions & 6 deletions pkg/volume/util/metrics.go
Expand Up @@ -18,13 +18,15 @@ package util

import (
"fmt"
"strconv"
"time"

utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/types"
)

const (
Expand All @@ -47,7 +49,7 @@ var storageOperationMetric = metrics.NewHistogramVec(
Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 15, 25, 50, 120, 300, 600},
StabilityLevel: metrics.ALPHA,
},
[]string{"volume_plugin", "operation_name", "status"},
[]string{"volume_plugin", "operation_name", "status", "migrated"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you enhance

func getMigrationVolumeOpCounts(cs clientset.Interface, pluginName string) (opCounts, opCounts) {
to also check for the new field in the metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the implementation of this here; some observations:

  1. The idea of the migration volume metrics check is to verify that the in-tree plugin metrics have not changed between before and after the test execution, which implies that no in-tree operation was invoked.

  2. This check does not utilize the corresponding CSI metrics at all. I think the reason might be

    • The tests can be executed in parallel, so it would be hard to compare accurate results.
    • And also per the comments, some negative test cases might not emit any metrics.
  3. This is done using the to-be-deprecated metric "storage_operation_status_count". So we need to replace it with the correct metric.

So it seems that my change of adding a migrated label does not fit into the picture, as we are not checking the CSI metrics at all. And even if we wanted to check them, it would be hard because of observation 2 above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, maybe we need a todo to try to test this via unit tests.

)

var storageOperationStatusMetric = metrics.NewCounterVec(
Expand Down Expand Up @@ -82,25 +84,29 @@ func registerMetrics() {
}

// OperationCompleteHook returns a hook to call when an operation is completed
func OperationCompleteHook(plugin, operationName string) func(*error) {
func OperationCompleteHook(plugin, operationName string) func(types.CompleteFuncParam) {
requestTime := time.Now()
opComplete := func(err *error) {
opComplete := func(c types.CompleteFuncParam) {
timeTaken := time.Since(requestTime).Seconds()
// Create metric with operation name and plugin name
status := statusSuccess
if *err != nil {
if *c.Err != nil {
// TODO: Establish well-known error codes to be able to distinguish
// user configuration errors from system errors.
status = statusFailUnknown
}
storageOperationMetric.WithLabelValues(plugin, operationName, status).Observe(timeTaken)
migrated := false
if c.Migrated != nil {
migrated = *c.Migrated
}
storageOperationMetric.WithLabelValues(plugin, operationName, status, strconv.FormatBool(migrated)).Observe(timeTaken)
storageOperationStatusMetric.WithLabelValues(plugin, operationName, status).Inc()
}
return opComplete
}

// FSGroupCompleteHook returns a hook to call when volume recursive permission is changed
func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(*error) {
func FSGroupCompleteHook(plugin volume.VolumePlugin, spec *volume.Spec) func(types.CompleteFuncParam) {
return OperationCompleteHook(GetFullQualifiedPluginNameForVolume(plugin.GetPluginName(), spec), "volume_fsgroup_recursive_apply")
}

Expand Down
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
Expand Down Expand Up @@ -824,36 +824,38 @@ func testConcurrentOperationsNegative(

/* END concurrent operations tests */

func generateCallbackFunc(done chan<- interface{}) func() (error, error) {
return func() (error, error) {
func generateCallbackFunc(done chan<- interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
done <- true
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
}

func generateWaitFunc(done <-chan interface{}) func() (error, error) {
return func() (error, error) {
func generateWaitFunc(done <-chan interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
<-done
return nil, nil
return volumetypes.NewOperationContext(nil, nil, false)
}
}

func panicFunc() (error, error) {
func panicFunc() volumetypes.OperationContext {
panic("testing panic")
}

func errorFunc() (error, error) {
return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2")
func errorFunc() volumetypes.OperationContext {
return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
}

func generateWaitWithErrorFunc(done <-chan interface{}) func() (error, error) {
return func() (error, error) {
func generateWaitWithErrorFunc(done <-chan interface{}) func() volumetypes.OperationContext {
return func() volumetypes.OperationContext {
<-done
return fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2")
return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
}
}

func noopFunc() (error, error) { return nil, nil }
func noopFunc() volumetypes.OperationContext {
return volumetypes.NewOperationContext(nil, nil, false)
}

func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Expand Down
4 changes: 2 additions & 2 deletions pkg/volume/util/operationexecutor/fakegenerator.go
Expand Up @@ -32,13 +32,13 @@ import (
type fakeOGCounter struct {
// calledFuncs stores name and count of functions
calledFuncs map[string]int
opFunc func() (error, error)
opFunc func() volumetypes.OperationContext
}

var _ OperationGenerator = &fakeOGCounter{}

// NewFakeOGCounter returns a OperationGenerator
func NewFakeOGCounter(opFunc func() (error, error)) OperationGenerator {
func NewFakeOGCounter(opFunc func() volumetypes.OperationContext) OperationGenerator {
return &fakeOGCounter{
calledFuncs: map[string]int{},
opFunc: opFunc,
Expand Down