Skip to content

Commit

Permalink
Merge pull request #127 from MorrisLaw/consistent-sidecars
Browse files Browse the repository at this point in the history
Refactor external-attacher to use csi-lib-utils/rpc
  • Loading branch information
k8s-ci-robot committed Mar 7, 2019
2 parents 6dd3aae + 158f000 commit 1573881
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 422 deletions.
8 changes: 5 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Expand Up @@ -43,7 +43,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = "0.3.1"
version = ">=0.4.0-rc1"

[prune]
non-go = true
Expand Down
49 changes: 37 additions & 12 deletions cmd/csi-attacher/main.go
Expand Up @@ -31,8 +31,12 @@ import (
csiinformers "k8s.io/csi-api/pkg/client/informers/externalversions"
"k8s.io/klog"

"github.com/kubernetes-csi/external-attacher/pkg/connection"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/rpc"
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"github.com/kubernetes-csi/external-attacher/pkg/controller"
"google.golang.org/grpc"
)

const (
Expand Down Expand Up @@ -103,20 +107,20 @@ func main() {
var csiFactory csiinformers.SharedInformerFactory
var handler controller.Handler

var attacher string
var csiAttacher string
if *dummy {
// Do not connect to any CSI, mark everything as attached.
handler = controller.NewTrivialHandler(clientset)
attacher = dummyAttacherName
csiAttacher = dummyAttacherName
} else {
// Connect to CSI.
csiConn, err := connection.New(*csiAddress)
csiConn, err := connection.Connect(*csiAddress)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}

err = csiConn.Probe(*timeout)
err = rpc.ProbeForever(csiConn, *timeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand All @@ -125,14 +129,14 @@ func main() {
// Find driver name.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
attacher, err = csiConn.GetDriverName(ctx)
csiAttacher, err = rpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
klog.V(2).Infof("CSI driver name: %q", attacher)
klog.V(2).Infof("CSI driver name: %q", csiAttacher)

supportsService, err := csiConn.SupportsPluginControllerService(ctx)
supportsService, err := supportsPluginControllerService(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand All @@ -142,7 +146,7 @@ func main() {
klog.V(2).Infof("CSI driver does not support Plugin Controller Service, using trivial handler")
} else {
// Find out if the driver supports attach/detach.
supportsAttach, supportsReadOnly, err := csiConn.SupportsControllerPublish(ctx)
supportsAttach, supportsReadOnly, err := supportsControllerPublish(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
Expand All @@ -153,7 +157,8 @@ func main() {
vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister()
csiFactory := csiinformers.NewSharedInformerFactory(csiClientset, *resync)
nodeInfoLister := csiFactory.Csi().V1alpha1().CSINodeInfos().Lister()
handler = controller.NewCSIHandler(clientset, csiClientset, attacher, csiConn, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly)
attacher := attacher.NewAttacher(csiConn)
handler = controller.NewCSIHandler(clientset, csiClientset, csiAttacher, attacher, pvLister, nodeLister, nodeInfoLister, vaLister, timeout, supportsReadOnly)
klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
} else {
handler = controller.NewTrivialHandler(clientset)
Expand All @@ -164,7 +169,7 @@ func main() {

ctrl := controller.NewCSIAttachController(
clientset,
attacher,
csiAttacher,
handler,
factory.Storage().V1beta1().VolumeAttachments(),
factory.Core().V1().PersistentVolumes(),
Expand Down Expand Up @@ -192,7 +197,7 @@ func main() {
os.Exit(1)
}
// Name of config map with leader election lock
lockName := "external-attacher-leader-" + attacher
lockName := "external-attacher-leader-" + csiAttacher
runAsLeader(clientset, *leaderElectionNamespace, *leaderElectionIdentity, lockName, run)
}
}
Expand All @@ -203,3 +208,23 @@ func buildConfig(kubeconfig string) (*rest.Config, error) {
}
return rest.InClusterConfig()
}

func supportsControllerPublish(ctx context.Context, csiConn *grpc.ClientConn) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) {
caps, err := rpc.GetControllerCapabilities(ctx, csiConn)
if err != nil {
return false, false, err
}

supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME]
supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY]
return supportsControllerPublish, supportsPublishReadOnly, nil
}

func supportsPluginControllerService(ctx context.Context, csiConn *grpc.ClientConn) (bool, error) {
caps, err := rpc.GetPluginCapabilities(ctx, csiConn)
if err != nil {
return false, err
}

return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil
}
82 changes: 13 additions & 69 deletions pkg/connection/connection.go → pkg/attacher/attacher.go
Expand Up @@ -14,36 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package connection
package attacher

import (
"context"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog"
)

// CSIConnection is gRPC connection to a remote CSI driver and abstracts all
// CSI calls.
type CSIConnection interface {
// GetDriverName returns driver name as discovered by GetPluginInfo()
// gRPC call.
GetDriverName(ctx context.Context) (string, error)

// SupportsControllerPublish returns true if the CSI driver reports
// PUBLISH_UNPUBLISH_VOLUME in ControllerGetCapabilities() gRPC call.
SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error)

// SupportsPluginControllerService return true if the CSI driver reports
// CONTROLLER_SERVICE in GetPluginCapabilities() gRPC call.
SupportsPluginControllerService(ctx context.Context) (bool, error)

// Attacher implements attach/detach operations against a remote CSI driver.
type Attacher interface {
// Attach given volume to given node. Returns PublishVolumeInfo. Note that
// "detached" is returned on error and means that the volume is for sure
// detached from the node. "false" means that the volume may be either
Expand All @@ -56,64 +42,26 @@ type CSIConnection interface {
// "false" means that the volume may or may not be detached and caller
// should retry.
Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error)

// Probe checks that the CSI driver is ready to process requests
Probe(singleProbeTimeout time.Duration) error

// Close the connection
Close() error
}

type csiConnection struct {
type attacher struct {
conn *grpc.ClientConn
capabilities []csi.ControllerServiceCapability
}

var (
_ CSIConnection = &csiConnection{}
_ Attacher = &attacher{}
)

// New provides a new CSI connection object.
func New(address string) (CSIConnection, error) {
conn, err := connection.Connect(address, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
return nil, err
}
return &csiConnection{
// NewAttacher provides a new Attacher object.
func NewAttacher(conn *grpc.ClientConn) Attacher {
return &attacher{
conn: conn,
}, nil
}

func (c *csiConnection) GetDriverName(ctx context.Context) (string, error) {
return connection.GetDriverName(ctx, c.conn)
}

func (c *csiConnection) Probe(singleProbeTimeout time.Duration) error {
return connection.ProbeForever(c.conn, singleProbeTimeout)
}

func (c *csiConnection) SupportsControllerPublish(ctx context.Context) (supportsControllerPublish bool, supportsPublishReadOnly bool, err error) {
caps, err := connection.GetControllerCapabilities(ctx, c.conn)
if err != nil {
return false, false, err
}

supportsControllerPublish = caps[csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME]
supportsPublishReadOnly = caps[csi.ControllerServiceCapability_RPC_PUBLISH_READONLY]
return supportsControllerPublish, supportsPublishReadOnly, nil
}

func (c *csiConnection) SupportsPluginControllerService(ctx context.Context) (bool, error) {
caps, err := connection.GetPluginCapabilities(ctx, c.conn)
if err != nil {
return false, err
}

return caps[csi.PluginCapability_Service_CONTROLLER_SERVICE], nil
}

func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) {
client := csi.NewControllerClient(c.conn)
func (a *attacher) Attach(ctx context.Context, volumeID string, readOnly bool, nodeID string, caps *csi.VolumeCapability, context, secrets map[string]string) (metadata map[string]string, detached bool, err error) {
client := csi.NewControllerClient(a.conn)

req := csi.ControllerPublishVolumeRequest{
VolumeId: volumeID,
Expand All @@ -131,8 +79,8 @@ func (c *csiConnection) Attach(ctx context.Context, volumeID string, readOnly bo
return rsp.PublishContext, false, nil
}

func (c *csiConnection) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) {
client := csi.NewControllerClient(c.conn)
func (a *attacher) Detach(ctx context.Context, volumeID string, nodeID string, secrets map[string]string) (detached bool, err error) {
client := csi.NewControllerClient(a.conn)

req := csi.ControllerUnpublishVolumeRequest{
VolumeId: volumeID,
Expand All @@ -147,10 +95,6 @@ func (c *csiConnection) Detach(ctx context.Context, volumeID string, nodeID stri
return true, nil
}

func (c *csiConnection) Close() error {
return c.conn.Close()
}

func logGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
klog.V(5).Infof("GRPC call: %s", method)
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
Expand Down

0 comments on commit 1573881

Please sign in to comment.