Skip to content

Commit

Permalink
Merge pull request #10 from kfox1111/ephemeral
Browse files Browse the repository at this point in the history
Ephemeral support
  • Loading branch information
k8s-ci-robot committed Apr 19, 2019
2 parents a6ba7d0 + 065acbe commit ac1ccd7
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 44 deletions.
3 changes: 2 additions & 1 deletion cmd/hostpathplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "csi-hostpath", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
ephemeral = flag.Bool("ephemeral", false, "deploy in ephemeral mode")
showVersion = flag.Bool("version", false, "Show version.")
// Set by the build process
version = ""
Expand All @@ -52,7 +53,7 @@ func main() {
}

func handle() {
driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, version)
driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, version, *ephemeral)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
Expand Down
52 changes: 26 additions & 26 deletions pkg/hostpath/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ type controllerServer struct {
caps []*csi.ControllerServiceCapability
}

func NewControllerServer() *controllerServer {
func NewControllerServer(ephemeral bool) *controllerServer {
if ephemeral {
return &controllerServer{caps: getControllerServiceCapabilities(nil)}
}
return &controllerServer{
caps: getControllerServiceCapabilities(
[]csi.ControllerServiceCapability_RPC_Type{
Expand Down Expand Up @@ -110,6 +113,12 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
requestedAccessType = mountAccess
}

// Check for maximum available capacity
capacity := int64(req.GetCapacityRange().GetRequiredBytes())
if capacity >= maxStorageCapacity {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, maxStorageCapacity)
}

// Need to check for already existing volume name, and if found
// check for the requested capacity and already allocated capacity
if exVol, err := getVolumeByName(req.GetName()); err == nil {
Expand All @@ -129,17 +138,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
return nil, status.Error(codes.AlreadyExists, fmt.Sprintf("Volume with the same name: %s but with different size already exist", req.GetName()))
}
// Check for maximum available capacity
capacity := int64(req.GetCapacityRange().GetRequiredBytes())
if capacity >= maxStorageCapacity {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, maxStorageCapacity)
}

volumeID := uuid.NewUUID().String()
path := provisionRoot + volumeID
path := getVolumePath(volumeID)

switch requestedAccessType {
case blockAccess:
if requestedAccessType == blockAccess {
executor := utilexec.New()
size := fmt.Sprintf("%dM", capacity/mib)
// Create a block file.
Expand All @@ -160,14 +163,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to attach device: %v", err))
}
case mountAccess:
err := os.MkdirAll(path, 0777)
if err != nil {
glog.V(3).Infof("failed to create volume: %v", err)
return nil, err
}
}

vol, err := createHostpathVolume(volumeID, req.GetName(), capacity, requestedAccessType)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to create volume: %s", err))
}
glog.V(4).Infof("created volume %s at path %s", vol.VolID, vol.VolPath)

if req.GetVolumeContentSource() != nil {
contentSource := req.GetVolumeContentSource()
if contentSource.GetSnapshot() != nil {
Expand All @@ -188,14 +191,7 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
}
}
glog.V(4).Infof("create volume %s", path)
hostPathVol := hostPathVolume{}
hostPathVol.VolName = req.GetName()
hostPathVol.VolID = volumeID
hostPathVol.VolSize = capacity
hostPathVol.VolPath = path
hostPathVol.VolAccessType = requestedAccessType
hostPathVolumes[volumeID] = hostPathVol

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
Expand Down Expand Up @@ -228,7 +224,7 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol

volPathHandler := volumepathhandler.VolumePathHandler{}
// Get the associated loop device.
device, err := volPathHandler.GetLoopDevice(provisionRoot + vol.VolID)
device, err := volPathHandler.GetLoopDevice(getVolumePath(vol.VolID))
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to get the loop device: %v", err))
}
Expand All @@ -242,8 +238,12 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
}

os.RemoveAll(vol.VolPath)
delete(hostPathVolumes, vol.VolID)
if err := deleteHostpathVolume(vol.VolID); err != nil && !os.IsNotExist(err) {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", err))
}

glog.V(4).Infof("volume deleted ok: %s", vol.VolID)

return &csi.DeleteVolumeResponse{}, nil
}

Expand Down
63 changes: 51 additions & 12 deletions pkg/hostpath/hostpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package hostpath

import (
"fmt"
"os"

"github.com/golang/glog"

Expand All @@ -34,10 +35,11 @@ const (
)

type hostPath struct {
name string
nodeID string
version string
endpoint string
name string
nodeID string
version string
endpoint string
ephemeral bool

ids *identityServer
ns *nodeServer
Expand Down Expand Up @@ -74,7 +76,7 @@ func init() {
hostPathVolumeSnapshots = map[string]hostPathSnapshot{}
}

func NewHostPathDriver(driverName, nodeID, endpoint, version string) (*hostPath, error) {
func NewHostPathDriver(driverName, nodeID, endpoint, version string, ephemeral bool) (*hostPath, error) {
if driverName == "" {
return nil, fmt.Errorf("No driver name provided")
}
Expand All @@ -94,19 +96,19 @@ func NewHostPathDriver(driverName, nodeID, endpoint, version string) (*hostPath,
glog.Infof("Version: %s", vendorVersion)

return &hostPath{
name: driverName,
version: vendorVersion,
nodeID: nodeID,
endpoint: endpoint,
name: driverName,
version: vendorVersion,
nodeID: nodeID,
endpoint: endpoint,
ephemeral: ephemeral,
}, nil
}

func (hp *hostPath) Run() {

// Create GRPC servers
hp.ids = NewIdentityServer(hp.name, hp.version)
hp.ns = NewNodeServer(hp.nodeID)
hp.cs = NewControllerServer()
hp.ns = NewNodeServer(hp.nodeID, hp.ephemeral)
hp.cs = NewControllerServer(hp.ephemeral)

s := NewNonBlockingGRPCServer()
s.Start(hp.endpoint, hp.ids, hp.cs, hp.ns)
Expand Down Expand Up @@ -137,3 +139,40 @@ func getSnapshotByName(name string) (hostPathSnapshot, error) {
}
return hostPathSnapshot{}, fmt.Errorf("snapshot name %s does not exit in the snapshots list", name)
}

// getVolumePath returs the canonical path for hostpath volume
func getVolumePath(volID string) string {
return fmt.Sprintf("%s/%s", provisionRoot, volID)
}

// createVolume create the directory for the hostpath volume.
// It returns the volume path or err if one occurs.
func createHostpathVolume(volID, name string, cap int64, volAccessType accessType) (*hostPathVolume, error) {
path := getVolumePath(volID)
if volAccessType == mountAccess {
err := os.MkdirAll(path, 0777)
if err != nil {
return nil, err
}
}

hostpathVol := hostPathVolume{
VolID: volID,
VolName: name,
VolSize: cap,
VolPath: path,
VolAccessType: volAccessType,
}
hostPathVolumes[volID] = hostpathVol
return &hostpathVol, nil
}

// deleteVolume deletes the directory for the hostpath volume.
func deleteHostpathVolume(volID string) error {
path := getVolumePath(volID)
if err := os.RemoveAll(path); err != nil {
return err
}
delete(hostPathVolumes, volID)
return nil
}
39 changes: 34 additions & 5 deletions pkg/hostpath/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package hostpath
import (
"fmt"
"os"
"strings"

"github.com/golang/glog"
"golang.org/x/net/context"
Expand All @@ -31,12 +32,14 @@ import (
)

type nodeServer struct {
nodeID string
nodeID string
ephemeral bool
}

func NewNodeServer(nodeId string) *nodeServer {
func NewNodeServer(nodeId string, ephemeral bool) *nodeServer {
return &nodeServer{
nodeID: nodeId,
nodeID: nodeId,
ephemeral: ephemeral,
}
}

Expand All @@ -60,6 +63,18 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, status.Error(codes.InvalidArgument, "cannot have both block and mount access type")
}

// if ephemeral is specified, create volume here to avoid errors
if ns.ephemeral {
volID := req.GetVolumeId()
volName := fmt.Sprintf("ephemeral-%s", volID)
vol, err := createHostpathVolume(req.GetVolumeId(), volName, maxStorageCapacity, mountAccess)
if err != nil && !os.IsExist(err) {
glog.Error("ephemeral mode failed to create volume: ", err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("ephemeral mode: created volume: %s", vol.VolPath)
}

vol, err := getVolumeByID(req.GetVolumeId())
if err != nil {
return nil, status.Error(codes.NotFound, err.Error())
Expand Down Expand Up @@ -150,9 +165,16 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
options = append(options, "ro")
}
mounter := mount.New("")
path := provisionRoot + volumeId
path := getVolumePath(volumeId)

if err := mounter.Mount(path, targetPath, "", options); err != nil {
return nil, err
var errList strings.Builder
errList.WriteString(err.Error())
if ns.ephemeral {
if rmErr := os.RemoveAll(path); rmErr != nil && !os.IsNotExist(rmErr) {
errList.WriteString(fmt.Sprintf(" :%s", rmErr.Error()))
}
}
}
}

Expand Down Expand Up @@ -196,6 +218,13 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
glog.V(4).Infof("hostpath: volume %s/%s has been unmounted.", targetPath, volumeID)
}

if ns.ephemeral {
glog.V(4).Infof("deleting volume %s", volumeID)
if err := deleteHostpathVolume(volumeID); err != nil && !os.IsNotExist(err) {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume: %s", err))
}
}

return &csi.NodeUnpublishVolumeResponse{}, nil
}

Expand Down

0 comments on commit ac1ccd7

Please sign in to comment.