Skip to content

Commit

Permalink
move mount logic to seperate functions
Browse files Browse the repository at this point in the history
  • Loading branch information
boddumanohar committed Mar 5, 2021
1 parent f694160 commit d55b6c0
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 38 deletions.
14 changes: 9 additions & 5 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,29 @@ var (
// Driver implements all interfaces of CSI drivers
type Driver struct {
csicommon.CSIDriver
cloud *azure.Cloud
blobfuseProxyEndpoint string
mounter *mount.SafeFormatAndMount
volLockMap *util.LockMap
cloud *azure.Cloud
blobfuseProxyEndpoint string
enableBlobfuseProxy bool
blobfuseProxyConnTimout int
mounter *mount.SafeFormatAndMount
volLockMap *util.LockMap
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
volumeLocks *volumeLocks
}

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
func NewDriver(nodeID, blobfuseProxyEndpoint string) *Driver {
func NewDriver(nodeID, blobfuseProxyEndpoint string, enableBlobfuseProxy bool, blobfuseProxyConnTimout int) *Driver {
driver := Driver{}
driver.Name = DriverName
driver.Version = driverVersion
driver.NodeID = nodeID
driver.volLockMap = util.NewLockMap()
driver.volumeLocks = newVolumeLocks()
driver.blobfuseProxyEndpoint = blobfuseProxyEndpoint
driver.enableBlobfuseProxy = enableBlobfuseProxy
driver.blobfuseProxyConnTimout = blobfuseProxyConnTimout
return &driver
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ const (
)

func NewFakeDriver() *Driver {
driver := NewDriver(fakeNodeID, "")
driver := NewDriver(fakeNodeID, "", false, 5)
driver.Name = fakeDriverName
driver.Version = vendorVersion
return driver
}

func TestNewFakeDriver(t *testing.T) {
d := NewDriver(fakeNodeID, "")
d := NewDriver(fakeNodeID, "", false, 5)
assert.NotNil(t, d)
}

func TestNewDriver(t *testing.T) {
driver := NewDriver(fakeNodeID, "")
driver := NewDriver(fakeNodeID, "", false, 5)
fakedriver := NewFakeDriver()
fakedriver.Name = DriverName
fakedriver.Version = driverVersion
Expand Down
62 changes: 40 additions & 22 deletions pkg/blob/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,38 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
return &csi.NodePublishVolumeResponse{}, nil
}

func (d *Driver) mountUsingBlobfuseProxy(args string, authEnv []string) (string, error) {
klog.V(2).Infof("mouting using blobfuse proxy")
var resp *mount_azure_blob.MountAzureBlobResponse
var output string
connectionTimout := time.Duration(d.blobfuseProxyConnTimout)
ctx, cancel := context.WithTimeout(context.Background(), connectionTimout*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
if err == nil {
mountClient := NewMountClient(conn)
mountreq := mount_azure_blob.MountAzureBlobRequest{
MountArgs: args,
AuthEnv: authEnv,
}
klog.V(2).Infof("calling BlobfuseProxy: MountAzureBlob function")
resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
if err != nil {
klog.Error("GRPC call returned with an error:", err)
}
output = resp.GetOutput()
}
return output, err
}

func (d *Driver) mountUsingNodeServerBlobfuseBinary(args string, authEnv []string) (string, error) {
klog.V(2).Infof("mouting using nodeserver blobfuse binary")
cmd := exec.Command("blobfuse", strings.Split(args, " ")...)
cmd.Env = append(os.Environ(), authEnv...)
output, err := cmd.CombinedOutput()
return string(output), err
}

// NodeUnpublishVolume unmount the volume from the target path
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if len(req.GetVolumeId()) == 0 {
Expand Down Expand Up @@ -227,34 +259,20 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe
targetPath, protocol, volumeID, attrib, mountFlags, mountOptions, args, serverAddress)

authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
var output []byte
var resp *mount_azure_blob.MountAzureBlobResponse
// create a context with 5 secs timeout.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
if err == nil {
mountClient := NewMountClient(conn)
mountreq := mount_azure_blob.MountAzureBlobRequest{
MountArgs: args,
AuthEnv: authEnv,
}
klog.V(2).Infof("calling BlobfuseProxy: MountAzureBlob function")
resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
var output string
enableBlobfuseProxy := d.enableBlobfuseProxy
if enableBlobfuseProxy {
output, err = d.mountUsingBlobfuseProxy(args, authEnv)
if err != nil {
klog.Error("GRPC call returned with an error:", err)
klog.Warningf("cannot dial blobfuse-proxy at address: %v error: %v \nfalling back to the nodeserver based mount", d.blobfuseProxyEndpoint, err)
output, err = d.mountUsingNodeServerBlobfuseBinary(args, authEnv)
}
output = []byte(resp.GetOutput())
} else {
klog.Warningf("cannot dial blobfuse-proxy at address: %v error: %v \nfalling back to the nodeserver based mount", d.blobfuseProxyEndpoint, err)
// fall back to normal mount if blobfuse proxy is not cannot start
cmd := exec.Command("blobfuse", strings.Split(args, " ")...)
cmd.Env = append(os.Environ(), authEnv...)
output, err = cmd.CombinedOutput()
output, err = d.mountUsingNodeServerBlobfuseBinary(args, authEnv)
}

if err != nil {
err = fmt.Errorf("Mount failed with error: %v, output: %v", err, string(output))
err = fmt.Errorf("Mount failed with error: %v, output: %v", err, output)
klog.Errorf("%v", err)
notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
if mntErr != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/blob/nodeserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"os"
"os/exec"
"reflect"
"runtime"
"syscall"
Expand Down Expand Up @@ -570,3 +571,21 @@ func TestNodeExpandVolume(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}
}

func TestMountUsingBlobfuseProxy(t *testing.T) {
args := "--tmp-path /tmp"
authEnv := []string{"username=blob", "authkey=blob"}
d := NewFakeDriver()
_, err := d.mountUsingBlobfuseProxy(args, authEnv)
// should be context.deadlineExceededError{} error
assert.NotNil(t, err)
}

func TestMountUsingNodeServerBlobfuseBinary(t *testing.T) {
args := "--tmp-path /tmp"
authEnv := []string{"username=blob", "authkey=blob"}
d := NewFakeDriver()
_, err := d.mountUsingNodeServerBlobfuseBinary(args, authEnv)
_, ok := err.(*exec.ExitError)
assert.True(t, ok)
}
16 changes: 9 additions & 7 deletions pkg/blobplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ func init() {
}

var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
blobfuseProxyEndpoint = flag.String("blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
metricsAddress = flag.String("metrics-address", "0.0.0.0:29634", "export the metrics")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
blobfuseProxyEndpoint = flag.String("blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
metricsAddress = flag.String("metrics-address", "0.0.0.0:29634", "export the metrics")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
enableBlobfuseProxy = flag.Bool("enable-blobfuse-proxy", false, "Whether supports using Blobfuse proxy for mounts")
blobfuseProxyConnTimout = flag.Int("blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout")
)

func main() {
Expand All @@ -62,7 +64,7 @@ func main() {
}

func handle() {
driver := blob.NewDriver(*nodeID, *blobfuseProxyEndpoint)
driver := blob.NewDriver(*nodeID, *blobfuseProxyEndpoint, *enableBlobfuseProxy, *blobfuseProxyConnTimout)
if driver == nil {
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
}
Expand Down
11 changes: 10 additions & 1 deletion test/e2e/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -109,7 +110,15 @@ var _ = ginkgo.BeforeSuite(func() {

nodeid := os.Getenv("nodeid")
kubeconfig := os.Getenv(kubeconfigEnvVar)
blobDriver = blob.NewDriver(nodeid, "")
value, exists := os.LookupEnv("ENABLE_BLOBFUSE_PROXY")
useBlobfuseProxy := false
if exists {
useBlobfuseProxy, err = strconv.ParseBool(value)
if err != nil {
log.Println("Failed to parse boolean value", err)
}
}
blobDriver = blob.NewDriver(nodeid, "", useBlobfuseProxy, 5)
go func() {
os.Setenv("AZURE_CREDENTIAL_FILE", credentials.TempAzureCredentialFilePath)
blobDriver.Run(fmt.Sprintf("unix:///tmp/csi-%s.sock", uuid.NewUUID().String()), kubeconfig, false)
Expand Down

0 comments on commit d55b6c0

Please sign in to comment.