Skip to content

Commit

Permalink
Merge pull request #64519 from vladimirvivien/csi-gRPC-Conn-fix
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

CSI fix for gRPC conn leak

**What this PR does / why we need it**:
This PR is a bug fix for leaky gRPC connection that never closes (see issue #64341 for detail)

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #64341

This fix was originally started with PR #64380

```release-note
NONE
```
  • Loading branch information
Kubernetes Submit Queue committed Jun 2, 2018
2 parents c7b71eb + d6d3962 commit 54900d7
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 154 deletions.
3 changes: 2 additions & 1 deletion pkg/volume/csi/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ go_test(
"//pkg/volume:go_default_library",
"//pkg/volume/csi/fake:go_default_library",
"//pkg/volume/testing:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1beta1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
Expand Down
9 changes: 2 additions & 7 deletions pkg/volume/csi/csi_attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,7 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}

if c.csiClient == nil {
if csiSource.Driver == "" {
return fmt.Errorf("attacher.MountDevice failed, driver name is empty")
}
addr := fmt.Sprintf(csiAddrTemplate, csiSource.Driver)
c.csiClient = newCsiDriverClient("unix", addr)
c.csiClient = newCsiDriverClient(csiSource.Driver)
}
csi := c.csiClient

Expand Down Expand Up @@ -472,8 +468,7 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
}

if c.csiClient == nil {
addr := fmt.Sprintf(csiAddrTemplate, driverName)
c.csiClient = newCsiDriverClient("unix", addr)
c.csiClient = newCsiDriverClient(driverName)
}
csi := c.csiClient

Expand Down
11 changes: 5 additions & 6 deletions pkg/volume/csi/csi_attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
core "k8s.io/client-go/testing"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi/fake"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)

Expand Down Expand Up @@ -583,8 +582,8 @@ func TestAttacherMountDevice(t *testing.T) {
numStaged = 0
}

cdc := csiAttacher.csiClient.(*csiDriverClient)
staged := cdc.nodeClient.(*fake.NodeClient).GetNodeStagedVolumes()
cdc := csiAttacher.csiClient.(*fakeCsiDriverClient)
staged := cdc.nodeClient.GetNodeStagedVolumes()
if len(staged) != numStaged {
t.Errorf("got wrong number of staged volumes, expecting %v got: %v", numStaged, len(staged))
}
Expand Down Expand Up @@ -668,8 +667,8 @@ func TestAttacherUnmountDevice(t *testing.T) {
csiAttacher.csiClient = setupClient(t, tc.stageUnstageSet)

// Add the volume to NodeStagedVolumes
cdc := csiAttacher.csiClient.(*csiDriverClient)
cdc.nodeClient.(*fake.NodeClient).AddNodeStagedVolume(tc.volID, tc.deviceMountPath)
cdc := csiAttacher.csiClient.(*fakeCsiDriverClient)
cdc.nodeClient.AddNodeStagedVolume(tc.volID, tc.deviceMountPath)

// Make the PV for this object
dir := filepath.Dir(tc.deviceMountPath)
Expand Down Expand Up @@ -700,7 +699,7 @@ func TestAttacherUnmountDevice(t *testing.T) {
if !tc.stageUnstageSet {
expectedSet = 1
}
staged := cdc.nodeClient.(*fake.NodeClient).GetNodeStagedVolumes()
staged := cdc.nodeClient.GetNodeStagedVolumes()
if len(staged) != expectedSet {
t.Errorf("got wrong number of staged volumes, expecting %v got: %v", expectedSet, len(staged))
}
Expand Down
107 changes: 56 additions & 51 deletions pkg/volume/csi/csi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package csi
import (
"context"
"errors"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -61,45 +62,15 @@ type csiClient interface {

// csiClient encapsulates all csi-plugin methods
type csiDriverClient struct {
network string
addr string
conn *grpc.ClientConn
idClient csipb.IdentityClient
nodeClient csipb.NodeClient
ctrlClient csipb.ControllerClient
versionAsserted bool
versionSupported bool
publishAsserted bool
publishCapable bool
driverName string
nodeClient csipb.NodeClient
}

func newCsiDriverClient(network, addr string) *csiDriverClient {
return &csiDriverClient{network: network, addr: addr}
}

// assertConnection ensures a valid connection has been established
// if not, it creates a new connection and associated clients
func (c *csiDriverClient) assertConnection() error {
if c.conn == nil {
conn, err := grpc.Dial(
c.addr,
grpc.WithInsecure(),
grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) {
return net.Dial(c.network, target)
}),
)
if err != nil {
return err
}
c.conn = conn
c.idClient = csipb.NewIdentityClient(conn)
c.nodeClient = csipb.NewNodeClient(conn)
c.ctrlClient = csipb.NewControllerClient(conn)

// set supported version
}
var _ csiClient = &csiDriverClient{}

return nil
func newCsiDriverClient(driverName string) *csiDriverClient {
c := &csiDriverClient{driverName: driverName}
return c
}

func (c *csiDriverClient) NodePublishVolume(
Expand All @@ -121,10 +92,13 @@ func (c *csiDriverClient) NodePublishVolume(
if targetPath == "" {
return errors.New("missing target path")
}
if err := c.assertConnection(); err != nil {
glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err)

conn, err := newGrpcConn(c.driverName)
if err != nil {
return err
}
defer conn.Close()
nodeClient := csipb.NewNodeClient(conn)

req := &csipb.NodePublishVolumeRequest{
VolumeId: volID,
Expand All @@ -148,7 +122,7 @@ func (c *csiDriverClient) NodePublishVolume(
req.StagingTargetPath = stagingTargetPath
}

_, err := c.nodeClient.NodePublishVolume(ctx, req)
_, err = nodeClient.NodePublishVolume(ctx, req)
return err
}

Expand All @@ -160,17 +134,20 @@ func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string,
if targetPath == "" {
return errors.New("missing target path")
}
if err := c.assertConnection(); err != nil {
glog.Error(log("failed to assert a connection: %v", err))

conn, err := newGrpcConn(c.driverName)
if err != nil {
return err
}
defer conn.Close()
nodeClient := csipb.NewNodeClient(conn)

req := &csipb.NodeUnpublishVolumeRequest{
VolumeId: volID,
TargetPath: targetPath,
}

_, err := c.nodeClient.NodeUnpublishVolume(ctx, req)
_, err = nodeClient.NodeUnpublishVolume(ctx, req)
return err
}

Expand All @@ -190,10 +167,13 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
if stagingTargetPath == "" {
return errors.New("missing staging target path")
}
if err := c.assertConnection(); err != nil {
glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err)

conn, err := newGrpcConn(c.driverName)
if err != nil {
return err
}
defer conn.Close()
nodeClient := csipb.NewNodeClient(conn)

req := &csipb.NodeStageVolumeRequest{
VolumeId: volID,
Expand All @@ -213,7 +193,7 @@ func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
VolumeAttributes: volumeAttribs,
}

_, err := c.nodeClient.NodeStageVolume(ctx, req)
_, err = nodeClient.NodeStageVolume(ctx, req)
return err
}

Expand All @@ -225,27 +205,34 @@ func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingT
if stagingTargetPath == "" {
return errors.New("missing staging target path")
}
if err := c.assertConnection(); err != nil {
glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err)

conn, err := newGrpcConn(c.driverName)
if err != nil {
return err
}
defer conn.Close()
nodeClient := csipb.NewNodeClient(conn)

req := &csipb.NodeUnstageVolumeRequest{
VolumeId: volID,
StagingTargetPath: stagingTargetPath,
}
_, err := c.nodeClient.NodeUnstageVolume(ctx, req)
_, err = nodeClient.NodeUnstageVolume(ctx, req)
return err
}

func (c *csiDriverClient) NodeGetCapabilities(ctx context.Context) ([]*csipb.NodeServiceCapability, error) {
glog.V(4).Info(log("calling NodeGetCapabilities rpc"))
if err := c.assertConnection(); err != nil {
glog.Errorf("%v: failed to assert a connection: %v", csiPluginName, err)

conn, err := newGrpcConn(c.driverName)
if err != nil {
return nil, err
}
defer conn.Close()
nodeClient := csipb.NewNodeClient(conn)

req := &csipb.NodeGetCapabilitiesRequest{}
resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
resp, err := nodeClient.NodeGetCapabilities(ctx, req)
if err != nil {
return nil, err
}
Expand All @@ -263,3 +250,21 @@ func asCSIAccessMode(am api.PersistentVolumeAccessMode) csipb.VolumeCapability_A
}
return csipb.VolumeCapability_AccessMode_UNKNOWN
}

func newGrpcConn(driverName string) (*grpc.ClientConn, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
}

network := "unix"
addr := fmt.Sprintf(csiAddrTemplate, driverName)
glog.V(4).Infof(log("creating new gRPC connection for [%s://%s]", network, addr))

return grpc.Dial(
addr,
grpc.WithInsecure(),
grpc.WithDialer(func(target string, timeout time.Duration) (net.Conn, error) {
return net.Dial(network, target)
}),
)
}

0 comments on commit 54900d7

Please sign in to comment.