diff --git a/cmd/kubectl-directpv/drives.go b/cmd/kubectl-directpv/drives.go
index d134919cd..ce6dd876e 100644
--- a/cmd/kubectl-directpv/drives.go
+++ b/cmd/kubectl-directpv/drives.go
@@ -44,6 +44,9 @@ func init() {
drivesCmd.AddCommand(formatDrivesCmd)
drivesCmd.AddCommand(accessTierCmd)
drivesCmd.AddCommand(releaseDrivesCmd)
+ drivesCmd.AddCommand(cordonDrivesCmd)
+ drivesCmd.AddCommand(uncordonDrivesCmd)
+ drivesCmd.AddCommand(moveDriveCmd)
}
func validateDriveSelectors() (err error) {
diff --git a/cmd/kubectl-directpv/drives_cordon.go b/cmd/kubectl-directpv/drives_cordon.go
new file mode 100644
index 000000000..e3e6fe375
--- /dev/null
+++ b/cmd/kubectl-directpv/drives_cordon.go
@@ -0,0 +1,116 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2021, 2022 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/drive"
+ "github.com/minio/directpv/pkg/k8s"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/klog/v2"
+)
+
+var cordonDrivesCmd = &cobra.Command{
+ Use: "cordon",
+ Short: "Cordon drive(s) to prevent any new volumes to be scheduleed",
+ Example: strings.ReplaceAll(
+ `# Cordon all the drives from all the nodes
+ $ kubectl {PLUGIN_NAME} drives cordon --all
+
+ # Cordon all the drives from a particular node
+ $ kubectl {PLUGIN_NAME} drives cordon --node=node1
+
+ # Cordon specific drives from specified nodes
+ $ kubectl {PLUGIN_NAME} drives cordon --node=node1,node2 --drive=/dev/nvme0n1
+
+ # Cordon specific drives from all the nodes filtered by drive ellipsis
+ $ kubectl {PLUGIN_NAME} drives cordon --drive=/dev/sd{a...b}
+
+ # Cordon all the drives from specific nodes filtered by node ellipsis
+ $ kubectl {PLUGIN_NAME} drives cordon --node=node{0...3}
+
+ # Cordon specific drives from specific nodes filtered by the combination of node and drive ellipsis
+ $ kubectl {PLUGIN_NAME} drives cordon --drive /dev/xvd{a...d} --node node{1...4}`,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ RunE: func(c *cobra.Command, _ []string) error {
+ if !allFlag {
+ if len(driveArgs) == 0 && len(nodeArgs) == 0 && len(accessTierArgs) == 0 {
+ return fmt.Errorf("atleast one of '%s', '%s', '%s' or '%s' must be specified",
+ bold("--all"),
+ bold("--drive"),
+ bold("--node"),
+ bold("--access-tier"),
+ )
+ }
+ }
+ if err := validateDriveSelectors(); err != nil {
+ return err
+ }
+ return cordonDrives(c.Context())
+ },
+}
+
+func init() {
+ cordonDrivesCmd.PersistentFlags().StringSliceVarP(&driveArgs, "drive", "d", driveArgs, "Filter drives to be cordoned by drive path (supports ellipses pattern)")
+ cordonDrivesCmd.PersistentFlags().StringSliceVarP(&nodeArgs, "node", "n", nodeArgs, "Filter drives to be cordoned by nodes (supports ellipses pattern)")
+ cordonDrivesCmd.PersistentFlags().StringSliceVarP(&accessTierArgs, "access-tier", "", accessTierArgs, fmt.Sprintf("Filter drives to be cordoned by access tier set on the drive [%s]", strings.Join(directpvtypes.SupportedAccessTierValues(), ", ")))
+ cordonDrivesCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", allFlag, "Cordon all the drives from all the nodes")
+}
+
+func cordonDrives(ctx context.Context) error {
+ ctx, cancelFunc := context.WithCancel(ctx)
+ defer cancelFunc()
+ resultCh, err := drive.ListDrives(ctx, nodeSelectors, driveSelectors, accessTierSelectors, k8s.MaxThreadCount)
+ if err != nil {
+ return err
+ }
+ return drive.ProcessDrives(
+ ctx,
+ resultCh,
+ func(drive *types.Drive) bool {
+ if drive.Status.Status == directpvtypes.DriveStatusCordoned {
+ klog.Errorf("%s is already cordoned", bold(drive.Status.Path))
+ return false
+ }
+ if drive.Status.Status != directpvtypes.DriveStatusOK {
+ klog.Errorf("%s is in %s state. only %s drives can be cordoned", bold(drive.Status.Path), bold(drive.Status.Status), bold(directpvtypes.DriveStatusOK))
+ return false
+ }
+ return true
+ },
+ func(drive *types.Drive) error {
+ drive.Status.Status = directpvtypes.DriveStatusCordoned
+ return nil
+ },
+ func(ctx context.Context, drive *types.Drive) error {
+ _, err := client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{})
+ return err
+ },
+ nil,
+ dryRun,
+ )
+}
diff --git a/cmd/kubectl-directpv/drives_list.go b/cmd/kubectl-directpv/drives_list.go
index 6aa21fdb3..4009f057a 100644
--- a/cmd/kubectl-directpv/drives_list.go
+++ b/cmd/kubectl-directpv/drives_list.go
@@ -22,6 +22,7 @@ import (
"os"
"strings"
+ "github.com/fatih/color"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
@@ -128,6 +129,7 @@ func listDrives(ctx context.Context, args []string) error {
if wideOutput {
headers = append(headers, "MODEL")
headers = append(headers, "VENDOR")
+ headers = append(headers, "DRIVE NAME")
}
text.DisableColors()
@@ -145,7 +147,7 @@ func listDrives(ctx context.Context, args []string) error {
for _, drive := range drives {
volumes := "-"
- if len(drive.Finalizers) > 1 {
+ if len(drive.Finalizers) > 1 && (drive.Status.Status == directpvtypes.DriveStatusOK || drive.Status.Status == directpvtypes.DriveStatusCordoned) {
volumes = fmt.Sprintf("%v", len(drive.Finalizers)-1)
}
row := []interface{}{
@@ -159,7 +161,18 @@ func listDrives(ctx context.Context, args []string) error {
return "-"
}(),
volumes,
- drive.Status.Status,
+ func() string {
+ switch drive.Status.Status {
+ case directpvtypes.DriveStatusOK:
+ return color.HiGreenString(string(drive.Status.Status))
+ case directpvtypes.DriveStatusError, directpvtypes.DriveStatusMoved:
+ return color.HiRedString(string(drive.Status.Status))
+ case directpvtypes.DriveStatusCordoned:
+ return color.HiYellowString(string(drive.Status.Status))
+ default:
+ return color.HiWhiteString(string(drive.Status.Status))
+ }
+ }(),
drive.Status.NodeName,
func() string {
if drive.Status.AccessTier == directpvtypes.AccessTierUnknown {
@@ -171,6 +184,7 @@ func listDrives(ctx context.Context, args []string) error {
if wideOutput {
row = append(row, drive.Status.ModelNumber)
row = append(row, drive.Status.Vendor)
+ row = append(row, drive.Name)
}
writer.AppendRow(row)
}
diff --git a/cmd/kubectl-directpv/drives_move.go b/cmd/kubectl-directpv/drives_move.go
new file mode 100644
index 000000000..069a39e49
--- /dev/null
+++ b/cmd/kubectl-directpv/drives_move.go
@@ -0,0 +1,150 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2021, 2022 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/utils"
+ "github.com/minio/directpv/pkg/volume"
+ "github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/util/retry"
+ "k8s.io/klog/v2"
+)
+
+var (
+ fromDrive string
+ toDrive string
+
+ errFromFlagNotSet = errors.New("'--from' flag is not set")
+ errToFlagNotSet = errors.New("'--to' flag is not set")
+ errInvalidState = errors.New("invalid state")
+ errCapacityNotAvailable = errors.New("the drive do not have enough capacity to accomodate")
+)
+
+var moveDriveCmd = &cobra.Command{
+ Use: "move",
+ Short: "Move a drive to another drive within the same node (without data)",
+ Example: strings.ReplaceAll(
+ `# Move a drive to another drive
+$ kubectl {PLUGIN_NAME} drives move --from --to `,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ RunE: func(c *cobra.Command, _ []string) error {
+ if fromDrive == "" {
+ return errFromFlagNotSet
+ }
+ if toDrive == "" {
+ return errToFlagNotSet
+ }
+ return moveDrive(c.Context())
+ },
+}
+
+func init() {
+ moveDriveCmd.PersistentFlags().StringVarP(&fromDrive, "from", "", fromDrive, fmt.Sprintf("the name of the source %s drive that needs to be moved", consts.AppPrettyName))
+ moveDriveCmd.PersistentFlags().StringVarP(&toDrive, "to", "", toDrive, fmt.Sprintf("the name of the target %s drive to which the source has to be moved", consts.AppPrettyName))
+}
+
+func moveDrive(ctx context.Context) error {
+ if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+ driveClient := client.DriveClient()
+ sourceDrive, err := driveClient.Get(ctx, strings.TrimSpace(fromDrive), metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ targetDrive, err := driveClient.Get(ctx, strings.TrimSpace(toDrive), metav1.GetOptions{})
+ if err != nil {
+ return err
+ }
+ if err := validateMoveRequest(sourceDrive, targetDrive); err != nil {
+ return err
+ }
+ if err := move(ctx, sourceDrive, targetDrive); err != nil {
+ return err
+ }
+ targetDrive.Status.Status = directpvtypes.DriveStatusMoving
+ _, err = driveClient.Update(
+ ctx, targetDrive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()},
+ )
+ return err
+ }); err != nil {
+ return err
+ }
+ return nil
+}
+
+func validateMoveRequest(sourceDrive, targetDrive *types.Drive) error {
+ if !(sourceDrive.IsCordoned() || sourceDrive.IsLost()) || !targetDrive.IsCordoned() {
+ klog.Error("please make sure both the source and target drives are cordoned")
+ return errInvalidState
+ }
+ if sourceDrive.Status.NodeName != targetDrive.Status.NodeName {
+ klog.Error("both the source and target drives should be from same node")
+ return errInvalidState
+ }
+ if sourceDrive.Status.AccessTier != targetDrive.Status.AccessTier {
+ klog.Error("the source and target drives does not belong to the same access-tier")
+ return errInvalidState
+ }
+ return nil
+}
+
+func move(ctx context.Context, sourceDrive, targetDrive *types.Drive) error {
+ selector, err := getDriveNameSelectors([]string{sourceDrive.Name})
+ if err != nil {
+ return err
+ }
+ volumes, err := volume.GetVolumeList(ctx, nil, nil, nil, nil, selector)
+ if err != nil {
+ return err
+ }
+ for _, volume := range volumes {
+ if volume.Status.DriveName != sourceDrive.Name {
+ klog.Infof("invalid drive name %s found in volume %s", volume.Status.DriveName, volume.Name)
+ return errInvalidState
+ }
+ if volume.Status.NodeName != sourceDrive.Status.NodeName {
+ klog.Infof("invalid node name %s found in volume %s", volume.Status.NodeName, volume.Name)
+ return errInvalidState
+ }
+ if volume.IsPublished() {
+ klog.Info("please make sure all the volumes in the source drive are not inuse")
+ return fmt.Errorf("volume %s is still published", volume.Name)
+ }
+ if targetDrive.Status.FreeCapacity < volume.Status.TotalCapacity {
+ klog.Info("the target drive cannot accomodate the volumes from the source")
+ return errCapacityNotAvailable
+ }
+ finalizer := consts.DriveFinalizerPrefix + volume.Name
+ if !utils.ItemIn(targetDrive.Finalizers, finalizer) {
+ targetDrive.Status.FreeCapacity -= volume.Status.TotalCapacity
+ targetDrive.Status.AllocatedCapacity += volume.Status.TotalCapacity
+ targetDrive.SetFinalizers(append(targetDrive.GetFinalizers(), finalizer))
+ }
+ }
+ return nil
+}
diff --git a/cmd/kubectl-directpv/drives_release.go b/cmd/kubectl-directpv/drives_release.go
index e997012d6..4afc7c939 100644
--- a/cmd/kubectl-directpv/drives_release.go
+++ b/cmd/kubectl-directpv/drives_release.go
@@ -96,13 +96,17 @@ func releaseDrives(ctx context.Context) error {
klog.Errorf("%s already in 'released' state", bold(drive.Status.Path))
return false
}
- if len(drive.Finalizers) > 1 {
+ if drive.Status.Status != directpvtypes.DriveStatusMoved && len(drive.Finalizers) > 1 {
klog.Errorf("%s has volumes. please purge them before releasing", bold(drive.Status.Path))
return false
}
return true
},
func(drive *types.Drive) error {
+ if drive.Status.Status == directpvtypes.DriveStatusMoved {
+ // Moved drives won't have any volumes allocated, simply empty them and proceed.
+ drive.SetFinalizers([]string{consts.DriveFinalizerDataProtection})
+ }
drive.Status.Status = directpvtypes.DriveStatusReleased
return nil
},
diff --git a/cmd/kubectl-directpv/drives_uncordon.go b/cmd/kubectl-directpv/drives_uncordon.go
new file mode 100644
index 000000000..0bc7fa8d5
--- /dev/null
+++ b/cmd/kubectl-directpv/drives_uncordon.go
@@ -0,0 +1,107 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2021, 2022 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "strings"
+
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/consts"
+ "github.com/minio/directpv/pkg/drive"
+ "github.com/minio/directpv/pkg/k8s"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+var uncordonDrivesCmd = &cobra.Command{
+ Use: "uncordon",
+ Short: "Uncordon drives to resume scheduling of new volumes",
+ Example: strings.ReplaceAll(
+ `# Uncordon all the drives from all the nodes
+$ kubectl {PLUGIN_NAME} drives uncordon --all
+
+# Uncordon all the drives from a particular node
+$ kubectl {PLUGIN_NAME} drives uncordon --node=node1
+
+# Uncordon specific drives from specified nodes
+$ kubectl {PLUGIN_NAME} drives uncordon --node=node1,node2 --drive=/dev/nvme0n1
+
+# Uncordon specific drives from all the nodes filtered by drive ellipsis
+$ kubectl {PLUGIN_NAME} drives uncordon --drive=/dev/sd{a...b}
+
+# Uncordon all the drives from specific nodes filtered by node ellipsis
+$ kubectl {PLUGIN_NAME} drives uncordon --node=node{0...3}
+
+# Uncordon specific drives from specific nodes filtered by the combination of node and drive ellipsis
+$ kubectl {PLUGIN_NAME} drives uncordon --drive /dev/xvd{a...d} --node node{1...4}`,
+ `{PLUGIN_NAME}`,
+ consts.AppName,
+ ),
+ RunE: func(c *cobra.Command, _ []string) error {
+ if !allFlag {
+ if len(driveArgs) == 0 && len(nodeArgs) == 0 && len(accessTierArgs) == 0 {
+ return fmt.Errorf("atleast one of '%s', '%s', '%s' or '%s' must be specified",
+ bold("--all"),
+ bold("--drive"),
+ bold("--node"),
+ bold("--access-tier"),
+ )
+ }
+ }
+ if err := validateDriveSelectors(); err != nil {
+ return err
+ }
+ return uncordonDrives(c.Context())
+ },
+}
+
+func init() {
+ uncordonDrivesCmd.PersistentFlags().StringSliceVarP(&driveArgs, "drive", "d", driveArgs, "Uncordon drives filtered by drive path (supports ellipses pattern)")
+ uncordonDrivesCmd.PersistentFlags().StringSliceVarP(&nodeArgs, "node", "n", nodeArgs, "Uncordon drives filtered by nodes (supports ellipses pattern)")
+ uncordonDrivesCmd.PersistentFlags().StringSliceVarP(&accessTierArgs, "access-tier", "", accessTierArgs, fmt.Sprintf("Uncordon drives filtered by access tier set on the drive [%s]", strings.Join(directpvtypes.SupportedAccessTierValues(), ", ")))
+ uncordonDrivesCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", allFlag, "Uncordon all the drives from all the nodes")
+}
+
+func uncordonDrives(ctx context.Context) error {
+ ctx, cancelFunc := context.WithCancel(ctx)
+ defer cancelFunc()
+ resultCh, err := drive.ListDrives(ctx, nodeSelectors, driveSelectors, accessTierSelectors, k8s.MaxThreadCount)
+ if err != nil {
+ return err
+ }
+ return drive.ProcessDrives(
+ ctx,
+ resultCh,
+ func(drive *types.Drive) bool {
+ return drive.Status.Status == directpvtypes.DriveStatusCordoned
+ },
+ func(drive *types.Drive) error {
+ drive.Status.Status = directpvtypes.DriveStatusOK
+ return nil
+ },
+ func(ctx context.Context, drive *types.Drive) error {
+ _, err := client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{})
+ return err
+ },
+ nil,
+ dryRun,
+ )
+}
diff --git a/cmd/kubectl-directpv/info.go b/cmd/kubectl-directpv/info.go
index 72488e8fa..5c19d7616 100644
--- a/cmd/kubectl-directpv/info.go
+++ b/cmd/kubectl-directpv/info.go
@@ -28,6 +28,7 @@ import (
"github.com/fatih/color"
"github.com/jedib0t/go-pretty/table"
"github.com/jedib0t/go-pretty/text"
+ directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/k8s"
@@ -140,7 +141,7 @@ func getInfo(ctx context.Context, args []string) error {
return err
}
- volumes, err := volume.GetVolumeList(ctx, nil, nil, nil, nil)
+ volumes, err := volume.GetVolumeList(ctx, nil, nil, nil, nil, nil)
if err != nil {
if !quiet {
klog.ErrorS(err, "unable to get volume list")
@@ -154,22 +155,27 @@ func getInfo(ctx context.Context, args []string) error {
var totalDriveSize uint64
var totalVolumeSize uint64
+ var totalDriveCount int
for _, n := range nodeList {
driveCount := 0
driveSize := uint64(0)
for _, d := range drives {
+ if d.Status.Status != directpvtypes.DriveStatusOK {
+ continue
+ }
if d.Status.NodeName == n {
driveCount++
driveSize += uint64(d.Status.TotalCapacity)
}
}
totalDriveSize += driveSize
+ totalDriveCount += driveCount
volumeCount := 0
volumeSize := uint64(0)
for _, v := range volumes {
if v.Status.NodeName == n {
- if v.Status.IsPublished() {
+ if v.IsPublished() {
volumeCount++
volumeSize += uint64(v.Status.TotalCapacity)
}
@@ -210,7 +216,7 @@ func getInfo(ctx context.Context, args []string) error {
humanize.IBytes(totalVolumeSize),
humanize.IBytes(totalDriveSize),
color.HiWhiteString("%d", len(volumes)),
- color.HiWhiteString("%d", len(drives)),
+ color.HiWhiteString("%d", totalDriveCount),
)
}
}
diff --git a/cmd/kubectl-directpv/utils.go b/cmd/kubectl-directpv/utils.go
index 3cf8be354..1d3ee9c9e 100644
--- a/cmd/kubectl-directpv/utils.go
+++ b/cmd/kubectl-directpv/utils.go
@@ -233,6 +233,7 @@ func processFilteredVolumes(
driveSelectors,
podNameSelectors,
podNSSelectors,
+ nil,
k8s.MaxThreadCount)
if err != nil {
return err
diff --git a/cmd/kubectl-directpv/volumes.go b/cmd/kubectl-directpv/volumes.go
index 3ae856c9e..7465e067f 100644
--- a/cmd/kubectl-directpv/volumes.go
+++ b/cmd/kubectl-directpv/volumes.go
@@ -17,7 +17,9 @@
package main
import (
+ "errors"
"fmt"
+ "strings"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/types"
@@ -25,6 +27,10 @@ import (
"github.com/spf13/cobra"
)
+var (
+ errEmptyValue = errors.New("empty value provided")
+)
+
var (
podNameArgs []string
podNSArgs []string
@@ -66,6 +72,17 @@ func getPodNamespaceSelectors() ([]types.LabelValue, error) {
return getSelectorValues(podNSArgs)
}
+func getDriveNameSelectors(args []string) (values []types.LabelValue, err error) {
+ for i := range args {
+ trimmed := strings.TrimSpace(args[i])
+ if trimmed == "" {
+ return nil, errEmptyValue
+ }
+ values = append(values, types.NewLabelValue(trimmed))
+ }
+ return
+}
+
func validateVolumeSelectors() (err error) {
if driveSelectors, err = getDriveSelectors(); err != nil {
return err
diff --git a/cmd/kubectl-directpv/volumes_list.go b/cmd/kubectl-directpv/volumes_list.go
index c7a1963c3..26aa92371 100644
--- a/cmd/kubectl-directpv/volumes_list.go
+++ b/cmd/kubectl-directpv/volumes_list.go
@@ -106,6 +106,7 @@ func listVolumes(ctx context.Context, args []string) error {
driveSelectors,
podNameSelectors,
podNSSelectors,
+ nil,
k8s.MaxThreadCount,
)
if err != nil {
@@ -118,7 +119,7 @@ func listVolumes(ctx context.Context, args []string) error {
return result.Err
}
- if allFlag || (stagedFlag && result.Volume.Status.IsStaged()) || result.Volume.Status.IsPublished() {
+ if allFlag || (stagedFlag && result.Volume.IsStaged()) || result.Volume.IsPublished() {
volumes = append(volumes, result.Volume)
}
}
@@ -184,11 +185,11 @@ func listVolumes(ctx context.Context, args []string) error {
status := "-"
switch {
- case volume.Status.IsDriveLost():
+ case volume.IsDriveLost():
status = "Lost"
- case volume.Status.IsPublished():
+ case volume.IsPublished():
status = "Published"
- case volume.Status.IsStaged():
+ case volume.IsStaged():
status = "Staged"
}
row = append(row, status)
diff --git a/cmd/kubectl-directpv/volumes_purge_test.go b/cmd/kubectl-directpv/volumes_purge_test.go
index cb25a2399..3a80e6577 100644
--- a/cmd/kubectl-directpv/volumes_purge_test.go
+++ b/cmd/kubectl-directpv/volumes_purge_test.go
@@ -87,7 +87,7 @@ func TestVolumesPurge(t *testing.T) {
t.Fatal(err)
}
- volumes, err := volume.GetVolumeList(ctx, nil, nil, nil, nil)
+ volumes, err := volume.GetVolumeList(ctx, nil, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}
diff --git a/pkg/admin/node-api-server.go b/pkg/admin/node-api-server.go
index 6722577ed..f33396647 100644
--- a/pkg/admin/node-api-server.go
+++ b/pkg/admin/node-api-server.go
@@ -333,7 +333,7 @@ func (n *nodeAPIHandler) format(ctx context.Context, device FormatDevice) (forma
}
formatStatus.FSUUID = fsuuid
// Mount the device
- mountTarget := path.Join(consts.MountRootDir, fsuuid)
+ mountTarget := types.GetDriveMountDir(fsuuid)
err = n.mountDevice(device.Path(), mountTarget)
if err != nil {
klog.Errorf("failed to mount drive %s; %w", device.Name, err)
@@ -360,7 +360,7 @@ func (n *nodeAPIHandler) format(ctx context.Context, device FormatDevice) (forma
return
}
// Create symbolic link
- if err := os.Symlink(mountTarget, path.Join(mountTarget, fsuuid)); err != nil {
+ if err := os.Symlink(mountTarget, types.GetVolumeRootDir(fsuuid)); err != nil {
klog.Errorf("failed to create symlink for target %s. device: %s err: %s", mountTarget, device.Name, err.Error())
formatStatus.setErr(err, "failed to create symlink", formatRetrySuggestion)
}
diff --git a/pkg/apis/directpv.min.io/types/types.go b/pkg/apis/directpv.min.io/types/types.go
index b760aa931..7910116fa 100644
--- a/pkg/apis/directpv.min.io/types/types.go
+++ b/pkg/apis/directpv.min.io/types/types.go
@@ -42,6 +42,15 @@ const (
// DriveStatusReleased denotes drive is released.
DriveStatusReleased DriveStatus = "Released"
+
+ // DriveStatusCordoned denotes drive is crdoned.
+ DriveStatusCordoned DriveStatus = "Cordoned"
+
+ // DriveStatusMoving denotes that the drive is processing the move request
+ DriveStatusMoving DriveStatus = "Moving"
+
+ // DriveStatusMoved denotes that the drive is moved to a target drive
+ DriveStatusMoved DriveStatus = "Moved"
)
// AccessTier denotes access tier.
diff --git a/pkg/apis/directpv.min.io/v1beta1/drive.go b/pkg/apis/directpv.min.io/v1beta1/drive.go
index 9b1173de9..ecfa2237e 100644
--- a/pkg/apis/directpv.min.io/v1beta1/drive.go
+++ b/pkg/apis/directpv.min.io/v1beta1/drive.go
@@ -58,6 +58,10 @@ func (drive DirectPVDrive) IsLost() bool {
return drive.Status.Status == types.DriveStatusLost
}
+func (drive DirectPVDrive) IsCordoned() bool {
+ return drive.Status.Status == types.DriveStatusCordoned
+}
+
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// DirectPVDriveList denotes list of drives.
diff --git a/pkg/apis/directpv.min.io/v1beta1/volume.go b/pkg/apis/directpv.min.io/v1beta1/volume.go
index e61c8416a..5d22143ce 100644
--- a/pkg/apis/directpv.min.io/v1beta1/volume.go
+++ b/pkg/apis/directpv.min.io/v1beta1/volume.go
@@ -40,16 +40,31 @@ type DirectPVVolumeStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
}
-func (status DirectPVVolumeStatus) IsStaged() bool {
- return status.StagingTargetPath != ""
+// +genclient
+// +genclient:nonNamespaced
+// +kubebuilder:resource:scope=Cluster
+// +kubebuilder:storageversion
+// +k8s:openapi-gen=true
+// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
+
+// DirectPVVolume denotes volume CRD object.
+type DirectPVVolume struct {
+ metav1.TypeMeta `json:",inline"`
+ metav1.ObjectMeta `json:"metadata"`
+
+ Status DirectPVVolumeStatus `json:"status"`
+}
+
+func (volume DirectPVVolume) IsStaged() bool {
+ return volume.Status.StagingTargetPath != ""
}
-func (status DirectPVVolumeStatus) IsPublished() bool {
- return status.TargetPath != ""
+func (volume DirectPVVolume) IsPublished() bool {
+ return volume.Status.TargetPath != ""
}
-func (status DirectPVVolumeStatus) IsDriveLost() bool {
- for _, condition := range status.Conditions {
+func (volume DirectPVVolume) IsDriveLost() bool {
+ for _, condition := range volume.Status.Conditions {
if condition.Type == string(types.VolumeConditionTypeLost) &&
condition.Status == metav1.ConditionTrue &&
condition.Reason == string(types.VolumeConditionReasonDriveLost) &&
@@ -61,7 +76,7 @@ func (status DirectPVVolumeStatus) IsDriveLost() bool {
return false
}
-func (status *DirectPVVolumeStatus) SetDriveLost() {
+func (volume *DirectPVVolume) SetDriveLost() {
c := metav1.Condition{
Type: string(types.VolumeConditionTypeLost),
Status: metav1.ConditionTrue,
@@ -70,33 +85,18 @@ func (status *DirectPVVolumeStatus) SetDriveLost() {
LastTransitionTime: metav1.Now(),
}
updated := false
- for i := range status.Conditions {
- if status.Conditions[i].Type == string(types.VolumeConditionTypeLost) {
- status.Conditions[i] = c
+ for i := range volume.Status.Conditions {
+ if volume.Status.Conditions[i].Type == string(types.VolumeConditionTypeLost) {
+ volume.Status.Conditions[i] = c
updated = true
break
}
}
if !updated {
- status.Conditions = append(status.Conditions, c)
+ volume.Status.Conditions = append(volume.Status.Conditions, c)
}
}
-// +genclient
-// +genclient:nonNamespaced
-// +kubebuilder:resource:scope=Cluster
-// +kubebuilder:storageversion
-// +k8s:openapi-gen=true
-// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
-
-// DirectPVVolume denotes volume CRD object.
-type DirectPVVolume struct {
- metav1.TypeMeta `json:",inline"`
- metav1.ObjectMeta `json:"metadata"`
-
- Status DirectPVVolumeStatus `json:"status"`
-}
-
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// DirectPVVolumeList denotes list of volumes.
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 5646d6d88..682c96612 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -192,7 +192,7 @@ func (c *Server) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
labels := map[types.LabelKey]types.LabelValue{
types.NodeLabelKey: types.NewLabelValue(drive.Status.NodeName),
- types.DrivePathLabelKey: types.NewLabelValue(drive.Status.Path),
+ types.DrivePathLabelKey: types.NewLabelValue(utils.TrimDevPrefix(drive.Status.Path)),
types.DriveLabelKey: types.NewLabelValue(drive.Name),
types.VersionLabelKey: types.NewLabelValue(consts.LatestAPIVersion),
types.CreatedByLabelKey: types.NewLabelValue(consts.ControllerName),
@@ -280,7 +280,7 @@ func (c *Server) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest)
return nil, status.Errorf(codes.NotFound, "could not retrieve volume [%s]: %v", volumeID, err)
}
- if volume.Status.IsStaged() || volume.Status.IsPublished() {
+ if volume.IsStaged() || volume.IsPublished() {
return nil, status.Errorf(codes.FailedPrecondition,
"waiting for volume [%s] to be unstaged before deleting", volumeID)
}
diff --git a/pkg/drive/drive.go b/pkg/drive/drive.go
index df58bcfc4..ae7a063f1 100644
--- a/pkg/drive/drive.go
+++ b/pkg/drive/drive.go
@@ -86,7 +86,7 @@ func DeleteDrive(ctx context.Context, drive *types.Drive, force bool) error {
return err
}
- volume.Status.SetDriveLost()
+ volume.SetDriveLost()
_, err = client.VolumeClient().Update(
ctx, volume, metav1.UpdateOptions{TypeMeta: types.NewVolumeTypeMeta()},
)
diff --git a/pkg/drive/event.go b/pkg/drive/event.go
index 66521afdb..88aa5c487 100644
--- a/pkg/drive/event.go
+++ b/pkg/drive/event.go
@@ -18,9 +18,11 @@ package drive
import (
"context"
+ "errors"
"fmt"
"os"
"path"
+ "strings"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
@@ -29,22 +31,32 @@ import (
"github.com/minio/directpv/pkg/sys"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
+ "github.com/minio/directpv/pkg/volume"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
+ "k8s.io/klog/v2"
+)
+
+var (
+ errDriveInUse = errors.New("drive still has volumes in-use")
)
type driveEventHandler struct {
- nodeID string
- getMounts func() (mountPointMap, deviceMap map[string][]string, err error)
- unmount func(target string, force, detach, expire bool) error
+ nodeID string
+ getMounts func() (mountPointMap, deviceMap map[string][]string, err error)
+ unmount func(target string, force, detach, expire bool) error
+ safeUnmount func(target string, force, detach, expire bool) error
+ stageVolume func(ctx context.Context, volume *types.Volume) error
}
func newDriveEventHandler(nodeID string) *driveEventHandler {
return &driveEventHandler{
- nodeID: nodeID,
- getMounts: sys.GetMounts,
- unmount: sys.Unmount,
+ nodeID: nodeID,
+ getMounts: sys.GetMounts,
+ unmount: sys.Unmount,
+ safeUnmount: sys.SafeUnmount,
+ stageVolume: volume.Stage,
}
}
@@ -70,8 +82,112 @@ func (handler *driveEventHandler) Handle(ctx context.Context, args listener.Even
}
func (handler *driveEventHandler) handleUpdate(ctx context.Context, drive *types.Drive) error {
- if drive.Status.Status == directpvtypes.DriveStatusReleased {
+ switch drive.Status.Status {
+ case directpvtypes.DriveStatusReleased:
return handler.release(ctx, drive)
+ case directpvtypes.DriveStatusMoving:
+ return handler.move(ctx, drive)
+ default:
+ return nil
+ }
+}
+
+// move processes the move request by staging the moved target volumes
+func (handler *driveEventHandler) move(ctx context.Context, drive *types.Drive) error {
+ driveClient := client.DriveClient()
+ volumeClient := client.VolumeClient()
+ finalizers := drive.GetFinalizers()
+ sourceDriveName := ""
+ for _, finalizer := range finalizers {
+ if finalizer == consts.DriveFinalizerDataProtection {
+ continue
+ }
+ volumeName := strings.TrimPrefix(finalizer, consts.DriveFinalizerPrefix)
+ volume, err := volumeClient.Get(ctx, volumeName, metav1.GetOptions{})
+ if err != nil {
+ klog.Errorf("unable to retrieve volume %s: %v", volume.Name, err)
+ return err
+ }
+ if sourceDriveName == "" {
+ sourceDriveName = volume.Status.DriveName
+ }
+ if volume.IsPublished() {
+ klog.Errorf("drive still has published volume: %s", volume.Name)
+ return errDriveInUse
+ }
+ if err := handler.unmountVolume(ctx, volume); err != nil {
+ klog.Errorf("unable to umount volume: %s: %v", volume.Name, err)
+ return err
+ }
+ volume.Status.FSUUID = drive.Status.FSUUID
+ volume.Status.DriveName = drive.Name
+ if volume.IsStaged() {
+ // Stage the volume if the volume in the source is staged already
+ volume.Status.DataPath = types.GetVolumeDir(volume.Status.FSUUID, volume.Name)
+ if err := handler.stageVolume(ctx, volume); err != nil {
+ klog.ErrorS(err, "unable to stage volume",
+ "volume", volume.Name,
+ "dataPath", volume.Status.DataPath,
+ "stagingTargetPath", volume.Status.StagingTargetPath,
+ )
+ return err
+ }
+ }
+ // update the volume
+ types.UpdateLabels(volume, map[types.LabelKey]types.LabelValue{
+ types.DrivePathLabelKey: types.NewLabelValue(utils.TrimDevPrefix(drive.Status.Path)),
+ types.DriveLabelKey: types.NewLabelValue(drive.Name),
+ })
+ if _, err := volumeClient.Update(ctx, volume, metav1.UpdateOptions{
+ TypeMeta: types.NewVolumeTypeMeta(),
+ }); err != nil {
+ return err
+ }
+ }
+ // update the source drive's status as "Moved"
+ sourceDrive, err := driveClient.Get(ctx, sourceDriveName, metav1.GetOptions{})
+ if err != nil {
+ klog.ErrorS(err, "unable to get the source drive",
+ "sourcerive", sourceDrive.Name,
+ )
+ return err
+ }
+ sourceDrive.Status.Status = directpvtypes.DriveStatusMoved
+ if _, err := driveClient.Update(ctx, sourceDrive, metav1.UpdateOptions{
+ TypeMeta: types.NewDriveTypeMeta(),
+ }); err != nil {
+ return err
+ }
+ // Revert the status back to "Cordoned" as the transfer is successful
+ drive.Status.Status = directpvtypes.DriveStatusCordoned
+ _, err = driveClient.Update(ctx, drive, metav1.UpdateOptions{
+ TypeMeta: types.NewDriveTypeMeta(),
+ })
+ return err
+}
+
+func (handler *driveEventHandler) unmountVolume(ctx context.Context, volume *types.Volume) error {
+ if volume.Status.TargetPath != "" {
+ if err := handler.safeUnmount(volume.Status.TargetPath, true, true, false); err != nil {
+ if _, ok := err.(*os.PathError); !ok {
+ klog.ErrorS(err, "unable to unmount container path",
+ "volume", volume.Name,
+ "targetPath", volume.Status.TargetPath,
+ )
+ return err
+ }
+ }
+ }
+ if volume.Status.StagingTargetPath != "" {
+ if err := handler.safeUnmount(volume.Status.StagingTargetPath, true, true, false); err != nil {
+ if _, ok := err.(*os.PathError); !ok {
+ klog.ErrorS(err, "unable to unmount staging path",
+ "volume", volume.Name,
+ "StagingTargetPath", volume.Status.StagingTargetPath,
+ )
+ return err
+ }
+ }
}
return nil
}
diff --git a/pkg/installer/crd.go b/pkg/installer/crd.go
index e912ad951..0b0d03a45 100644
--- a/pkg/installer/crd.go
+++ b/pkg/installer/crd.go
@@ -32,7 +32,7 @@ func removeVolumes(ctx context.Context, c *Config) error {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
- resultCh, err := volume.ListVolumes(ctx, nil, nil, nil, nil, k8s.MaxThreadCount)
+ resultCh, err := volume.ListVolumes(ctx, nil, nil, nil, nil, nil, k8s.MaxThreadCount)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go
index e0fd42165..78198d676 100644
--- a/pkg/metrics/collector.go
+++ b/pkg/metrics/collector.go
@@ -113,6 +113,7 @@ func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) {
nil,
nil,
nil,
+ nil,
k8s.MaxThreadCount,
)
if err != nil {
diff --git a/pkg/node/publish_unpublish.go b/pkg/node/publish_unpublish.go
index 1aa10b101..19ef98ad2 100644
--- a/pkg/node/publish_unpublish.go
+++ b/pkg/node/publish_unpublish.go
@@ -179,7 +179,7 @@ func (server *Server) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.NotFound, err.Error())
}
- if !volume.Status.IsPublished() {
+ if !volume.IsPublished() {
return nil, status.Error(codes.NotFound, fmt.Sprintf("unpublish is called without publish for volume %v", volume.Name))
}
diff --git a/pkg/node/server.go b/pkg/node/server.go
index 204ffd993..b8e2dd7c5 100644
--- a/pkg/node/server.go
+++ b/pkg/node/server.go
@@ -26,6 +26,7 @@ import (
"github.com/minio/directpv/pkg/metrics"
"github.com/minio/directpv/pkg/sys"
"github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/volume"
"github.com/minio/directpv/pkg/xfs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -49,6 +50,7 @@ type Server struct {
getQuota func(ctx context.Context, device, volumeID string) (quota *xfs.Quota, err error)
setQuota func(ctx context.Context, device, path, volumeID string, quota xfs.Quota) (err error)
mkdir func(path string, perm os.FileMode) error
+ stageVolume func(ctx context.Context, volume *types.Volume) error
}
// NewServer creates node server.
@@ -70,6 +72,7 @@ func NewServer(ctx context.Context,
getQuota: xfs.GetQuota,
setQuota: xfs.SetQuota,
mkdir: os.Mkdir,
+ stageVolume: volume.Stage,
}
go metrics.ServeMetrics(ctx, nodeID, metricsPort)
diff --git a/pkg/node/stage_unstage.go b/pkg/node/stage_unstage.go
index 255e64681..9baef6a5a 100644
--- a/pkg/node/stage_unstage.go
+++ b/pkg/node/stage_unstage.go
@@ -18,17 +18,13 @@ package node
import (
"context"
- "errors"
"fmt"
- "os"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/types"
- "github.com/minio/directpv/pkg/xfs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
@@ -57,47 +53,11 @@ func (server *Server) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.NotFound, err.Error())
}
- volumeDir := types.GetVolumeDir(volume.Status.FSUUID, volumeID)
- if err := server.mkdir(volumeDir, 0o755); err != nil && !errors.Is(err, os.ErrExist) {
- // FIXME: handle I/O error and mark associated drive's status as ERROR.
- klog.ErrorS(err, "unable to create volume directory", "VolumeDir", volumeDir)
- return nil, err
- }
-
- if err := server.bindMount(volumeDir, stagingTargetPath, false); err != nil {
- return nil, status.Errorf(codes.Internal, "unable to bind mount volume directory to staging target path; %v", err)
- }
-
- quota := xfs.Quota{
- HardLimit: uint64(volume.Status.TotalCapacity),
- SoftLimit: uint64(volume.Status.TotalCapacity),
- }
-
- device, err := server.getDeviceByFSUUID(volume.Status.FSUUID)
- if err != nil {
- klog.ErrorS(
- err,
- "unable to find device by FSUUID; "+
- "either device is removed or run command "+
- "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
- "on the host to reload",
- "FSUUID", volume.Status.FSUUID)
- client.Eventf(
- volume, corev1.EventTypeWarning, "NodeStageVolume",
- "unable to find device by FSUUID %v; "+
- "either device is removed or run command "+
- "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
- " on the host to reload", volume.Status.FSUUID)
- return nil, status.Errorf(codes.Internal, "Unable to find device by FSUUID %v; %v", volume.Status.FSUUID, err)
- }
-
- if err := server.setQuota(ctx, device, stagingTargetPath, volumeID, quota); err != nil {
- klog.ErrorS(err, "unable to set quota on staging target path", "StagingTargetPath", stagingTargetPath)
- return nil, status.Errorf(codes.Internal, "unable to set quota on staging target path; %v", err)
- }
-
- volume.Status.DataPath = volumeDir
+ volume.Status.DataPath = types.GetVolumeDir(volume.Status.FSUUID, volume.Name)
volume.Status.StagingTargetPath = stagingTargetPath
+ if err := server.stageVolume(ctx, volume); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
if _, err := client.VolumeClient().Update(ctx, volume, metav1.UpdateOptions{
TypeMeta: types.NewVolumeTypeMeta(),
@@ -133,7 +93,7 @@ func (server *Server) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.Internal, err.Error())
}
- if !volume.Status.IsStaged() {
+ if !volume.IsStaged() {
return nil, status.Error(codes.NotFound, fmt.Sprintf("unstage is called without stage for volume %v", volume.Name))
}
diff --git a/pkg/volume/event.go b/pkg/volume/event.go
index 4d03069d5..fe7cca948 100644
--- a/pkg/volume/event.go
+++ b/pkg/volume/event.go
@@ -104,6 +104,7 @@ func (handler *volumeEventHandler) delete(ctx context.Context, volume *types.Vol
deletedDir := volume.Status.DataPath + ".deleted"
if err := os.Rename(volume.Status.DataPath, deletedDir); err != nil && !errors.Is(err, os.ErrNotExist) {
+ // FIXME: Also handle input/output error
klog.ErrorS(
err,
"unable to rename data path to deleted data path",
diff --git a/pkg/volume/list.go b/pkg/volume/list.go
index 8e7835cfe..0a539e5e3 100644
--- a/pkg/volume/list.go
+++ b/pkg/volume/list.go
@@ -38,12 +38,13 @@ type ListVolumeResult struct {
}
// ListVolumes lists volumes.
-func ListVolumes(ctx context.Context, nodes, drives, podNames, podNSs []types.LabelValue, maxObjects int64) (<-chan ListVolumeResult, error) {
+func ListVolumes(ctx context.Context, nodes, drives, podNames, podNSs, driveNames []types.LabelValue, maxObjects int64) (<-chan ListVolumeResult, error) {
labelMap := map[types.LabelKey][]types.LabelValue{
types.DrivePathLabelKey: drives,
types.NodeLabelKey: nodes,
types.PodNameLabelKey: podNames,
types.PodNSLabelKey: podNSs,
+ types.DriveLabelKey: driveNames,
}
labelSelector := types.ToLabelSelector(labelMap)
@@ -91,8 +92,8 @@ func ListVolumes(ctx context.Context, nodes, drives, podNames, podNSs []types.La
}
// GetVolumeList gets list of volumes.
-func GetVolumeList(ctx context.Context, nodes, drives, podNames, podNSs []types.LabelValue) ([]types.Volume, error) {
- resultCh, err := ListVolumes(ctx, nodes, drives, podNames, podNSs, k8s.MaxThreadCount)
+func GetVolumeList(ctx context.Context, nodes, drives, podNames, podNSs, driveNames []types.LabelValue) ([]types.Volume, error) {
+ resultCh, err := ListVolumes(ctx, nodes, drives, podNames, podNSs, driveNames, k8s.MaxThreadCount)
if err != nil {
return nil, err
}
diff --git a/pkg/volume/list_test.go b/pkg/volume/list_test.go
index 455d0ba5b..08c55dc36 100644
--- a/pkg/volume/list_test.go
+++ b/pkg/volume/list_test.go
@@ -33,7 +33,7 @@ func TestGetVolumeList(t *testing.T) {
client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives())
client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
- volumes, err := GetVolumeList(context.TODO(), nil, nil, nil, nil)
+ volumes, err := GetVolumeList(context.TODO(), nil, nil, nil, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -52,7 +52,7 @@ func TestGetVolumeList(t *testing.T) {
client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives())
client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
- volumes, err = GetVolumeList(context.TODO(), nil, nil, nil, nil)
+ volumes, err = GetVolumeList(context.TODO(), nil, nil, nil, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -66,7 +66,7 @@ func TestGetSortedVolumeList(t *testing.T) {
client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives())
client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
- volumes, err := GetVolumeList(context.TODO(), nil, nil, nil, nil)
+ volumes, err := GetVolumeList(context.TODO(), nil, nil, nil, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -95,7 +95,7 @@ func TestGetSortedVolumeList(t *testing.T) {
client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives())
client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes())
- volumes, err = GetVolumeList(context.TODO(), nil, nil, nil, nil)
+ volumes, err = GetVolumeList(context.TODO(), nil, nil, nil, nil, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
diff --git a/pkg/volume/stage.go b/pkg/volume/stage.go
new file mode 100644
index 000000000..d43125c31
--- /dev/null
+++ b/pkg/volume/stage.go
@@ -0,0 +1,72 @@
+// This file is part of MinIO DirectPV
+// Copyright (c) 2021, 2022 MinIO, Inc.
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package volume
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+
+ "github.com/minio/directpv/pkg/client"
+ "github.com/minio/directpv/pkg/device"
+ "github.com/minio/directpv/pkg/types"
+ "github.com/minio/directpv/pkg/xfs"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/klog/v2"
+)
+
+func Stage(ctx context.Context, volume *types.Volume) error {
+ if err := os.Mkdir(volume.Status.DataPath, 0o755); err != nil && !errors.Is(err, os.ErrExist) {
+ // FIXME: handle I/O error and mark associated drive's status as ERROR.
+ klog.ErrorS(err, "unable to create data path", "dataPath", volume.Status.DataPath)
+ return err
+ }
+
+ if err := xfs.BindMount(volume.Status.DataPath, volume.Status.StagingTargetPath, false); err != nil {
+ return fmt.Errorf("unable to bind mount volume directory to staging target path; %v", err)
+ }
+
+ quota := xfs.Quota{
+ HardLimit: uint64(volume.Status.TotalCapacity),
+ SoftLimit: uint64(volume.Status.TotalCapacity),
+ }
+
+ device, err := device.GetDeviceByFSUUID(volume.Status.FSUUID)
+ if err != nil {
+ klog.ErrorS(
+ err,
+ "unable to find device by FSUUID; "+
+ "either device is removed or run command "+
+ "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
+ "on the host to reload",
+ "FSUUID", volume.Status.FSUUID)
+ client.Eventf(
+ volume, corev1.EventTypeWarning, "NodeStageVolume",
+ "unable to find device by FSUUID %v; "+
+ "either device is removed or run command "+
+ "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+
+ " on the host to reload", volume.Status.FSUUID)
+ return fmt.Errorf("unable to find device by FSUUID %v; %v", volume.Status.FSUUID, err)
+ }
+
+ if err := xfs.SetQuota(ctx, device, volume.Status.StagingTargetPath, volume.Name, quota); err != nil {
+ klog.ErrorS(err, "unable to set quota on staging target path", "StagingTargetPath", volume.Status.StagingTargetPath)
+ return fmt.Errorf("unable to set quota on staging target path; %v", err)
+ }
+ return nil
+}