Skip to content

Commit

Permalink
Add drive replacement feature (#654)
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveenrajmani committed Oct 10, 2022
1 parent 5a55af0 commit 4e9cd7c
Show file tree
Hide file tree
Showing 27 changed files with 689 additions and 103 deletions.
3 changes: 3 additions & 0 deletions cmd/kubectl-directpv/drives.go
Expand Up @@ -44,6 +44,9 @@ func init() {
drivesCmd.AddCommand(formatDrivesCmd)
drivesCmd.AddCommand(accessTierCmd)
drivesCmd.AddCommand(releaseDrivesCmd)
drivesCmd.AddCommand(cordonDrivesCmd)
drivesCmd.AddCommand(uncordonDrivesCmd)
drivesCmd.AddCommand(moveDriveCmd)
}

func validateDriveSelectors() (err error) {
Expand Down
116 changes: 116 additions & 0 deletions cmd/kubectl-directpv/drives_cordon.go
@@ -0,0 +1,116 @@
// This file is part of MinIO DirectPV
// Copyright (c) 2021, 2022 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"context"
"fmt"
"strings"

directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/k8s"
"github.com/minio/directpv/pkg/types"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

var cordonDrivesCmd = &cobra.Command{
Use: "cordon",
Short: "Cordon drive(s) to prevent any new volumes to be scheduleed",
Example: strings.ReplaceAll(
`# Cordon all the drives from all the nodes
$ kubectl {PLUGIN_NAME} drives cordon --all
# Cordon all the drives from a particular node
$ kubectl {PLUGIN_NAME} drives cordon --node=node1
# Cordon specific drives from specified nodes
$ kubectl {PLUGIN_NAME} drives cordon --node=node1,node2 --drive=/dev/nvme0n1
# Cordon specific drives from all the nodes filtered by drive ellipsis
$ kubectl {PLUGIN_NAME} drives cordon --drive=/dev/sd{a...b}
# Cordon all the drives from specific nodes filtered by node ellipsis
$ kubectl {PLUGIN_NAME} drives cordon --node=node{0...3}
# Cordon specific drives from specific nodes filtered by the combination of node and drive ellipsis
$ kubectl {PLUGIN_NAME} drives cordon --drive /dev/xvd{a...d} --node node{1...4}`,
`{PLUGIN_NAME}`,
consts.AppName,
),
RunE: func(c *cobra.Command, _ []string) error {
if !allFlag {
if len(driveArgs) == 0 && len(nodeArgs) == 0 && len(accessTierArgs) == 0 {
return fmt.Errorf("atleast one of '%s', '%s', '%s' or '%s' must be specified",
bold("--all"),
bold("--drive"),
bold("--node"),
bold("--access-tier"),
)
}
}
if err := validateDriveSelectors(); err != nil {
return err
}
return cordonDrives(c.Context())
},
}

func init() {
cordonDrivesCmd.PersistentFlags().StringSliceVarP(&driveArgs, "drive", "d", driveArgs, "Filter drives to be cordoned by drive path (supports ellipses pattern)")
cordonDrivesCmd.PersistentFlags().StringSliceVarP(&nodeArgs, "node", "n", nodeArgs, "Filter drives to be cordoned by nodes (supports ellipses pattern)")
cordonDrivesCmd.PersistentFlags().StringSliceVarP(&accessTierArgs, "access-tier", "", accessTierArgs, fmt.Sprintf("Filter drives to be cordoned by access tier set on the drive [%s]", strings.Join(directpvtypes.SupportedAccessTierValues(), ", ")))
cordonDrivesCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", allFlag, "Cordon all the drives from all the nodes")
}

func cordonDrives(ctx context.Context) error {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
resultCh, err := drive.ListDrives(ctx, nodeSelectors, driveSelectors, accessTierSelectors, k8s.MaxThreadCount)
if err != nil {
return err
}
return drive.ProcessDrives(
ctx,
resultCh,
func(drive *types.Drive) bool {
if drive.Status.Status == directpvtypes.DriveStatusCordoned {
klog.Errorf("%s is already cordoned", bold(drive.Status.Path))
return false
}
if drive.Status.Status != directpvtypes.DriveStatusOK {
klog.Errorf("%s is in %s state. only %s drives can be cordoned", bold(drive.Status.Path), bold(drive.Status.Status), bold(directpvtypes.DriveStatusOK))
return false
}
return true
},
func(drive *types.Drive) error {
drive.Status.Status = directpvtypes.DriveStatusCordoned
return nil
},
func(ctx context.Context, drive *types.Drive) error {
_, err := client.DriveClient().Update(ctx, drive, metav1.UpdateOptions{})
return err
},
nil,
dryRun,
)
}
18 changes: 16 additions & 2 deletions cmd/kubectl-directpv/drives_list.go
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"strings"

"github.com/fatih/color"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
Expand Down Expand Up @@ -128,6 +129,7 @@ func listDrives(ctx context.Context, args []string) error {
if wideOutput {
headers = append(headers, "MODEL")
headers = append(headers, "VENDOR")
headers = append(headers, "DRIVE NAME")
}

text.DisableColors()
Expand All @@ -145,7 +147,7 @@ func listDrives(ctx context.Context, args []string) error {

for _, drive := range drives {
volumes := "-"
if len(drive.Finalizers) > 1 {
if len(drive.Finalizers) > 1 && (drive.Status.Status == directpvtypes.DriveStatusOK || drive.Status.Status == directpvtypes.DriveStatusCordoned) {
volumes = fmt.Sprintf("%v", len(drive.Finalizers)-1)
}
row := []interface{}{
Expand All @@ -159,7 +161,18 @@ func listDrives(ctx context.Context, args []string) error {
return "-"
}(),
volumes,
drive.Status.Status,
func() string {
switch drive.Status.Status {
case directpvtypes.DriveStatusOK:
return color.HiGreenString(string(drive.Status.Status))
case directpvtypes.DriveStatusError, directpvtypes.DriveStatusMoved:
return color.HiRedString(string(drive.Status.Status))
case directpvtypes.DriveStatusCordoned:
return color.HiYellowString(string(drive.Status.Status))
default:
return color.HiWhiteString(string(drive.Status.Status))
}
}(),
drive.Status.NodeName,
func() string {
if drive.Status.AccessTier == directpvtypes.AccessTierUnknown {
Expand All @@ -171,6 +184,7 @@ func listDrives(ctx context.Context, args []string) error {
if wideOutput {
row = append(row, drive.Status.ModelNumber)
row = append(row, drive.Status.Vendor)
row = append(row, drive.Name)
}
writer.AppendRow(row)
}
Expand Down
150 changes: 150 additions & 0 deletions cmd/kubectl-directpv/drives_move.go
@@ -0,0 +1,150 @@
// This file is part of MinIO DirectPV
// Copyright (c) 2021, 2022 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"context"
"errors"
"fmt"
"strings"

directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)

var (
fromDrive string
toDrive string

errFromFlagNotSet = errors.New("'--from' flag is not set")
errToFlagNotSet = errors.New("'--to' flag is not set")
errInvalidState = errors.New("invalid state")
errCapacityNotAvailable = errors.New("the drive do not have enough capacity to accomodate")
)

var moveDriveCmd = &cobra.Command{
Use: "move",
Short: "Move a drive to another drive within the same node (without data)",
Example: strings.ReplaceAll(
`# Move a drive to another drive
$ kubectl {PLUGIN_NAME} drives move --from <drive_id> --to <drive_id>`,
`{PLUGIN_NAME}`,
consts.AppName,
),
RunE: func(c *cobra.Command, _ []string) error {
if fromDrive == "" {
return errFromFlagNotSet
}
if toDrive == "" {
return errToFlagNotSet
}
return moveDrive(c.Context())
},
}

func init() {
moveDriveCmd.PersistentFlags().StringVarP(&fromDrive, "from", "", fromDrive, fmt.Sprintf("the name of the source %s drive that needs to be moved", consts.AppPrettyName))
moveDriveCmd.PersistentFlags().StringVarP(&toDrive, "to", "", toDrive, fmt.Sprintf("the name of the target %s drive to which the source has to be moved", consts.AppPrettyName))
}

func moveDrive(ctx context.Context) error {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
driveClient := client.DriveClient()
sourceDrive, err := driveClient.Get(ctx, strings.TrimSpace(fromDrive), metav1.GetOptions{})
if err != nil {
return err
}
targetDrive, err := driveClient.Get(ctx, strings.TrimSpace(toDrive), metav1.GetOptions{})
if err != nil {
return err
}
if err := validateMoveRequest(sourceDrive, targetDrive); err != nil {
return err
}
if err := move(ctx, sourceDrive, targetDrive); err != nil {
return err
}
targetDrive.Status.Status = directpvtypes.DriveStatusMoving
_, err = driveClient.Update(
ctx, targetDrive, metav1.UpdateOptions{TypeMeta: types.NewDriveTypeMeta()},
)
return err
}); err != nil {
return err
}
return nil
}

func validateMoveRequest(sourceDrive, targetDrive *types.Drive) error {
if !(sourceDrive.IsCordoned() || sourceDrive.IsLost()) || !targetDrive.IsCordoned() {
klog.Error("please make sure both the source and target drives are cordoned")
return errInvalidState
}
if sourceDrive.Status.NodeName != targetDrive.Status.NodeName {
klog.Error("both the source and target drives should be from same node")
return errInvalidState
}
if sourceDrive.Status.AccessTier != targetDrive.Status.AccessTier {
klog.Error("the source and target drives does not belong to the same access-tier")
return errInvalidState
}
return nil
}

func move(ctx context.Context, sourceDrive, targetDrive *types.Drive) error {
selector, err := getDriveNameSelectors([]string{sourceDrive.Name})
if err != nil {
return err
}
volumes, err := volume.GetVolumeList(ctx, nil, nil, nil, nil, selector)
if err != nil {
return err
}
for _, volume := range volumes {
if volume.Status.DriveName != sourceDrive.Name {
klog.Infof("invalid drive name %s found in volume %s", volume.Status.DriveName, volume.Name)
return errInvalidState
}
if volume.Status.NodeName != sourceDrive.Status.NodeName {
klog.Infof("invalid node name %s found in volume %s", volume.Status.NodeName, volume.Name)
return errInvalidState
}
if volume.IsPublished() {
klog.Info("please make sure all the volumes in the source drive are not inuse")
return fmt.Errorf("volume %s is still published", volume.Name)
}
if targetDrive.Status.FreeCapacity < volume.Status.TotalCapacity {
klog.Info("the target drive cannot accomodate the volumes from the source")
return errCapacityNotAvailable
}
finalizer := consts.DriveFinalizerPrefix + volume.Name
if !utils.ItemIn(targetDrive.Finalizers, finalizer) {
targetDrive.Status.FreeCapacity -= volume.Status.TotalCapacity
targetDrive.Status.AllocatedCapacity += volume.Status.TotalCapacity
targetDrive.SetFinalizers(append(targetDrive.GetFinalizers(), finalizer))
}
}
return nil
}
6 changes: 5 additions & 1 deletion cmd/kubectl-directpv/drives_release.go
Expand Up @@ -96,13 +96,17 @@ func releaseDrives(ctx context.Context) error {
klog.Errorf("%s already in 'released' state", bold(drive.Status.Path))
return false
}
if len(drive.Finalizers) > 1 {
if drive.Status.Status != directpvtypes.DriveStatusMoved && len(drive.Finalizers) > 1 {
klog.Errorf("%s has volumes. please purge them before releasing", bold(drive.Status.Path))
return false
}
return true
},
func(drive *types.Drive) error {
if drive.Status.Status == directpvtypes.DriveStatusMoved {
// Moved drives won't have any volumes allocated, simply empty them and proceed.
drive.SetFinalizers([]string{consts.DriveFinalizerDataProtection})
}
drive.Status.Status = directpvtypes.DriveStatusReleased
return nil
},
Expand Down

0 comments on commit 4e9cd7c

Please sign in to comment.