Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add csi_operations_seconds metrics on kubelet #98979

Merged
merged 1 commit into from Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/volume/csi/csi_attacher.go
Expand Up @@ -257,7 +257,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
csi := c.csiClient

ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout)
ctx, cancel := createCSIOperationContext(spec, c.watchTimeout)
defer cancel()
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
Expand Down Expand Up @@ -516,7 +516,8 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
}
csi := c.csiClient

ctx, cancel := context.WithTimeout(context.Background(), c.watchTimeout)
// could not get whether this is migrated because there is no spec
ctx, cancel := createCSIOperationContext(nil, csiTimeout)
defer cancel()
// Check whether "STAGE_UNSTAGE_VOLUME" is set
stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
Expand Down
6 changes: 3 additions & 3 deletions pkg/volume/csi/csi_block.go
Expand Up @@ -357,7 +357,7 @@ func (m *csiBlockMapper) MapPodDevice() (string, error) {
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
defer cancel()

csiClient, err := m.csiClientGetter.Get()
Expand Down Expand Up @@ -426,7 +426,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return errors.New("CSIBlockVolume feature not enabled")
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
defer cancel()

csiClient, err := m.csiClientGetter.Get()
Expand Down Expand Up @@ -499,7 +499,7 @@ func (m *csiBlockMapper) UnmapPodDevice() error {
return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
defer cancel()

// Call NodeUnpublishVolume.
Expand Down
31 changes: 17 additions & 14 deletions pkg/volume/csi/csi_client.go
Expand Up @@ -92,6 +92,7 @@ type csiDriverName string
type csiDriverClient struct {
driverName csiDriverName
addr csiAddr
metricsManager *MetricsManager
nodeV1ClientCreator nodeV1ClientCreator
}

Expand All @@ -111,7 +112,7 @@ type csiResizeOptions struct {

var _ csiClient = &csiDriverClient{}

type nodeV1ClientCreator func(addr csiAddr) (
type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
nodeClient csipbv1.NodeClient,
closer io.Closer,
err error,
Expand All @@ -122,9 +123,9 @@ type nodeV1ClientCreator func(addr csiAddr) (
// the gRPC connection when the NodeClient is not used anymore.
// This is the default implementation for the nodeV1ClientCreator, used in
// newCsiDriverClient.
func newV1NodeClient(addr csiAddr) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
var conn *grpc.ClientConn
conn, err = newGrpcConn(addr)
conn, err = newGrpcConn(addr, metricsManager)
if err != nil {
return nil, nil, err
}
Expand All @@ -148,6 +149,7 @@ func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
driverName: driverName,
addr: csiAddr(existingDriver.endpoint),
nodeV1ClientCreator: nodeV1ClientCreator,
metricsManager: NewCSIMetricsManager(string(driverName)),
}, nil
}

Expand All @@ -172,7 +174,7 @@ func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
accessibleTopology map[string]string,
err error) {

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return "", 0, nil, err
}
Expand Down Expand Up @@ -216,7 +218,7 @@ func (c *csiDriverClient) NodePublishVolume(

}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
Expand Down Expand Up @@ -275,7 +277,7 @@ func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOp
return opts.newSize, errors.New("size can not be less than 0")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return opts.newSize, err
}
Expand Down Expand Up @@ -331,7 +333,7 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string,
return errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
Expand Down Expand Up @@ -367,7 +369,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
return errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
Expand Down Expand Up @@ -418,7 +420,7 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
return errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return err
}
Expand All @@ -438,7 +440,7 @@ func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, err
return false, errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -469,7 +471,7 @@ func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, e
return false, errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -508,7 +510,7 @@ func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapabili
return csipbv1.VolumeCapability_AccessMode_UNKNOWN
}

func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) {
network := "unix"
klog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))

Expand All @@ -518,6 +520,7 @@ func newGrpcConn(addr csiAddr) (*grpc.ClientConn, error) {
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, target)
}),
grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor),
)
}

Expand Down Expand Up @@ -560,7 +563,7 @@ func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, er
return false, errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -594,7 +597,7 @@ func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string,
return nil, errors.New("nodeV1ClientCreate is nil")
}

nodeClient, closer, err := c.nodeV1ClientCreator(c.addr)
nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
if err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/volume/csi/csi_client_test.go
Expand Up @@ -371,7 +371,7 @@ func TestClientNodeGetInfo(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
nodeClient.SetNodeGetInfoResp(&csipbv1.NodeGetInfoResponse{
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestClientNodePublishVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestClientNodeUnpublishVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
Expand Down Expand Up @@ -534,7 +534,7 @@ func TestClientNodeStageVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
Expand Down Expand Up @@ -586,7 +586,7 @@ func TestClientNodeUnstageVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
Expand Down Expand Up @@ -647,7 +647,7 @@ func TestNodeExpandVolume(t *testing.T) {
fakeCloser := fake.NewCloser(t)
client := &csiDriverClient{
driverName: "Fake Driver Name",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClient(false /* stagingCapable */)
nodeClient.SetNextError(tc.err)
return nodeClient, fakeCloser, nil
Expand Down
51 changes: 51 additions & 0 deletions pkg/volume/csi/csi_metrics.go
Expand Up @@ -19,9 +19,12 @@ package csi
import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)

var _ volume.MetricsProvider = &metricsCsi{}
Expand Down Expand Up @@ -79,3 +82,51 @@ func (mc *metricsCsi) GetMetrics() (*volume.Metrics, error) {
metrics.Time = currentTime
return metrics, nil
}

// MetricsManager defines the metrics mananger for CSI operation
type MetricsManager struct {
driverName string
}

// NewCSIMetricsManager creates a CSIMetricsManager object
func NewCSIMetricsManager(driverName string) *MetricsManager {
cmm := MetricsManager{
driverName: driverName,
}
return &cmm
}

type additionalInfo struct {
Migrated string
}
type additionalInfoKeyType struct{}

var additionalInfoKey additionalInfoKeyType

// RecordMetricsInterceptor is a grpc interceptor that is used to
// record CSI operation
func (cmm *MetricsManager) RecordMetricsInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
// Check if this is migrated operation
additionalInfoVal := ctx.Value(additionalInfoKey)
migrated := "false"
if additionalInfoVal != nil {
additionalInfoVal, ok := additionalInfoVal.(additionalInfo)
if !ok {
return err
}
migrated = additionalInfoVal.Migrated
}
// Record the metric latency
volumeutil.RecordCSIOperationLatencyMetrics(cmm.driverName, method, err, duration, migrated)

return err
}
4 changes: 2 additions & 2 deletions pkg/volume/csi/csi_metrics_test.go
Expand Up @@ -45,7 +45,7 @@ func TestGetMetrics(t *testing.T) {
metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath}
metricsGetter.csiClient = &csiDriverClient{
driverName: "com.google.gcepd",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClientWithVolumeStats(true /* VolumeStatsCapable */)
fakeCloser := fake.NewCloser(t)
nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestGetMetricsDriverNotSupportStats(t *testing.T) {
metricsGetter := &metricsCsi{volumeID: tc.volumeID, targetPath: tc.targetPath}
metricsGetter.csiClient = &csiDriverClient{
driverName: "com.simple.SimpleDriver",
nodeV1ClientCreator: func(addr csiAddr) (csipbv1.NodeClient, io.Closer, error) {
nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
nodeClient := fake.NewNodeClientWithVolumeStats(false /* VolumeStatsCapable */)
fakeCloser := fake.NewCloser(t)
nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
Expand Down
6 changes: 3 additions & 3 deletions pkg/volume/csi/csi_mounter.go
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package csi

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
Expand Down Expand Up @@ -114,7 +113,7 @@ func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error
return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))

}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
defer cancel()

volSrc, pvSrc, err := getSourceFromSpec(c.spec)
Expand Down Expand Up @@ -396,7 +395,8 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
return errors.New(log("mounter.SetUpAt failed to get CSI client: %v", err))
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
// Could not get spec info on whether this is a migrated operation because c.spec is nil
ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
defer cancel()

if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/volume/csi/csi_util.go
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"

api "k8s.io/api/core/v1"
Expand Down Expand Up @@ -193,3 +194,12 @@ func GetCSIDriverName(spec *volume.Spec) (string, error) {
return "", errors.New(log("volume source not found in volume.Spec"))
}
}

func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) (context.Context, context.CancelFunc) {
migrated := false
if volumeSpec != nil {
migrated = volumeSpec.Migrated
}
ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)})
return context.WithTimeout(ctx, timeout)
}