diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5759a6891..3e9820039 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,7 +20,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: 1.19.3 + go-version: 1.19.4 check-latest: true - uses: docker/setup-qemu-action@v2 - uses: docker/setup-buildx-action@v2 diff --git a/.github/workflows/functests.yml b/.github/workflows/functests.yml index 81cdf588d..877f37634 100644 --- a/.github/workflows/functests.yml +++ b/.github/workflows/functests.yml @@ -20,7 +20,7 @@ jobs: timeout-minutes: 60 strategy: matrix: - kube-version: ['v1.18.20', 'v1.19.16', 'v1.20.15', 'v1.21.14', 'v1.22.15', 'v1.23.13', 'v1.24.7', 'v1.25.3'] + kube-version: ['v1.18.20', 'v1.19.16', 'v1.20.15', 'v1.21.14', 'v1.22.16', 'v1.23.14', 'v1.24.8', 'v1.25.4'] os: [ubuntu-18.04, ubuntu-20.04, ubuntu-22.04] exclude: - os: ubuntu-22.04 @@ -30,7 +30,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: 1.19.3 + go-version: 1.19.4 check-latest: true - name: Install dependencies @@ -57,9 +57,9 @@ jobs: docker build -t quay.io/minio/directpv:${VERSION} . - name: Setup Minikube - uses: manusa/actions-setup-minikube@v2.7.0 + uses: manusa/actions-setup-minikube@v2.7.2 with: - minikube version: 'v1.27.1' + minikube version: 'v1.28.0' kubernetes version: ${{ matrix.kube-version }} github token: ${{ secrets.GITHUB_TOKEN }} @@ -73,3 +73,12 @@ jobs: - name: Run tests run: | functests/run-tests.sh + + - name: Run migration tests with DirectCSI v3.2.1 + run: | + functests/run-migration-tests.sh "v3.2.1" + + - name: Run migration tests with DirectCSI v2.0.9 + if: contains(fromJson('["v1.18.20", "v1.19.16", "v1.20.15", "v1.21.14", "v1.22.16"]'), matrix.kube-version) + run: | + functests/run-migration-tests.sh "v2.0.9" diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index be1571d87..07f083a59 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -20,7 +20,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: - go-version: 1.19.3 + go-version: 1.19.4 check-latest: true - uses: ludeeus/action-shellcheck@master env: diff --git a/.github/workflows/vulncheck.yml b/.github/workflows/vulncheck.yml index 60de621fc..09e2d556f 100644 --- a/.github/workflows/vulncheck.yml +++ b/.github/workflows/vulncheck.yml @@ -23,7 +23,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: 1.19.3 + go-version: 1.19.4 check-latest: true - name: Install govulncheck run: go install golang.org/x/vuln/cmd/govulncheck@latest diff --git a/cmd/directpv/legacy-controller.go b/cmd/directpv/legacy-controller.go new file mode 100644 index 000000000..f9508564f --- /dev/null +++ b/cmd/directpv/legacy-controller.go @@ -0,0 +1,71 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2021, 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + + "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/controller" + pkgidentity "github.com/minio/directpv/pkg/identity" + legacyclient "github.com/minio/directpv/pkg/legacy/client" + "github.com/spf13/cobra" + "k8s.io/klog/v2" +) + +var legacyControllerCmd = &cobra.Command{ + Use: consts.LegacyControllerServerName, + Short: "Start legacy controller server.", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(c *cobra.Command, args []string) error { + return startLegacyController(c.Context()) + }, +} + +func startLegacyController(ctx context.Context) error { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + + idServer, err := pkgidentity.NewServer(legacyclient.Identity, Version, map[string]string{}) + if err != nil { + return err + } + klog.V(3).Infof("Legacy identity server started") + + ctrlServer := controller.NewLegacyServer() + klog.V(3).Infof("Legacy controller server started") + + errCh := make(chan error) + + go func() { + if err := runServers(ctx, csiEndpoint, idServer, ctrlServer, nil); err != nil { + klog.ErrorS(err, "unable to start GRPC servers") + errCh <- err + } + }() + + go func() { + if err := serveReadinessEndpoint(ctx); err != nil { + klog.ErrorS(err, "unable to start readiness endpoint") + errCh <- err + } + }() + + return <-errCh +} diff --git a/cmd/directpv/legacy-node-server.go b/cmd/directpv/legacy-node-server.go new file mode 100644 index 000000000..e84c61be5 --- /dev/null +++ b/cmd/directpv/legacy-node-server.go @@ -0,0 +1,71 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "context" + + "github.com/minio/directpv/pkg/consts" + pkgidentity "github.com/minio/directpv/pkg/identity" + legacyclient "github.com/minio/directpv/pkg/legacy/client" + "github.com/minio/directpv/pkg/node" + "github.com/spf13/cobra" + "k8s.io/klog/v2" +) + +var legacyNodeServerCmd = &cobra.Command{ + Use: consts.LegacyNodeServerName, + Short: "Start legacy node server.", + SilenceUsage: true, + SilenceErrors: true, + RunE: func(c *cobra.Command, args []string) error { + return startLegacyNodeServer(c.Context()) + }, +} + +func startLegacyNodeServer(ctx context.Context) error { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + + idServer, err := pkgidentity.NewServer(legacyclient.Identity, Version, map[string]string{}) + if err != nil { + return err + } + klog.V(3).Infof("Legacy identity server started") + + errCh := make(chan error) + + nodeServer := node.NewLegacyServer(nodeID, rack, zone, region) + klog.V(3).Infof("Legacy node server started") + + go func() { + if err := runServers(ctx, csiEndpoint, idServer, nil, nodeServer); err != nil { + klog.ErrorS(err, "unable to start GRPC servers") + errCh <- err + } + }() + + go func() { + if err := serveReadinessEndpoint(ctx); err != nil { + klog.ErrorS(err, "unable to start readiness endpoint") + errCh <- err + } + }() + + return <-errCh +} diff --git a/cmd/directpv/main.go b/cmd/directpv/main.go index 96bc4a5ca..f27c709a7 100644 --- a/cmd/directpv/main.go +++ b/cmd/directpv/main.go @@ -127,6 +127,8 @@ func init() { mainCmd.AddCommand(nodeServerCmd) mainCmd.AddCommand(adminServerCmd) mainCmd.AddCommand(nodeAPIServerCmd) + mainCmd.AddCommand(legacyControllerCmd) + mainCmd.AddCommand(legacyNodeServerCmd) } func main() { diff --git a/cmd/directpv/node-server.go b/cmd/directpv/node-server.go index 954acd24a..0cbbda573 100644 --- a/cmd/directpv/node-server.go +++ b/cmd/directpv/node-server.go @@ -21,7 +21,6 @@ import ( "errors" "os" - "github.com/container-storage-interface/spec/lib/go/csi" "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/device" "github.com/minio/directpv/pkg/drive" @@ -44,7 +43,7 @@ var nodeServerCmd = &cobra.Command{ if err := device.Sync(c.Context(), nodeID); err != nil { return err } - return startNodeServer(c.Context(), args) + return startNodeServer(c.Context()) }, } @@ -52,7 +51,7 @@ func init() { nodeServerCmd.PersistentFlags().IntVar(&metricsPort, "metrics-port", metricsPort, "Metrics port at "+consts.AppPrettyName+" exports metrics data") } -func startNodeServer(ctx context.Context, args []string) error { +func startNodeServer(ctx context.Context) error { var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) defer cancel() @@ -79,8 +78,7 @@ func startNodeServer(ctx context.Context, args []string) error { } }() - var nodeServer csi.NodeServer - nodeServer, err = node.NewServer( + nodeServer := node.NewServer( ctx, identity, nodeID, @@ -89,9 +87,6 @@ func startNodeServer(ctx context.Context, args []string) error { region, metricsPort, ) - if err != nil { - return err - } klog.V(3).Infof("Node server started") if err = sys.Mkdir(consts.MountRootDir, 0o755); err != nil && !errors.Is(err, os.ErrExist) { diff --git a/cmd/kubectl-directpv/install.go b/cmd/kubectl-directpv/install.go index cd3e1cee3..d3859f424 100644 --- a/cmd/kubectl-directpv/install.go +++ b/cmd/kubectl-directpv/install.go @@ -27,9 +27,12 @@ import ( "github.com/fatih/color" "github.com/minio/directpv/pkg/admin" + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/installer" + legacyclient "github.com/minio/directpv/pkg/legacy/client" "github.com/minio/directpv/pkg/utils" + "github.com/minio/directpv/pkg/volume" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/version" @@ -49,6 +52,7 @@ var ( disableAdminService bool k8sVersion string kubeVersion *version.Version + legacyFlag bool ) var installCmd = &cobra.Command{ @@ -88,6 +92,7 @@ func init() { installCmd.PersistentFlags().StringVar(&configDir, "config-dir", configDir, "Path to configuration directory") installCmd.PersistentFlags().BoolVar(&disableAdminService, "disable-admin-service", disableAdminService, "Skip installing a node-port service for admin server") installCmd.PersistentFlags().StringVar(&k8sVersion, "kube-version", k8sVersion, "If present, use this as kubernetes version for dry-run") + installCmd.PersistentFlags().BoolVar(&legacyFlag, "legacy", legacyFlag, "If present, include legacy services for dry-run") addDryRunFlag(installCmd) } @@ -202,7 +207,48 @@ func getCredential() (*admin.Credential, bool, error) { return &cred, true, nil } +func getLegacyFlag(ctx context.Context) bool { + if dryRunFlag { + return legacyFlag + } + + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + resultCh := volume.NewLister(). + LabelSelector( + map[directpvtypes.LabelKey]directpvtypes.LabelValue{ + directpvtypes.MigratedLabelKey: "true", + }, + ). + IgnoreNotFound(true). + List(ctx) + for result := range resultCh { + if result.Err != nil { + utils.Eprintf(quietFlag, true, "unable to get volumes; %v", result.Err) + break + } else { + return true + } + } + + legacyclient.Init() + + for result := range legacyclient.ListVolumes(ctx) { + if result.Err != nil { + utils.Eprintf(quietFlag, true, "unable to get legacy volumes; %v", result.Err) + break + } else { + return true + } + } + + return false +} + func installMain(ctx context.Context) { + legacyFlag = getLegacyFlag(ctx) + auditFile := fmt.Sprintf("install.log.%v", time.Now().UTC().Format(time.RFC3339Nano)) file, err := openAuditFile(auditFile) if err != nil { @@ -241,6 +287,7 @@ func installMain(ctx context.Context) { args.Quiet = quietFlag args.DisableAdminService = disableAdminService args.KubeVersion = kubeVersion + args.Legacy = legacyFlag if err = installer.Install(ctx, args); err != nil { utils.Eprintf(quietFlag, true, "%v\n", err) diff --git a/cmd/kubectl-directpv/migrate.go b/cmd/kubectl-directpv/migrate.go index 714d02128..956c1a192 100644 --- a/cmd/kubectl-directpv/migrate.go +++ b/cmd/kubectl-directpv/migrate.go @@ -1,5 +1,5 @@ // This file is part of MinIO DirectPV -// Copyright (c) 2021, 2022 MinIO, Inc. +// Copyright (c) 2022 MinIO, Inc. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by @@ -20,363 +20,39 @@ import ( "context" "fmt" "os" - "regexp" - "strconv" "strings" - "github.com/minio/directpv/pkg/apis/directpv.min.io/types" - directpv "github.com/minio/directpv/pkg/apis/directpv.min.io/v1beta1" "github.com/minio/directpv/pkg/consts" - "github.com/minio/directpv/pkg/k8s" - directv1alpha1 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1alpha1" - directv1beta1 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta1" - directv1beta2 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta2" - directv1beta3 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta3" - directv1beta4 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta4" - directcsi "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta5" - directcsiclient "github.com/minio/directpv/pkg/legacy/client" - typeddirectcsi "github.com/minio/directpv/pkg/legacy/clientset/typed/direct.csi.min.io/v1beta5" + "github.com/minio/directpv/pkg/installer" "github.com/minio/directpv/pkg/utils" "github.com/spf13/cobra" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/cli-runtime/pkg/printers" - "k8s.io/klog/v2" ) var migrateCmd = &cobra.Command{ Use: "migrate", - Short: "Migrate CSRs", + Short: "Migrate drives and volumes from legacy DirectCSI", Example: strings.ReplaceAll( - `# Migrate all CSRs + `# Migrate drives and volumes from legacy DirectCSI $ kubectl {PLUGIN_NAME} migrate`, `{PLUGIN_NAME}`, consts.AppName, ), Run: func(c *cobra.Command, args []string) { - driveIDArgs = args - - if err := validateMigrateCmd(); err != nil { - utils.Eprintf(quietFlag, true, "%v\n", err) - os.Exit(-1) - } - migrateMain(c.Context()) }, } -const ( - // DriveTypeMetaKind is for the kind - DriveTypeMetaKind = "DirectPVDrive" - // VolumeTypeMetaKind is for the volume kind - VolumeTypeMetaKind = "DirectPVVolume" - // APIVersion is for the version - APIVersion = "directpv.min.io/v1beta1" - // DirectCSIGroup is for the group - DirectCSIGroup = "direct.csi.min.io" -) - -var ( - directCSIDriveClient typeddirectcsi.DirectCSIDriveInterface - directCSIVolumeClient typeddirectcsi.DirectCSIVolumeInterface - uuidRegex = regexp.MustCompile("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") -) - -// Preparing a map[driveName]fsuuid -var driveFsuuid = make(map[string]string) - -// ListDriveResult is for listing drive results -type ListDriveResult struct { - Drive directcsi.DirectCSIDrive - Err error -} - -// ListVolumeResult is for list volumes results -type ListVolumeResult struct { - Volume directcsi.DirectCSIVolume - Err error -} - -// ListDrives is for listing drives -func ListDrives(ctx context.Context) <-chan ListDriveResult { - resultCh := make(chan ListDriveResult) - go func() { - defer close(resultCh) - - send := func(result ListDriveResult) bool { - select { - case <-ctx.Done(): - return false - case resultCh <- result: - return true - } - } - - options := metav1.ListOptions{Limit: 1000} - for { - result, err := directCSIDriveClient.List(ctx, options) - if err != nil { - send(ListDriveResult{Err: err}) - return - } - - for _, item := range result.Items { - switch item.Status.DriveStatus { - case directcsi.DriveStatusReady, directcsi.DriveStatusInUse: - if !send(ListDriveResult{Drive: item}) { - return - } - } - } - - if result.Continue == "" { - return - } - - options.Continue = result.Continue - } - }() - - return resultCh -} - -// ListVolumes is for listing volumes -func ListVolumes(ctx context.Context) <-chan ListVolumeResult { - resultCh := make(chan ListVolumeResult) - go func() { - defer close(resultCh) - - send := func(result ListVolumeResult) bool { - select { - case <-ctx.Done(): - return false - case resultCh <- result: - return true - } - } - - options := metav1.ListOptions{Limit: 1000} - for { - result, err := directCSIVolumeClient.List(ctx, options) - if err != nil { - send(ListVolumeResult{Err: err}) - return - } - - for _, item := range result.Items { - if !send(ListVolumeResult{Volume: item}) { - return - } - } - - if result.Continue == "" { - return - } - - options.Continue = result.Continue - } - }() - - return resultCh -} - -func validateMigrateCmd() error { - return nil -} - -func migrateVolumeCRD(oldVolumeList []directcsi.DirectCSIVolume) (newVolumeList []directpv.DirectPVVolume, theerror error) { - newVolumes := []directpv.DirectPVVolume{} - for _, OldVolume := range oldVolumeList { - // FIXME: use types.NewDrive() instead of creating structure. - NewVolume := directpv.DirectPVVolume{} - - // TypeMeta fields - NewVolume.TypeMeta.Kind = VolumeTypeMetaKind - NewVolume.TypeMeta.APIVersion = APIVersion - - // ObjectMeta fields - NewVolume.ObjectMeta = OldVolume.ObjectMeta - - // Status fields - NewVolume.Status.DataPath = OldVolume.Status.HostPath - NewVolume.Status.StagingTargetPath = OldVolume.Status.StagingPath - NewVolume.Status.TargetPath = OldVolume.Status.ContainerPath - NewVolume.Status.FSUUID = driveFsuuid[OldVolume.Status.Drive] - NewVolume.Status.TotalCapacity = OldVolume.Status.TotalCapacity - NewVolume.Status.AvailableCapacity = OldVolume.Status.AvailableCapacity - NewVolume.Status.UsedCapacity = OldVolume.Status.UsedCapacity - - // Append the new drive to the list - newVolumes = append(newVolumes, NewVolume) - } - return newVolumes, nil -} - -func migrateDriveCRD(oldDriveList []directcsi.DirectCSIDrive) (newDriveList []directpv.DirectPVDrive, therror error) { - // Set of unique FSUUIDs: - setOfUniqueFSUUIDs := make(map[string]struct{}) - - newDrives := []directpv.DirectPVDrive{} - for _, OldDrive := range oldDriveList { - - // we can filter out the drives other than Ready and InUse - if OldDrive.Status.DriveStatus != directcsi.DriveStatusReady && OldDrive.Status.DriveStatus != directcsi.DriveStatusInUse { - continue // skip this iteration to filter out this state. - } - - // FIXME: use types.NewVolume() instead of creating structure. - NewDrive := directpv.DirectPVDrive{} - - // TypeMeta fields - NewDrive.TypeMeta.Kind = DriveTypeMetaKind - NewDrive.TypeMeta.APIVersion = APIVersion - - // ObjectMeta fields - NewDrive.ObjectMeta = OldDrive.ObjectMeta - - // Status fields - // Cast from old type to new type - NewDrive.Status.Status = types.DriveStatusReady - NewDrive.Status.TotalCapacity = OldDrive.Status.TotalCapacity - NewDrive.Status.AllocatedCapacity = OldDrive.Status.AllocatedCapacity - NewDrive.Status.FreeCapacity = OldDrive.Status.FreeCapacity - NewDrive.Status.FSUUID = OldDrive.Status.FilesystemUUID - NewDrive.Status.Topology = OldDrive.Status.Topology - - // The name of the new drive should be its FSUUID and it should not be empty - if NewDrive.ObjectMeta.Name == NewDrive.Status.FSUUID && len(NewDrive.Status.FSUUID) > 0 { - fmt.Println("Validate that the name of the new drive is its FSUUID") - // We need to also add validation to check if this is unique among the entire drive list set for conversion. - // https://stackoverflow.com/questions/9251234/how-to-check-the-uniqueness-inside-a-for-loop - if _, ok := setOfUniqueFSUUIDs[NewDrive.Status.FSUUID]; ok { - fmt.Println("FSUUID found already, we can't convert this drive unless is unique: ", NewDrive.Status.FSUUID) - } else { - fmt.Println("element not found, FSUUID is unique, proceed to be added/appended for conversion") - setOfUniqueFSUUIDs[NewDrive.Status.FSUUID] = struct{}{} - // Append the new drive to the list - newDrives = append(newDrives, NewDrive) - driveFsuuid[NewDrive.ObjectMeta.Name] = NewDrive.Status.FSUUID - } - } - } - - // Return the new list of drives - return newDrives, nil +func init() { + addDryRunFlag(migrateCmd) } func migrateMain(ctx context.Context) { - k8s.Init() - - var err error - if directCSIDriveClient, err = directcsiclient.DirectCSIDriveInterfaceForConfig(k8s.KubeConfig()); err != nil { - klog.Fatalf("unable to create new direct-csi drive interface; %v", err) - } - - if directCSIVolumeClient, err = directcsiclient.DirectCSIVolumeInterfaceForConfig(k8s.KubeConfig()); err != nil { - klog.Fatalf("unable to create new direct-csi volume interface; %v", err) - } - - gvk, err := directcsiclient.GetGroupKindVersions( - DirectCSIGroup, - "DirectCSIDrive", - "v1beta5", - directv1beta4.Version, - directv1beta3.Version, - directv1beta2.Version, - directv1beta1.Version, - directv1alpha1.Version, - ) - if err != nil { - fmt.Println("migrateMain: unable to get GetGroupKindVersions of direct-csi: ", err) - return + if err := installer.Migrate(ctx, dryRunFlag); err != nil { + utils.Eprintf(quietFlag, true, "migration failed; %v", err) + os.Exit(1) } - if gvk.Group != DirectCSIGroup { - klog.Fatalf("migration does not support direct-csi group %v", gvk.Group) - } - - switch gvk.Version { - case "v1beta3", "v1beta4", "v1beta5": - default: - klog.Fatalf("migration does not support direct-csi version %v", gvk.Version) - } - - // Then we list the drives - // This is getting all the info from all the drives - ctx, cancelCtx := context.WithCancel(context.Background()) - defer cancelCtx() - - filteredDrives := []directcsi.DirectCSIDrive{} - for result := range ListDrives(ctx) { - if result.Err != nil { - klog.Fatalf("unable to get direct-csi drives; %v", result.Err) - } - - // FIXME: ignore result.Drive if its source version is < v1beta3 - - if !uuidRegex.MatchString(result.Drive.Status.FilesystemUUID) { - klog.Fatalf("invalid FilesystemUUID %v in DirectCSIDrive %v", result.Drive.Status.FilesystemUUID, result.Drive.Name) - } - - for _, finalizer := range result.Drive.Finalizers { - if !strings.Contains(finalizer, DirectCSIGroup) { - klog.Fatalf("invalid finalizer value %v in DirectCSIDrive %v", finalizer, result.Drive.Name) - } - } - - filteredDrives = append(filteredDrives, result.Drive) - } - - // To convert from old CRDs to new CRDs - newDriveCRD, _ := migrateDriveCRD(filteredDrives) - // Output Drives as YAML - for index, drive := range newDriveCRD { - // Output as YAML - yamlName := "drive-" + strconv.Itoa(index+1) + ".yaml" - newFile, _ := os.Create(yamlName) - y := printers.YAMLPrinter{} - defer newFile.Close() - createDrive := func() *directpv.DirectPVDrive { - return &directpv.DirectPVDrive{ - TypeMeta: drive.TypeMeta, - ObjectMeta: drive.ObjectMeta, - Spec: drive.Spec, - Status: drive.Status, - } - } - runTimeObject := []runtime.Object{ - createDrive(), - } - y.PrintObj(runTimeObject[0], newFile) - } - - filteredVolumes := []directcsi.DirectCSIVolume{} - for result := range ListVolumes(ctx) { - if result.Err != nil { - klog.Fatalf("unable to get direct-csi volumes; %v", result.Err) - } - filteredVolumes = append(filteredVolumes, result.Volume) - } - - newVolumeCRD, _ := migrateVolumeCRD(filteredVolumes) - - // Output Volumes as YAML - for index, volume := range newVolumeCRD { - // Output as YAML - yamlName := "volume-" + strconv.Itoa(index+1) + ".yaml" - newFile, _ := os.Create(yamlName) - y := printers.YAMLPrinter{} - defer newFile.Close() - createVolume := func() *directpv.DirectPVVolume { - return &directpv.DirectPVVolume{ - TypeMeta: volume.TypeMeta, - ObjectMeta: volume.ObjectMeta, - Status: volume.Status, - } - } - runTimeObject := []runtime.Object{ - createVolume(), - } - y.PrintObj(runTimeObject[0], newFile) + if !quietFlag { + fmt.Println("Migration successful") } } diff --git a/cmd/kubectl-directpv/migrate_test.go b/cmd/kubectl-directpv/migrate_test.go deleted file mode 100644 index 97a1b8490..000000000 --- a/cmd/kubectl-directpv/migrate_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - directpv "github.com/minio/directpv/pkg/apis/directpv.min.io/v1beta1" - directcsi "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta5" -) - -func TestMigrate(t *testing.T) { - oldDriveList := []directcsi.DirectCSIDrive{} - newDriveList1 := []directpv.DirectPVDrive{} - newDriveList2, therror := migrateDriveCRD(oldDriveList) - fmt.Println("message: ", newDriveList2, therror) - // Checking the conversion is taking place from old to new resource definition - assert.Equal(t, newDriveList1, newDriveList2, "The two arrays should be same") -} diff --git a/codegen.sh b/codegen.sh index 5ab700ab5..460fb62f0 100755 --- a/codegen.sh +++ b/codegen.sh @@ -103,6 +103,7 @@ client-gen \ echo "Running controller-gen ..." controller-gen crd:crdVersions=v1 paths=./... output:dir=pkg/installer +rm -f pkg/installer/direct.csi.min.io_directcsidrives.yaml pkg/installer/direct.csi.min.io_directcsivolumes.yaml echo "Running conversion-gen ..." conversion-gen \ diff --git a/functests/common.sh b/functests/common.sh index acb464482..432d1dee6 100755 --- a/functests/common.sh +++ b/functests/common.sh @@ -78,10 +78,11 @@ function wait_for_service() { done } +# install_directpv function install_directpv() { "${DIRECTPV_CLIENT}" install --quiet - required_count=5 + required_count="$1" running_count=0 while [[ $running_count -lt $required_count ]]; do echo "$ME: waiting for $(( required_count - running_count )) DirectPV pods to come up" @@ -99,13 +100,14 @@ function install_directpv() { sleep 1m } +# uninstall_directpv function uninstall_directpv() { "${DIRECTPV_CLIENT}" uninstall --quiet - pending=5 + pending="$1" while [[ $pending -gt 0 ]]; do echo "$ME: waiting for ${pending} DirectPV pods to go down" - sleep ${pending} + sleep "${pending}" pending=$(kubectl get pods --field-selector=status.phase=Running --no-headers --namespace=directpv-min-io | wc -l) done @@ -161,8 +163,11 @@ function remove_drives() { "${DIRECTPV_CLIENT}" remove --all --quiet } +# usage: deploy_minio function deploy_minio() { - kubectl apply -f functests/minio.yaml + minio_yaml="$1" + + kubectl apply -f "${minio_yaml}" required_count=4 running_count=0 @@ -173,8 +178,11 @@ function deploy_minio() { done } -function uninstall_minio() { - kubectl delete -f functests/minio.yaml +# usage: delete_minio +function delete_minio() { + minio_yaml="$1" + + kubectl delete -f "${minio_yaml}" pending=4 retry_count=0 while [[ $pending -gt 0 ]]; do @@ -183,9 +191,16 @@ function uninstall_minio() { fi retry_count=$((retry_count + 1)) echo "$ME: waiting for ${pending} minio pods to go down" - sleep ${pending} + sleep "${pending}" pending=$(kubectl get pods --field-selector=status.phase=Running --no-headers | grep -c '^minio-' || true) done +} + +# usage: uninstall_minio +function uninstall_minio() { + minio_yaml="$1" + + delete_minio "${minio_yaml}" kubectl delete pvc --all --force sleep 3 @@ -205,3 +220,51 @@ function uninstall_minio() { # Show output for manual debugging. "${DIRECTPV_CLIENT}" list drives --all } + +# usage: install_directcsi +function install_directcsi() { + directcsi_client="$1" + required_count="$2" + + "${directcsi_client}" install + + running_count=0 + while [[ $running_count -lt $required_count ]]; do + echo "$ME: waiting for $(( required_count - running_count )) DirectCSI pods to come up" + sleep $(( required_count - running_count )) + running_count=$(kubectl get pods --field-selector=status.phase=Running --no-headers --namespace=direct-csi-min-io | wc -l) + done + + while ! "${directcsi_client}" info; do + echo "$ME: waiting for DirectCSI to come up" + sleep 5 + done + + sleep 1m +} + +# usage: install_directcsi +function uninstall_directcsi() { + directcsi_client="$1" + pending="$2" + + "${directcsi_client}" uninstall + + while [[ $pending -gt 0 ]]; do + echo "$ME: waiting for ${pending} direct-csi pods to go down" + sleep "${pending}" + pending=$(kubectl get pods --field-selector=status.phase=Running --no-headers --namespace=direct-csi-min-io | wc -l) + done + + while kubectl get namespace direct-csi-min-io --no-headers | grep -q .; do + echo "$ME: waiting for direct-csi-min-io namespace to be removed" + sleep 5 + done +} + +# usage: force_install_directcsi +function force_uninstall_directcsi() { + directcsi_client="$1" + "${directcsi_client}" uninstall --force --crd + sleep 5 +} diff --git a/functests/migration-tests.sh b/functests/migration-tests.sh new file mode 100644 index 000000000..48dae5095 --- /dev/null +++ b/functests/migration-tests.sh @@ -0,0 +1,72 @@ +#!/usr/bin/env bash +# +# This file is part of MinIO DirectPV +# Copyright (c) 2022 MinIO, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +if [ "$#" -ne 1 ]; then + echo "usage: migration-tests.sh " + exit 255 +fi + +LEGACY_VERSION="$1" +LEGACY_FILE="kubectl-direct_csi_${LEGACY_VERSION:1}_linux_amd64" + +set -ex + +source "${SCRIPT_DIR}/common.sh" + +sed -e s:directpv-min-io:direct-csi-min-io:g -e s:directpv.min.io:direct.csi.min.io:g functests/minio.yaml > functests/directcsi-minio.yaml + +# usage: migrate_test +function migrate_test() { + directcsi_client="$1" + pod_count="$2" + + setup_lvm + setup_luks + + install_directcsi "${directcsi_client}" "${pod_count}" + + "${directcsi_client}" drives format --all --force + + deploy_minio functests/directcsi-minio.yaml + + uninstall_directcsi "${directcsi_client}" "${pod_count}" + + export DIRECTPV_CLIENT=./kubectl-directpv + install_directpv 9 + + delete_minio functests/directcsi-minio.yaml + + deploy_minio functests/directcsi-minio.yaml + + uninstall_minio functests/directcsi-minio.yaml + + force_uninstall_directcsi "${directcsi_client}" + + remove_drives + uninstall_directpv 9 + + mount | awk '/direct|pvc-/ {print $3}' | xargs sudo umount -fl + + remove_luks + remove_lvm +} + +echo "-------------- Migration test on DirectCSI ${LEGACY_VERSION} -----------------------" +wget --quiet "https://github.com/minio/directpv/releases/download/${LEGACY_VERSION}/${LEGACY_FILE}" +chmod a+x "${LEGACY_FILE}" +migrate_test "./${LEGACY_FILE}" 4 diff --git a/functests/run-migration-tests.sh b/functests/run-migration-tests.sh new file mode 100755 index 000000000..fbd89a6cf --- /dev/null +++ b/functests/run-migration-tests.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# +# This file is part of MinIO DirectPV +# Copyright (c) 2022 MinIO, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +ME=$(basename "$0") +export ME + +if [ "$#" -ne 1 ]; then + echo "usage: $ME " + exit 255 +fi + +SCRIPT_DIR=$(dirname "$0") +export SCRIPT_DIR + +"${SCRIPT_DIR}/execute.sh" "${SCRIPT_DIR}/migration-tests.sh" "$1" diff --git a/functests/tests.sh b/functests/tests.sh index 78444e27e..04b4c268f 100755 --- a/functests/tests.sh +++ b/functests/tests.sh @@ -22,12 +22,12 @@ source "${SCRIPT_DIR}/common.sh" function test_build() { export DIRECTPV_CLIENT=./kubectl-directpv - install_directpv + install_directpv 5 add_drives - deploy_minio - uninstall_minio + deploy_minio functests/minio.yaml + uninstall_minio functests/minio.yaml remove_drives - uninstall_directpv + uninstall_directpv 5 } echo "$ME: Setup environment" diff --git a/go.mod b/go.mod index 29efa28ba..afbfb2966 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/prometheus/client_model v0.3.0 github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.13.0 - github.com/stretchr/testify v1.8.0 golang.org/x/sys v0.1.0 golang.org/x/time v0.1.0 google.golang.org/grpc v1.50.1 @@ -23,7 +22,6 @@ require ( k8s.io/api v0.25.4 k8s.io/apiextensions-apiserver v0.25.3 k8s.io/apimachinery v0.25.4 - k8s.io/cli-runtime v0.25.4 k8s.io/client-go v0.25.4 k8s.io/klog/v2 v2.80.1 k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 @@ -64,7 +62,6 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.0.4 // indirect - github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-colorable v0.1.12 // indirect @@ -78,7 +75,6 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect diff --git a/go.sum b/go.sum index 17b9d2c41..e244af165 100644 --- a/go.sum +++ b/go.sum @@ -363,8 +363,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubernetes-csi/csi-lib-utils v0.11.0 h1:FHWOBtAZBA/hVk7v/qaXgG9Sxv0/n06DebPFuDwumqg= github.com/kubernetes-csi/csi-lib-utils v0.11.0/go.mod h1:BmGZZB16L18+9+Lgg9YWwBKfNEHIDdgGfAyuW6p2NV0= -github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= -github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= @@ -1076,8 +1074,6 @@ k8s.io/apiextensions-apiserver v0.25.3/go.mod h1:ZJqwpCkxIx9itilmZek7JgfUAM0dnTs k8s.io/apimachinery v0.22.0/go.mod h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0= k8s.io/apimachinery v0.25.4 h1:CtXsuaitMESSu339tfhVXhQrPET+EiWnIY1rcurKnAc= k8s.io/apimachinery v0.25.4/go.mod h1:jaF9C/iPNM1FuLl7Zuy5b9v+n35HGSh6AQ4HYRkCqwo= -k8s.io/cli-runtime v0.25.4 h1:GTSBN7aKBrc2LqpdO30CmHQqJtRmotxV7XsMSP+QZIk= -k8s.io/cli-runtime v0.25.4/go.mod h1:JGOw1CR8v4Mcz6cEKA7bFQe0bPrNn1l5sGAX1/Ke4Eg= k8s.io/client-go v0.22.0/go.mod h1:GUjIuXR5PiEv/RVK5OODUsm6eZk7wtSWZSaSJbpFdGg= k8s.io/client-go v0.25.4 h1:3RNRDffAkNU56M/a7gUfXaEzdhZlYhoW8dgViGy5fn8= k8s.io/client-go v0.25.4/go.mod h1:8trHCAC83XKY0wsBIpbirZU4NTUpbuhc2JnI7OruGZw= diff --git a/pkg/apis/directpv.min.io/types/label.go b/pkg/apis/directpv.min.io/types/label.go index 2e844a932..a5305a917 100644 --- a/pkg/apis/directpv.min.io/types/label.go +++ b/pkg/apis/directpv.min.io/types/label.go @@ -70,6 +70,9 @@ const ( // TopologyDriverRegion label key for region TopologyDriverRegion LabelKey = consts.GroupName + "/region" + + // MigratedLabelKey denotes drive/volume migrated from legacy DirectCSI + MigratedLabelKey LabelKey = consts.GroupName + "/migrated" ) // LabelValue is a type definition for label value diff --git a/pkg/apis/directpv.min.io/v1beta1/drive.go b/pkg/apis/directpv.min.io/v1beta1/drive.go index af104db65..2f8af725e 100644 --- a/pkg/apis/directpv.min.io/v1beta1/drive.go +++ b/pkg/apis/directpv.min.io/v1beta1/drive.go @@ -34,6 +34,8 @@ const ( type DriveSpec struct { // +optional Unschedulable bool `json:"unschedulable,omitempty"` + // +optional + Relabel bool `json:"relabel,omitempty"` } // DriveStatus denotes drive information. @@ -278,6 +280,16 @@ func (drive *DirectPVDrive) setErrorCondition(errType, reason, message string) { } } +// SetMigratedLabel sets migrated label to this drive. +func (drive *DirectPVDrive) SetMigratedLabel() { + drive.SetLabel(types.MigratedLabelKey, "true") +} + +// IsMigrated indicates whether this is migrated drive or not. +func (drive *DirectPVDrive) IsMigrated() bool { + return drive.getLabel(types.MigratedLabelKey) == "true" +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // DirectPVDriveList denotes list of drives. diff --git a/pkg/apis/directpv.min.io/v1beta1/openapi_generated.go b/pkg/apis/directpv.min.io/v1beta1/openapi_generated.go index 7cb86cebd..3931dda18 100644 --- a/pkg/apis/directpv.min.io/v1beta1/openapi_generated.go +++ b/pkg/apis/directpv.min.io/v1beta1/openapi_generated.go @@ -243,6 +243,12 @@ func schema_pkg_apis_directpvminio_v1beta1_DriveSpec(ref common.ReferenceCallbac Format: "", }, }, + "relabel": { + SchemaProps: spec.SchemaProps{ + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/directpv.min.io/v1beta1/volume.go b/pkg/apis/directpv.min.io/v1beta1/volume.go index f40f9d117..9e4d2706d 100644 --- a/pkg/apis/directpv.min.io/v1beta1/volume.go +++ b/pkg/apis/directpv.min.io/v1beta1/volume.go @@ -251,6 +251,16 @@ func (volume *DirectPVVolume) SetCreatedByLabel() { volume.SetLabel(types.CreatedByLabelKey, consts.ControllerName) } +// SetMigratedLabel sets migrated label to this volume. +func (volume *DirectPVVolume) SetMigratedLabel() { + volume.SetLabel(types.MigratedLabelKey, "true") +} + +// IsMigrated indicates whether this is migrated volume or not. +func (volume *DirectPVVolume) IsMigrated() bool { + return volume.getLabel(types.MigratedLabelKey) == "true" +} + // SetPodName sets associated pod name to this volume. func (volume *DirectPVVolume) SetPodName(name string) { volume.SetLabel(types.PodNameLabelKey, types.ToLabelValue(name)) diff --git a/pkg/client/event.go b/pkg/client/event.go index c56f03753..801f41152 100644 --- a/pkg/client/event.go +++ b/pkg/client/event.go @@ -49,6 +49,7 @@ const ( EventReasonDriveMounted EventReason = "DriveMounted" EventReasonDriveHasMultipleMatches EventReason = "DriveHasMultipleMatches" EventReasonDriveIOError EventReason = "DriveHasIOError" + EventReasonDriveRelabelError EventReason = "DriveHasRelabelError" ) var ( diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 3db0e5561..946263c14 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -87,6 +87,9 @@ const ( NodeServerName = "node-server" ControllerServerName = "controller" + LegacyNodeServerName = "legacy-node-server" + LegacyControllerServerName = "legacy-controller" + NodeAPIServerName = "node-api-server" NodeAPIPortName = "node-api-port" NodeAPIPort = 50443 diff --git a/pkg/consts/consts.go.in b/pkg/consts/consts.go.in index 2a3e124f5..b424646ce 100644 --- a/pkg/consts/consts.go.in +++ b/pkg/consts/consts.go.in @@ -85,6 +85,9 @@ const ( NodeServerName = "node-server" ControllerServerName = "controller" + LegacyNodeServerName = "legacy-node-server" + LegacyControllerServerName = "legacy-controller" + NodeAPIServerName = "node-api-server" NodeAPIPortName = "node-api-port" NodeAPIPort = 50443 diff --git a/pkg/controller/legacy_server.go b/pkg/controller/legacy_server.go new file mode 100644 index 000000000..c051803b5 --- /dev/null +++ b/pkg/controller/legacy_server.go @@ -0,0 +1,46 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2021, 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package controller + +import ( + "context" + "fmt" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/minio/directpv/pkg/consts" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// LegacyServer denotes legacy controller server. +type LegacyServer struct { + Server +} + +// NewLegacyServer creates new legacy controller server. +func NewLegacyServer() *LegacyServer { + return &LegacyServer{} +} + +// CreateVolume - Creates a volume +// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume +func (c *LegacyServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + return nil, status.Errorf( + codes.InvalidArgument, + fmt.Sprintf("legacy volume creation not supported; use %v storage class", consts.Identity), + ) +} diff --git a/pkg/controller/controller.go b/pkg/controller/server.go similarity index 98% rename from pkg/controller/controller.go rename to pkg/controller/server.go index 51d85922f..7e0daee63 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/server.go @@ -71,10 +71,10 @@ import ( * */ -// Server contains controller server properties +// Server denotes controller server. type Server struct{} -// NewServer returns the instance of controller server with the provided properties +// NewServer creates new controller server. func NewServer() *Server { return &Server{} } diff --git a/pkg/controller/controller_test.go b/pkg/controller/server_test.go similarity index 100% rename from pkg/controller/controller_test.go rename to pkg/controller/server_test.go diff --git a/pkg/device/sync.go b/pkg/device/sync.go index 26d2fc5fc..38342543d 100644 --- a/pkg/device/sync.go +++ b/pkg/device/sync.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "os" directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" @@ -138,7 +139,37 @@ func Sync(ctx context.Context, nodeID directpvtypes.NodeID) error { client.Eventf(drive, client.EventTypeWarning, client.EventReasonDriveMountError, "unable to mount the drive; %v", err) klog.ErrorS(err, "unable to mount the drive", "Source", source, "Target", target) } else { - client.Eventf(drive, client.EventTypeNormal, client.EventReasonDriveMounted, "Drive mounted successfully to %s", target) + client.Eventf( + drive, + client.EventTypeNormal, + client.EventReasonDriveMounted, + "Drive mounted successfully to %s", target, + ) + if drive.Spec.Relabel { + if err = os.Symlink(".", types.GetVolumeRootDir(drive.Status.FSUUID)); err != nil { + if errors.Is(err, os.ErrExist) { + err = nil + } else { + client.Eventf( + drive, + client.EventTypeWarning, + client.EventReasonDriveRelabelError, + "unable to relabel; %v", err, + ) + klog.ErrorS( + err, + "unable to create symlink", + "symlink", types.GetVolumeRootDir(drive.Status.FSUUID), + "drive", drive.Name, + ) + } + } + + if err == nil { + drive.Spec.Relabel = false + updated = true + } + } } } default: diff --git a/pkg/installer/args.go b/pkg/installer/args.go index 650125047..cdc5c7567 100644 --- a/pkg/installer/args.go +++ b/pkg/installer/args.go @@ -50,6 +50,7 @@ type Args struct { Quiet bool DisableAdminService bool KubeVersion *version.Version + Legacy bool podSecurityAdmission bool csiProvisionerImage string diff --git a/pkg/installer/csidriver.go b/pkg/installer/csidriver.go index a6cc82d8e..48b4972f9 100644 --- a/pkg/installer/csidriver.go +++ b/pkg/installer/csidriver.go @@ -24,34 +24,25 @@ import ( "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/k8s" + legacyclient "github.com/minio/directpv/pkg/legacy/client" storagev1 "k8s.io/api/storage/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" ) var errCSIDriverVersionUnsupported = errors.New("unsupported CSIDriver version found") -func createCSIDriver(ctx context.Context, args *Args) error { - var gvk *schema.GroupVersionKind - if args.DryRun { - if args.KubeVersion.Major() >= 1 && args.KubeVersion.Minor() < 19 { - gvk = &schema.GroupVersionKind{Version: "v1beta1"} - } else { - gvk = &schema.GroupVersionKind{Version: "v1"} - } - } else { - var err error - if gvk, err = k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1"); err != nil { - return err - } - } - +func doCreateCSIDriver(ctx context.Context, args *Args, version string, legacy bool) error { podInfoOnMount := true attachRequired := false - switch gvk.Version { + name := consts.Identity + if legacy { + name = legacyclient.Identity + } + + switch version { case "v1": csiDriver := &storagev1.CSIDriver{ TypeMeta: metav1.TypeMeta{ @@ -59,7 +50,7 @@ func createCSIDriver(ctx context.Context, args *Args) error { Kind: "CSIDriver", }, ObjectMeta: metav1.ObjectMeta{ - Name: consts.Identity, + Name: name, Namespace: metav1.NamespaceNone, Annotations: map[string]string{}, Labels: defaultLabels, @@ -97,7 +88,7 @@ func createCSIDriver(ctx context.Context, args *Args) error { Kind: "CSIDriver", }, ObjectMeta: metav1.ObjectMeta{ - Name: consts.Identity, + Name: name, Namespace: metav1.NamespaceNone, Annotations: map[string]string{}, Labels: defaultLabels, @@ -133,20 +124,40 @@ func createCSIDriver(ctx context.Context, args *Args) error { } } -func deleteCSIDriver(ctx context.Context) error { - gvk, err := k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1") - if err != nil { +func createCSIDriver(ctx context.Context, args *Args) error { + version := "v1" + if args.DryRun { + if args.KubeVersion.Major() >= 1 && args.KubeVersion.Minor() < 19 { + version = "v1beta1" + } + } else { + gvk, err := k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1") + if err != nil { + return err + } + version = gvk.Version + } + + if err := doCreateCSIDriver(ctx, args, version, false); err != nil { return err } - switch gvk.Version { + if args.Legacy { + return doCreateCSIDriver(ctx, args, version, true) + } + + return nil +} + +func doDeleteCSIDriver(ctx context.Context, version, name string) (err error) { + switch version { case "v1": err = k8s.KubeClient().StorageV1().CSIDrivers().Delete( - ctx, consts.Identity, metav1.DeleteOptions{}, + ctx, name, metav1.DeleteOptions{}, ) case "v1beta1": err = k8s.KubeClient().StorageV1beta1().CSIDrivers().Delete( - ctx, consts.Identity, metav1.DeleteOptions{}, + ctx, name, metav1.DeleteOptions{}, ) default: return errCSIDriverVersionUnsupported @@ -158,3 +169,16 @@ func deleteCSIDriver(ctx context.Context) error { return nil } + +func deleteCSIDriver(ctx context.Context) error { + gvk, err := k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1") + if err != nil { + return err + } + + if err = doDeleteCSIDriver(ctx, gvk.Version, consts.Identity); err != nil { + return err + } + + return doDeleteCSIDriver(ctx, gvk.Version, legacyclient.Identity) +} diff --git a/pkg/installer/daemonset.go b/pkg/installer/daemonset.go index 22700c950..12905dcfd 100644 --- a/pkg/installer/daemonset.go +++ b/pkg/installer/daemonset.go @@ -23,6 +23,7 @@ import ( "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/k8s" + legacyclient "github.com/minio/directpv/pkg/legacy/client" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -31,20 +32,24 @@ import ( ) const ( - nodeAPIServerCertsSecretName = "nodeapiservercerts" - nodeAPIServerCASecretName = "nodeapiservercacert" - volumeNameMountpointDir = "mountpoint-dir" - volumeNameRegistrationDir = "registration-dir" - volumeNamePluginDir = "plugins-dir" - volumeNameAppRootDir = consts.AppName + "-common-root" - appRootDir = consts.AppRootDir + "/" - volumeNameSysDir = "sysfs" - volumeNameDevDir = "devfs" - volumePathDevDir = "/dev" - volumeNameRunUdevData = "run-udev-data-dir" - volumePathRunUdevData = consts.UdevDataDir - socketFile = "/csi.sock" - nodeAPIServerCertsDir = "node-api-server-certs" + nodeAPIServerCertsSecretName = "nodeapiservercerts" + nodeAPIServerCASecretName = "nodeapiservercacert" + legacyNodeAPIServerCertsSecretName = "legacynodeapiservercerts" + legacyNodeAPIServerCASecretName = "legacynodeapiservercacert" + volumeNameMountpointDir = "mountpoint-dir" + volumeNameRegistrationDir = "registration-dir" + volumeNamePluginDir = "plugins-dir" + volumeNameAppRootDir = consts.AppName + "-common-root" + volumeNameLegacyAppRootDir = "direct-csi-common-root" + appRootDir = consts.AppRootDir + "/" + legacyAppRootDir = "/var/lib/direct-csi/" + volumeNameSysDir = "sysfs" + volumeNameDevDir = "devfs" + volumePathDevDir = "/dev" + volumeNameRunUdevData = "run-udev-data-dir" + volumePathRunUdevData = consts.UdevDataDir + socketFile = "/csi.sock" + nodeAPIServerCertsDir = "node-api-server-certs" ) func createOrUpdateNodeAPIServerSecrets(ctx context.Context, args *Args) error { @@ -79,23 +84,22 @@ func createOrUpdateNodeAPIServerSecrets(ctx context.Context, args *Args) error { ) } -func doCreateDaemonset(ctx context.Context, args *Args) error { - if err := createOrUpdateNodeAPIServerSecrets(ctx, args); err != nil { - return err - } - +func newSecurityContext(seccompProfile string) *corev1.SecurityContext { privileged := true securityContext := &corev1.SecurityContext{Privileged: &privileged} - - if args.SeccompProfile != "" { + if seccompProfile != "" { securityContext.SeccompProfile = &corev1.SeccompProfile{ Type: corev1.SeccompProfileTypeLocalhost, - LocalhostProfile: &args.SeccompProfile, + LocalhostProfile: &seccompProfile, } } - volumes := []corev1.Volume{ - newHostPathVolume(volumeNameSocketDir, newPluginsSocketDir(kubeletDirPath, consts.Identity)), + return securityContext +} + +func getVolumesAndMounts(pluginSocketDir, certsSecretName string, legacy bool) (volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) { + volumes = []corev1.Volume{ + newHostPathVolume(volumeNameSocketDir, pluginSocketDir), newHostPathVolume(volumeNameMountpointDir, kubeletDirPath+"/pods"), newHostPathVolume(volumeNameRegistrationDir, kubeletDirPath+"/plugins_registry"), newHostPathVolume(volumeNamePluginDir, kubeletDirPath+"/plugins"), @@ -103,9 +107,9 @@ func doCreateDaemonset(ctx context.Context, args *Args) error { newHostPathVolume(volumeNameSysDir, volumePathSysDir), newHostPathVolume(volumeNameDevDir, volumePathDevDir), newHostPathVolume(volumeNameRunUdevData, volumePathRunUdevData), - newSecretVolume(nodeAPIServerCertsDir, nodeAPIServerCertsSecretName), // node api server + newSecretVolume(nodeAPIServerCertsDir, certsSecretName), } - volumeMounts := []corev1.VolumeMount{ + volumeMounts = []corev1.VolumeMount{ newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), newVolumeMount(volumeNameMountpointDir, kubeletDirPath+"/pods", corev1.MountPropagationBidirectional, false), newVolumeMount(volumeNamePluginDir, kubeletDirPath+"/plugins", corev1.MountPropagationBidirectional, false), @@ -115,6 +119,141 @@ func doCreateDaemonset(ctx context.Context, args *Args) error { newVolumeMount(volumeNameRunUdevData, volumePathRunUdevData, corev1.MountPropagationBidirectional, true), } + if legacy { + volumes = append( + volumes, + newHostPathVolume(volumeNameLegacyAppRootDir, legacyAppRootDir), + ) + volumeMounts = append( + volumeMounts, + newVolumeMount(volumeNameLegacyAppRootDir, legacyAppRootDir, corev1.MountPropagationBidirectional, false), + ) + } + + return +} + +func nodeDriverRegistrarContainer(image, pluginSocketDir string) corev1.Container { + return corev1.Container{ + Name: "node-driver-registrar", + Image: image, + Args: []string{ + fmt.Sprintf("--v=%d", logLevel), + fmt.Sprintf("--csi-address=%v", UnixCSIEndpoint), + fmt.Sprintf("--kubelet-registration-path=%s%s", pluginSocketDir, socketFile), + }, + Env: []corev1.EnvVar{kubeNodeNameEnvVar}, + VolumeMounts: []corev1.VolumeMount{ + newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + newVolumeMount(volumeNameRegistrationDir, "/registration", corev1.MountPropagationNone, false), + }, + TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, + TerminationMessagePath: "/var/log/driver-registrar-termination-log", + } +} + +func nodeServerContainer(image string, args []string, securityContext *corev1.SecurityContext, volumeMounts []corev1.VolumeMount) corev1.Container { + return corev1.Container{ + Name: consts.NodeServerName, + Image: image, + Args: args, + SecurityContext: securityContext, + Env: []corev1.EnvVar{kubeNodeNameEnvVar, csiEndpointEnvVar}, + TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, + TerminationMessagePath: "/var/log/driver-termination-log", + VolumeMounts: volumeMounts, + Ports: append(commonContainerPorts, corev1.ContainerPort{ + ContainerPort: consts.MetricsPort, + Name: "metrics", + Protocol: corev1.ProtocolTCP, + }), + ReadinessProbe: &corev1.Probe{ProbeHandler: readinessHandler}, + LivenessProbe: &corev1.Probe{ + FailureThreshold: 5, + InitialDelaySeconds: 300, + TimeoutSeconds: 5, + PeriodSeconds: 5, + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: healthZContainerPortPath, + Port: intstr.FromString(healthZContainerPortName), + }, + }, + }, + } +} + +func livenessProbeContainer(image string) corev1.Container { + return corev1.Container{ + Name: "liveness-probe", + Image: image, + Args: []string{ + fmt.Sprintf("--csi-address=%v%v", socketDir, socketFile), + fmt.Sprintf("--health-port=%v", healthZContainerPort), + }, + TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, + TerminationMessagePath: "/var/log/driver-liveness-termination-log", + VolumeMounts: []corev1.VolumeMount{ + newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), + }, + } +} + +func newDaemonset(podSpec corev1.PodSpec, name, appArmorProfile string) *appsv1.DaemonSet { + annotations := map[string]string{createdByLabel: pluginName} + if appArmorProfile != "" { + annotations["container.apparmor.security.beta.kubernetes.io/"+consts.AppName] = appArmorProfile + } + selectorValue := fmt.Sprintf("%v-%v", consts.Identity, getRandSuffix()) + + return &appsv1.DaemonSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: consts.Identity, + Annotations: map[string]string{}, + Labels: defaultLabels, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: metav1.AddLabelToSelector(&metav1.LabelSelector{}, selectorKey, selectorValue), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: consts.Identity, + Annotations: annotations, + Labels: map[string]string{ + selectorKey: selectorValue, + serviceSelector: selectorValueEnabled, + }, + }, + Spec: podSpec, + }, + }, + Status: appsv1.DaemonSetStatus{}, + } +} + +func doCreateDaemonset(ctx context.Context, args *Args) error { + if err := createOrUpdateNodeAPIServerSecrets(ctx, args); err != nil { + return err + } + + securityContext := newSecurityContext(args.SeccompProfile) + pluginSocketDir := newPluginsSocketDir(kubeletDirPath, consts.Identity) + volumes, volumeMounts := getVolumesAndMounts(pluginSocketDir, nodeAPIServerCertsSecretName, false) + containerArgs := []string{ + consts.NodeServerName, + fmt.Sprintf("-v=%d", logLevel), + fmt.Sprintf("--identity=%s", consts.Identity), + fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), + fmt.Sprintf("--metrics-port=%d", consts.MetricsPort), + } + podSpec := corev1.PodSpec{ ServiceAccountName: consts.Identity, HostIPC: false, @@ -122,64 +261,8 @@ func doCreateDaemonset(ctx context.Context, args *Args) error { Volumes: volumes, ImagePullSecrets: args.getImagePullSecrets(), Containers: []corev1.Container{ - { - Name: "node-driver-registrar", - Image: args.getNodeDriverRegistrarImage(), - Args: []string{ - fmt.Sprintf("--v=%d", logLevel), - fmt.Sprintf("--csi-address=%v", UnixCSIEndpoint), - fmt.Sprintf( - "--kubelet-registration-path=%s%s", - newPluginsSocketDir(kubeletDirPath, consts.Identity), socketFile, - ), - }, - Env: []corev1.EnvVar{kubeNodeNameEnvVar}, - VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), - newVolumeMount(volumeNameRegistrationDir, "/registration", corev1.MountPropagationNone, false), - }, - TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, - TerminationMessagePath: "/var/log/driver-registrar-termination-log", - }, - { - Name: consts.NodeServerName, - Image: args.getContainerImage(), - Args: func() []string { - args := []string{ - consts.NodeServerName, - fmt.Sprintf("-v=%d", logLevel), - fmt.Sprintf("--identity=%s", consts.Identity), - fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), - fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), - fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), - fmt.Sprintf("--metrics-port=%d", consts.MetricsPort), - } - return args - }(), - SecurityContext: securityContext, - Env: []corev1.EnvVar{kubeNodeNameEnvVar, csiEndpointEnvVar}, - TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, - TerminationMessagePath: "/var/log/driver-termination-log", - VolumeMounts: volumeMounts, - Ports: append(commonContainerPorts, corev1.ContainerPort{ - ContainerPort: consts.MetricsPort, - Name: "metrics", - Protocol: corev1.ProtocolTCP, - }), - ReadinessProbe: &corev1.Probe{ProbeHandler: readinessHandler}, - LivenessProbe: &corev1.Probe{ - FailureThreshold: 5, - InitialDelaySeconds: 300, - TimeoutSeconds: 5, - PeriodSeconds: 5, - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: healthZContainerPortPath, - Port: intstr.FromString(healthZContainerPortName), - }, - }, - }, - }, + nodeDriverRegistrarContainer(args.getNodeDriverRegistrarImage(), pluginSocketDir), + nodeServerContainer(args.getContainerImage(), containerArgs, securityContext, volumeMounts), { Name: consts.NodeAPIServerName, Image: args.getContainerImage(), @@ -218,61 +301,62 @@ func doCreateDaemonset(ctx context.Context, args *Args) error { }, },*/ }, - { - Name: "liveness-probe", - Image: args.getLivenessProbeImage(), - Args: []string{ - fmt.Sprintf("--csi-address=%v%v", socketDir, socketFile), - fmt.Sprintf("--health-port=%v", healthZContainerPort), - }, - TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError, - TerminationMessagePath: "/var/log/driver-liveness-termination-log", - VolumeMounts: []corev1.VolumeMount{ - newVolumeMount(volumeNameSocketDir, socketDir, corev1.MountPropagationNone, false), - }, - }, + livenessProbeContainer(args.getLivenessProbeImage()), }, NodeSelector: args.NodeSelector, Tolerations: args.Tolerations, } - annotations := map[string]string{ - createdByLabel: pluginName, + daemonset := newDaemonset(podSpec, consts.NodeServerName, args.AppArmorProfile) + + if args.DryRun { + fmt.Print(mustGetYAML(daemonset)) + return nil } - if args.AppArmorProfile != "" { - annotations["container.apparmor.security.beta.kubernetes.io/"+consts.AppName] = args.AppArmorProfile + + _, err := k8s.KubeClient().AppsV1().DaemonSets(consts.Identity).Create( + ctx, daemonset, metav1.CreateOptions{}, + ) + if err != nil { + if apierrors.IsAlreadyExists(err) { + err = nil + } + return err } - selectorValue := fmt.Sprintf("%v-%v", consts.Identity, getRandSuffix()) - daemonset := &appsv1.DaemonSet{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apps/v1", - Kind: "DaemonSet", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: consts.NodeServerName, - Namespace: consts.Identity, - Annotations: map[string]string{}, - Labels: defaultLabels, - }, - Spec: appsv1.DaemonSetSpec{ - Selector: metav1.AddLabelToSelector(&metav1.LabelSelector{}, selectorKey, selectorValue), - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Name: consts.NodeServerName, - Namespace: consts.Identity, - Annotations: annotations, - Labels: map[string]string{ - selectorKey: selectorValue, - serviceSelector: selectorValueEnabled, - }, - }, - Spec: podSpec, - }, + _, err = io.WriteString(args.auditWriter, mustGetYAML(daemonset)) + return err +} + +func doCreateLegacyDaemonset(ctx context.Context, args *Args) error { + securityContext := newSecurityContext(args.SeccompProfile) + pluginSocketDir := newPluginsSocketDir(kubeletDirPath, legacyclient.Identity) + volumes, volumeMounts := getVolumesAndMounts(pluginSocketDir, legacyNodeAPIServerCertsSecretName, true) + containerArgs := []string{ + consts.LegacyNodeServerName, + fmt.Sprintf("-v=%d", logLevel), + fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), + } + + podSpec := corev1.PodSpec{ + ServiceAccountName: consts.Identity, + HostIPC: false, + HostPID: true, + Volumes: volumes, + ImagePullSecrets: args.getImagePullSecrets(), + Containers: []corev1.Container{ + nodeDriverRegistrarContainer(args.getNodeDriverRegistrarImage(), pluginSocketDir), + nodeServerContainer(args.getContainerImage(), containerArgs, securityContext, volumeMounts), + livenessProbeContainer(args.getLivenessProbeImage()), }, - Status: appsv1.DaemonSetStatus{}, + NodeSelector: args.NodeSelector, + Tolerations: args.Tolerations, } + daemonset := newDaemonset(podSpec, consts.LegacyNodeServerName, args.AppArmorProfile) + if args.DryRun { fmt.Print(mustGetYAML(daemonset)) return nil @@ -294,7 +378,15 @@ func doCreateDaemonset(ctx context.Context, args *Args) error { func createDaemonset(ctx context.Context, args *Args) error { if args.DryRun { - return doCreateDaemonset(ctx, args) + if err := doCreateDaemonset(ctx, args); err != nil { + return err + } + + if args.Legacy { + return doCreateLegacyDaemonset(ctx, args) + } + + return nil } _, err := k8s.KubeClient().AppsV1().DaemonSets(consts.Identity).Get( @@ -304,7 +396,25 @@ func createDaemonset(ctx context.Context, args *Args) error { if !apierrors.IsNotFound(err) { return err } - return doCreateDaemonset(ctx, args) + if err := doCreateDaemonset(ctx, args); err != nil { + return err + } + } + + if !args.Legacy { + return nil + } + + _, err = k8s.KubeClient().AppsV1().DaemonSets(consts.Identity).Get( + ctx, consts.LegacyNodeServerName, metav1.GetOptions{}, + ) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + if err := doCreateLegacyDaemonset(ctx, args); err != nil { + return err + } } return nil @@ -327,5 +437,23 @@ func deleteDaemonset(ctx context.Context) error { if err != nil && !apierrors.IsNotFound(err) { return err } + + err = k8s.KubeClient().AppsV1().DaemonSets(consts.Identity).Delete( + ctx, consts.LegacyNodeServerName, metav1.DeleteOptions{}, + ) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + err = k8s.KubeClient().CoreV1().Secrets(consts.Identity).Delete(ctx, legacyNodeAPIServerCertsSecretName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + err = k8s.KubeClient().CoreV1().Secrets(consts.Identity).Delete(ctx, legacyNodeAPIServerCASecretName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + return nil } diff --git a/pkg/installer/deployment.go b/pkg/installer/deployment.go index 78c18b863..6a9ff7497 100644 --- a/pkg/installer/deployment.go +++ b/pkg/installer/deployment.go @@ -38,7 +38,23 @@ const ( nodeAPIServerCADir = "node-api-server-ca" ) -func createDeployment(ctx context.Context, args *Args) error { +func doCreateDeployment(ctx context.Context, args *Args, legacy bool) error { + name := consts.ControllerServerName + containerArgs := []string{name, fmt.Sprintf("--identity=%s", consts.Identity)} + if legacy { + name = consts.LegacyControllerServerName + containerArgs = []string{name} + } + containerArgs = append( + containerArgs, + []string{ + fmt.Sprintf("-v=%d", logLevel), + fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), + fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), + fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), + }..., + ) + privileged := true podSpec := corev1.PodSpec{ ServiceAccountName: consts.Identity, @@ -87,14 +103,7 @@ func createDeployment(ctx context.Context, args *Args) error { { Name: consts.ControllerServerName, Image: args.getContainerImage(), - Args: []string{ - consts.ControllerServerName, - fmt.Sprintf("-v=%d", logLevel), - fmt.Sprintf("--identity=%s", consts.Identity), - fmt.Sprintf("--csi-endpoint=$(%s)", csiEndpointEnvVarName), - fmt.Sprintf("--kube-node-name=$(%s)", kubeNodeNameEnvVarName), - fmt.Sprintf("--readiness-port=%d", consts.ReadinessPort), - }, + Args: containerArgs, SecurityContext: &corev1.SecurityContext{ Privileged: &privileged, }, @@ -116,7 +125,7 @@ func createDeployment(ctx context.Context, args *Args) error { Kind: "Deployment", }, ObjectMeta: metav1.ObjectMeta{ - Name: consts.ControllerServerName, + Name: name, Namespace: consts.Identity, Annotations: map[string]string{}, Labels: defaultLabels, @@ -127,7 +136,7 @@ func createDeployment(ctx context.Context, args *Args) error { Selector: metav1.AddLabelToSelector(&metav1.LabelSelector{}, selectorKey, selectorValue), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Name: consts.ControllerServerName, + Name: name, Namespace: consts.Identity, Annotations: map[string]string{ createdByLabel: pluginName, @@ -161,6 +170,18 @@ func createDeployment(ctx context.Context, args *Args) error { return err } +func createDeployment(ctx context.Context, args *Args) error { + if err := doCreateDeployment(ctx, args, false); err != nil { + return err + } + + if args.Legacy { + return doCreateDeployment(ctx, args, true) + } + + return nil +} + func removeFinalizer(objectMeta *metav1.ObjectMeta, finalizer string) []string { removeByIndex := func(s []string, index int) []string { return append(s[:index], s[index+1:]...) @@ -195,7 +216,11 @@ func doDeleteDeployment(ctx context.Context, name string) error { } func deleteDeployment(ctx context.Context) error { - return doDeleteDeployment(ctx, consts.ControllerServerName) + if err := doDeleteDeployment(ctx, consts.ControllerServerName); err != nil { + return err + } + + return doDeleteDeployment(ctx, consts.LegacyControllerServerName) } func createAdminService(ctx context.Context, args *Args) error { diff --git a/pkg/installer/directpv.min.io_directpvdrives.yaml b/pkg/installer/directpv.min.io_directpvdrives.yaml index 5fdc2c213..045bff202 100644 --- a/pkg/installer/directpv.min.io_directpvdrives.yaml +++ b/pkg/installer/directpv.min.io_directpvdrives.yaml @@ -35,6 +35,8 @@ spec: spec: description: DriveSpec represents DirectPV drive specification values. properties: + relabel: + type: boolean unschedulable: type: boolean type: object diff --git a/pkg/installer/installer.go b/pkg/installer/installer.go index e7cef5f45..913fc1233 100644 --- a/pkg/installer/installer.go +++ b/pkg/installer/installer.go @@ -155,6 +155,18 @@ func Install(ctx context.Context, args *Args) error { return err } + if args.Legacy { + err := execute( + "Migrate legacy drives and volumes", + func(ctx context.Context, args *Args) error { + return Migrate(ctx, args.DryRun) + }, + ) + if err != nil { + return err + } + } + if err := execute("CSI Driver", createCSIDriver); err != nil { return err } diff --git a/pkg/installer/migrate.go b/pkg/installer/migrate.go new file mode 100644 index 000000000..5628e131d --- /dev/null +++ b/pkg/installer/migrate.go @@ -0,0 +1,359 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package installer + +import ( + "context" + "fmt" + "regexp" + "strings" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/client" + "github.com/minio/directpv/pkg/consts" + directv1beta3 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta3" + directv1beta4 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta4" + directv1beta5 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta5" + legacyclient "github.com/minio/directpv/pkg/legacy/client" + "github.com/minio/directpv/pkg/types" + "github.com/minio/directpv/pkg/utils" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + legacyAccessTierLabelKey = legacyclient.GroupName + "/access-tier" + legacyCreatedByLabelKey = legacyclient.GroupName + "/created-by" + legacyNodeLabelKey = legacyclient.GroupName + "/node" + legacyPathLabelKey = legacyclient.GroupName + "/path" + legacyVersionLabelKey = legacyclient.GroupName + "/version" + legacyDriveLabelKey = legacyclient.GroupName + "/drive" + legacyDrivePathLabelKey = legacyclient.GroupName + "/drive-path" + legacyPVProtection = legacyclient.GroupName + "/pv-protection" + legacyPurgeProtection = legacyclient.GroupName + "/purge-protection" +) + +var uuidRegex = regexp.MustCompile("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$") + +func migrateDrives(ctx context.Context, dryRun bool) (driveMap map[string]string, legacyDriveErrors map[string]error, driveErrors map[string]error, err error) { + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + driveMap = map[string]string{} + legacyDriveErrors = map[string]error{} + driveErrors = map[string]error{} + + fsUUIDs := make(utils.StringSet) + for result := range legacyclient.ListDrives(ctx) { + if result.Err != nil { + return nil, legacyDriveErrors, driveErrors, fmt.Errorf( + "unable to get legacy drives; %v", result.Err, + ) + } + + if result.Drive.Status.DriveStatus != directv1beta5.DriveStatusReady && + result.Drive.Status.DriveStatus != directv1beta5.DriveStatusInUse { + continue // ignore other than Ready/InUse drives + } + + if !uuidRegex.MatchString(result.Drive.Status.FilesystemUUID) { + legacyDriveErrors[result.Drive.Name] = fmt.Errorf( + "invalid filesystem UUID %v in legacy drive %v", + result.Drive.Status.FilesystemUUID, result.Drive.Name, + ) + continue + } + + if fsUUIDs.Exist(result.Drive.Status.FilesystemUUID) { + legacyDriveErrors[result.Drive.Name] = fmt.Errorf( + "duplicate filesystem UUID %v found in legacy drive %v", + result.Drive.Status.FilesystemUUID, result.Drive.Name, + ) + continue + } + + fsUUIDs.Set(result.Drive.Status.FilesystemUUID) + driveMap[result.Drive.Name] = result.Drive.Status.FilesystemUUID + + driveID := directpvtypes.DriveID(result.Drive.Status.FilesystemUUID) + nodeID := directpvtypes.NodeID(result.Drive.Status.NodeName) + driveName := directpvtypes.DriveName(utils.TrimDevPrefix(result.Drive.Status.Path)) + accessTier := directpvtypes.AccessTierDefault + switch result.Drive.Status.AccessTier { + case directv1beta5.AccessTierCold, directv1beta5.AccessTierWarm, directv1beta5.AccessTierHot: + accessTier = directpvtypes.AccessTier(result.Drive.Status.AccessTier) + } + + topology := map[string]string{} + for key, value := range result.Drive.Status.Topology { + if strings.HasPrefix(key, legacyclient.GroupName) { + key = strings.Replace(key, legacyclient.GroupName, consts.GroupName, 1) + } + + if key == string(directpvtypes.TopologyDriverIdentity) && + strings.HasPrefix(value, legacyclient.Identity) { + value = strings.Replace(value, legacyclient.Identity, consts.Identity, 1) + } + topology[key] = value + } + + status := types.DriveStatus{ + TotalCapacity: result.Drive.Status.TotalCapacity, + AllocatedCapacity: result.Drive.Status.AllocatedCapacity, + FreeCapacity: result.Drive.Status.FreeCapacity, + FSUUID: result.Drive.Status.FilesystemUUID, + Status: directpvtypes.DriveStatusReady, + Topology: topology, + } + + drive := types.NewDrive( + driveID, + status, + nodeID, + driveName, + accessTier, + ) + drive.SetMigratedLabel() + drive.Spec.Relabel = true + + for key, value := range result.Drive.Labels { + switch key { + case legacyAccessTierLabelKey: + case legacyCreatedByLabelKey: + case legacyNodeLabelKey: + case legacyPathLabelKey: + case legacyVersionLabelKey: + default: + if strings.HasPrefix(key, legacyclient.GroupName) { + key = strings.Replace(key, legacyclient.GroupName, consts.GroupName, 1) + } + drive.Labels[key] = value + } + } + + for _, finalizer := range result.Drive.Finalizers { + if strings.HasPrefix(finalizer, legacyclient.GroupName) { + finalizer = strings.Replace(finalizer, legacyclient.GroupName, consts.GroupName, 1) + } + + if !utils.Contains(drive.Finalizers, finalizer) { + drive.Finalizers = append(drive.Finalizers, finalizer) + } + } + + existingDrive, err := client.DriveClient().Get(ctx, string(driveID), metav1.GetOptions{}) + if err != nil { + switch { + case apierrors.IsNotFound(err): + if !dryRun { + _, err = client.DriveClient().Create(ctx, drive, metav1.CreateOptions{}) + if err != nil { + legacyDriveErrors[result.Drive.Name] = fmt.Errorf( + "unable to create drive %v by migrating legacy drive %v; %w", + driveID, result.Drive.Name, err, + ) + } + } + default: + driveErrors[string(driveID)] = fmt.Errorf( + "unable to get drive by drive ID %v; %w", driveID, err, + ) + delete(driveMap, result.Drive.Name) + } + } else { + if existingDrive.IsMigrated() { + legacyDriveErrors[result.Drive.Name] = fmt.Errorf( + "legacy drive %v is already migrated to drive %v", + result.Drive.Name, existingDrive.Name, + ) + } else { + legacyDriveErrors[result.Drive.Name] = fmt.Errorf( + "unable to migrate legacy drive %v; drive %v already exists", + result.Drive.Name, existingDrive.Name, + ) + } + } + } + + return driveMap, legacyDriveErrors, driveErrors, nil +} + +func migrateVolumes(ctx context.Context, driveMap map[string]string, dryRun bool) (legacyVolumeErrors map[string]error, volumeErrors map[string]error, err error) { + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + legacyVolumeErrors = map[string]error{} + volumeErrors = map[string]error{} + + for result := range legacyclient.ListVolumes(ctx) { + if result.Err != nil { + return legacyVolumeErrors, volumeErrors, fmt.Errorf( + "unable to get legacy volumes; %v", result.Err, + ) + } + + fsuuid, found := driveMap[result.Volume.Status.Drive] + if !found { + legacyVolumeErrors[result.Volume.Name] = fmt.Errorf( + "referring drive %v of volume %v not found", + result.Volume.Status.Drive, result.Volume.Name, + ) + continue + } + + name := result.Volume.Name + nodeID := directpvtypes.NodeID(result.Volume.Status.NodeName) + driveID := directpvtypes.DriveID(fsuuid) + driveName := directpvtypes.DriveName(result.Volume.Labels["direct.csi.min.io/drive-path"]) + size := result.Volume.Status.TotalCapacity + + volume := types.NewVolume( + name, + fsuuid, + nodeID, + driveID, + driveName, + size, + ) + volume.SetMigratedLabel() + volume.Status.DataPath = result.Volume.Status.HostPath + volume.Status.StagingTargetPath = result.Volume.Status.StagingPath + volume.Status.TargetPath = result.Volume.Status.ContainerPath + volume.Status.AvailableCapacity = result.Volume.Status.AvailableCapacity + volume.Status.UsedCapacity = result.Volume.Status.UsedCapacity + if volume.Status.StagingTargetPath != "" { + volume.Status.Status = directpvtypes.VolumeStatusReady + } + + for key, value := range result.Volume.Labels { + switch key { + case legacyCreatedByLabelKey: + case legacyDriveLabelKey: + case legacyDrivePathLabelKey: + case legacyNodeLabelKey: + case legacyVersionLabelKey: + default: + if strings.HasPrefix(key, legacyclient.GroupName) { + key = strings.Replace(key, legacyclient.GroupName, consts.GroupName, 1) + } + volume.Labels[key] = value + } + } + + for _, finalizer := range result.Volume.Finalizers { + switch finalizer { + case legacyPVProtection: + case legacyPurgeProtection: + default: + if strings.HasPrefix(finalizer, legacyclient.GroupName) { + finalizer = strings.Replace(finalizer, legacyclient.GroupName, consts.GroupName, 1) + } + + if !utils.Contains(volume.Finalizers, finalizer) { + volume.Finalizers = append(volume.Finalizers, finalizer) + } + } + } + + existingVolume, err := client.VolumeClient().Get(ctx, name, metav1.GetOptions{}) + if err != nil { + switch { + case apierrors.IsNotFound(err): + if !dryRun { + _, err = client.VolumeClient().Create(ctx, volume, metav1.CreateOptions{}) + if err != nil { + legacyVolumeErrors[result.Volume.Name] = fmt.Errorf( + "unable to create volume %v by migrating legacy volume %v; %w", + name, result.Volume.Name, err, + ) + } + } + default: + volumeErrors[name] = fmt.Errorf("unable to get volume %v; %w", name, err) + } + } else { + if existingVolume.IsMigrated() { + legacyVolumeErrors[result.Volume.Name] = fmt.Errorf( + "legacy volume %v is already migrated to volume %v", + result.Volume.Name, existingVolume.Name, + ) + } else { + legacyVolumeErrors[result.Volume.Name] = fmt.Errorf( + "unable to migrate legacy volume %v; volume %v already exists", + result.Volume.Name, existingVolume.Name, + ) + } + } + } + + return legacyVolumeErrors, volumeErrors, nil +} + +// Migrate migrates legacy drives and volumes. +func Migrate(ctx context.Context, dryRun bool) error { + legacyclient.Init() + + version, _, err := legacyclient.GetGroupVersion("DirectCSIDrive") + if err != nil { + return fmt.Errorf("unable to probe DirectCSIDrive version; %w", err) + } + + switch version { + case directv1beta5.Version, directv1beta4.Version, directv1beta3.Version: + default: + return fmt.Errorf("migration does not support DirectCSIDrive version %v", version) + } + + version, _, err = legacyclient.GetGroupVersion("DirectCSIVolume") + if err != nil { + return fmt.Errorf("unable to probe DirectCSIVolume version; %w", err) + } + + switch version { + case directv1beta5.Version, directv1beta4.Version, directv1beta3.Version: + default: + return fmt.Errorf("migration does not support DirectCSIVolume version %v", version) + } + + driveMap, legacyDriveErrors, driveErrors, err := migrateDrives(ctx, dryRun) + if err != nil { + return err + } + + legacyVolumeErrors, volumeErrors, err := migrateVolumes(ctx, driveMap, dryRun) + if err != nil { + return err + } + + if len(legacyDriveErrors) != 0 { + fmt.Printf("legacy drive errors:\n%+v\n", legacyDriveErrors) + } + + if len(driveErrors) != 0 { + fmt.Printf("drive errors:\n%+v\n", driveErrors) + } + + if len(legacyVolumeErrors) != 0 { + fmt.Printf("legacy volume errors:\n%+v\n", legacyVolumeErrors) + } + + if len(volumeErrors) != 0 { + fmt.Printf("volume errors:\n%+v\n", volumeErrors) + } + + return nil +} diff --git a/pkg/installer/migrate_test.go b/pkg/installer/migrate_test.go new file mode 100644 index 000000000..7cba9e1bb --- /dev/null +++ b/pkg/installer/migrate_test.go @@ -0,0 +1,498 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package installer + +import ( + "context" + "reflect" + "testing" + + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" + "github.com/minio/directpv/pkg/client" + clientsetfake "github.com/minio/directpv/pkg/clientset/fake" + directv1beta5 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta5" + legacyclient "github.com/minio/directpv/pkg/legacy/client" + legacyclientsetfake "github.com/minio/directpv/pkg/legacy/clientset/fake" + "github.com/minio/directpv/pkg/types" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestMigrateDrivesError(t *testing.T) { + // invalid FilesystemUUID + drive := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{Name: "887e88cc-16ca-428f-9637-fde599b21b26"}, + Status: directv1beta5.DirectCSIDriveStatus{DriveStatus: directv1beta5.DriveStatusReady}, + } + + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset(drive)) + driveMap, legacyDriveErrors, driveErrors, err := migrateDrives(context.TODO(), false) + if len(driveMap) == 0 && len(legacyDriveErrors) == 0 && len(driveErrors) == 0 && err == nil { + t.Fatalf("expected error, but succeeded\n") + } + + drive = &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{Name: "7d552939-1daf-4887-8879-777c503cb1d7"}, + Status: directv1beta5.DirectCSIDriveStatus{ + DriveStatus: directv1beta5.DriveStatusInUse, + FilesystemUUID: "5e6849e1126441c18d51e3c568acc6fc", + }, + } + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset(drive)) + driveMap, legacyDriveErrors, driveErrors, err = migrateDrives(context.TODO(), false) + if len(driveMap) == 0 && len(legacyDriveErrors) == 0 && len(driveErrors) == 0 && err == nil { + t.Fatalf("expected error, but succeeded\n") + } + + // duplicate FilesystemUUID + drive1 := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{Name: "887e88cc-16ca-428f-9637-fde599b21b26"}, + Status: directv1beta5.DirectCSIDriveStatus{ + DriveStatus: directv1beta5.DriveStatusReady, + FilesystemUUID: "3fb25851-18aa-48f2-8972-5d07c48154e5", + }, + } + drive2 := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{Name: "7d552939-1daf-4887-8879-777c503cb1d7"}, + Status: directv1beta5.DirectCSIDriveStatus{ + DriveStatus: directv1beta5.DriveStatusInUse, + FilesystemUUID: "3fb25851-18aa-48f2-8972-5d07c48154e5", + }, + } + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset(drive1, drive2)) + driveMap, legacyDriveErrors, driveErrors, err = migrateDrives(context.TODO(), false) + if len(driveMap) == 0 && len(legacyDriveErrors) == 0 && len(driveErrors) == 0 && err == nil { + t.Fatalf("expected error, but succeeded\n") + } +} + +func TestMigrateNoDrives(t *testing.T) { + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset()) + clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset()) + client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives()) + + _, legacyDriveErrors, driveErrors, err := migrateDrives(context.TODO(), false) + if len(legacyDriveErrors) != 0 || len(driveErrors) != 0 || err != nil { + t.Fatalf("unexpected error; %v\n", err) + } + driveList, err := client.DriveClient().List(context.TODO(), metav1.ListOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + t.Fatalf("unexpected error; %v\n", err) + } + if driveList != nil && len(driveList.Items) != 0 { + t.Fatalf("expected: ; got: %v\n", len(driveList.Items)) + } + + drive1 := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{Name: "887e88cc-16ca-428f-9637-fde599b21b26"}, + Status: directv1beta5.DirectCSIDriveStatus{DriveStatus: directv1beta5.DriveStatusAvailable}, + } + drive2 := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{Name: "7d552939-1daf-4887-8879-777c503cb1d7"}, + Status: directv1beta5.DirectCSIDriveStatus{DriveStatus: directv1beta5.DriveStatusTerminating}, + } + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset(drive1, drive2)) + clientset = types.NewExtFakeClientset(clientsetfake.NewSimpleClientset()) + client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives()) + + _, legacyDriveErrors, driveErrors, err = migrateDrives(context.TODO(), false) + if len(legacyDriveErrors) != 0 || len(driveErrors) != 0 || err != nil { + t.Fatalf("unexpected error; %v\n", err) + } + driveList, err = client.DriveClient().List(context.TODO(), metav1.ListOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + t.Fatalf("unexpected error; %v\n", err) + } + if driveList != nil && len(driveList.Items) != 0 { + t.Fatalf("expected: ; got: %v\n", len(driveList.Items)) + } +} + +func TestMigrateReadyDrive(t *testing.T) { + drive := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{ + Name: "2bf15006-a710-4c5f-8678-e3c996baaf2f", + Finalizers: []string{"direct.csi.min.io/data-protection"}, + Labels: map[string]string{ + "direct.csi.min.io/access-tier": "Unknown", + "direct.csi.min.io/created-by": "directcsi-driver", + "direct.csi.min.io/node": "c7", + "direct.csi.min.io/path": "vdb", + "direct.csi.min.io/version": "v1beta5", + }, + }, + Status: directv1beta5.DirectCSIDriveStatus{ + AccessTier: directv1beta5.AccessTierUnknown, + AllocatedCapacity: 3616768, + Conditions: []metav1.Condition{ + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonAdded), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionOwned), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonAdded), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionMounted), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonAdded), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionFormatted), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonInitialized), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionInitialized), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonReady), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionReady), + }, + }, + DriveStatus: directv1beta5.DriveStatusReady, + Filesystem: "xfs", + FilesystemUUID: "08450612-7ab3-40f9-ab83-38645fba6d29", + FreeCapacity: 533254144, + MajorNumber: 253, + MinorNumber: 16, + MountOptions: []string{"noatime", "rw"}, + Mountpoint: "/var/lib/direct-csi/mnt/08450612-7ab3-40f9-ab83-38645fba6d29", + NodeName: "c7", + Path: "/dev/vdb", + PCIPath: "pci-0000:08:00.0", + RootPartition: "vdb", + Topology: map[string]string{ + "direct.csi.min.io/identity": "direct-csi-min-io", + "direct.csi.min.io/node": "c7", + "direct.csi.min.io/rack": "default", + "direct.csi.min.io/region": "default", + "direct.csi.min.io/zone": "default", + }, + TotalCapacity: 536870912, + UeventFSUUID: "08450612-7ab3-40f9-ab83-38645fba6d29", + }, + } + + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset(drive)) + clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset()) + client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives()) + + driveMap, legacyDriveErrors, driveErrors, err := migrateDrives(context.TODO(), false) + if len(legacyDriveErrors) != 0 || len(driveErrors) != 0 || err != nil { + t.Fatalf("unexpected error; %v, %v, %v\n", legacyDriveErrors, driveErrors, err) + } + if len(driveMap) == 0 { + t.Fatalf("empty drive map\n") + } + + result, err := client.DriveClient().Get(context.TODO(), "08450612-7ab3-40f9-ab83-38645fba6d29", metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error; %v\n", err) + } + + expectedResult := types.NewDrive( + directpvtypes.DriveID("08450612-7ab3-40f9-ab83-38645fba6d29"), + types.DriveStatus{ + TotalCapacity: 536870912, + AllocatedCapacity: 3616768, + FreeCapacity: 533254144, + FSUUID: "08450612-7ab3-40f9-ab83-38645fba6d29", + Status: directpvtypes.DriveStatusReady, + Topology: map[string]string{ + "directpv.min.io/identity": "directpv-min-io", + "directpv.min.io/node": "c7", + "directpv.min.io/rack": "default", + "directpv.min.io/region": "default", + "directpv.min.io/zone": "default", + }, + }, + directpvtypes.NodeID("c7"), + directpvtypes.DriveName("vdb"), + directpvtypes.AccessTierDefault, + ) + expectedResult.SetMigratedLabel() + expectedResult.Spec.Relabel = true + + if !reflect.DeepEqual(result, expectedResult) { + t.Fatalf("expected: %#+v; got: %#+v\n", expectedResult, result) + } +} + +func TestMigrateInUseDrive(t *testing.T) { + drive := &directv1beta5.DirectCSIDrive{ + TypeMeta: legacyclient.DirectCSIDriveTypeMeta(), + ObjectMeta: metav1.ObjectMeta{ + Name: "08450612-7ab3-40f9-ab83-38645fba6d29", + Finalizers: []string{ + "direct.csi.min.io/data-protection", + "direct.csi.min.io.volume/pvc-c60680cc-c698-4dae-9f11-67611aeb563f", + "direct.csi.min.io.volume/pvc-bfcbb9a7-1781-4c05-8da1-4d087688a213", + "direct.csi.min.io.volume/pvc-4cf566ce-03cc-442a-b1ec-eb48897e3453", + "direct.csi.min.io.volume/pvc-7b6b5cd2-f9b4-4958-b8db-e071b2d1c5a1", + "direct.csi.min.io.volume/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5", + "direct.csi.min.io.volume/pvc-1bace7a4-c575-429f-9e76-dcd3b14255c8", + "direct.csi.min.io.volume/pvc-bff0997f-b442-4a19-89cb-eb43c132c207", + "direct.csi.min.io.volume/pvc-aac3b633-9265-4288-9c97-4de0de36a546", + "direct.csi.min.io.volume/pvc-d745d7fa-2b64-4dfb-aece-4b664f4db939", + "direct.csi.min.io.volume/pvc-1b098369-faad-453b-80fe-d820e0f2da88", + "direct.csi.min.io.volume/pvc-8f074641-da31-4867-8d66-c6f65cfd64c9", + "direct.csi.min.io.volume/pvc-f1d4cdc5-0855-4c82-92f7-b62f90ac018e", + "direct.csi.min.io.volume/pvc-6cf26e90-2b01-421d-9986-e97b2a30bd81", + "direct.csi.min.io.volume/pvc-a7b6013e-9eb6-41f8-8d56-a94451f95587", + "direct.csi.min.io.volume/pvc-10f64e68-5f02-4939-a941-7a6f24ad7dc5", + "direct.csi.min.io.volume/pvc-6123a87f-8f4e-4e4f-991a-fdd23aadf158", + }, + Labels: map[string]string{ + "direct.csi.min.io/access-tier": "Unknown", + "direct.csi.min.io/created-by": "directcsi-driver", + "direct.csi.min.io/node": "c7", + "direct.csi.min.io/path": "vdb", + "direct.csi.min.io/version": "v1beta5", + }, + }, + Status: directv1beta5.DirectCSIDriveStatus{ + AccessTier: directv1beta5.AccessTierUnknown, + AllocatedCapacity: 272052224, + Conditions: []metav1.Condition{ + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonAdded), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionOwned), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonAdded), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionMounted), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonAdded), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionFormatted), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonInitialized), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionInitialized), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIDriveReasonReady), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIDriveConditionReady), + }, + }, + DriveStatus: directv1beta5.DriveStatusInUse, + Filesystem: "xfs", + FilesystemUUID: "08450612-7ab3-40f9-ab83-38645fba6d29", + FreeCapacity: 264818688, + MajorNumber: 253, + MinorNumber: 16, + MountOptions: []string{"noatime", "rw"}, + Mountpoint: "/var/lib/direct-csi/mnt/08450612-7ab3-40f9-ab83-38645fba6d29", + NodeName: "c7", + Path: "/dev/vdb", + PCIPath: "pci-0000:08:00.0", + RootPartition: "vdb", + Topology: map[string]string{ + "direct.csi.min.io/identity": "direct-csi-min-io", + "direct.csi.min.io/node": "c7", + "direct.csi.min.io/rack": "default", + "direct.csi.min.io/region": "default", + "direct.csi.min.io/zone": "default", + }, + TotalCapacity: 536870912, + UeventFSUUID: "08450612-7ab3-40f9-ab83-38645fba6d29", + }, + } + + legacyclient.SetDriveClient(legacyclientsetfake.NewSimpleClientset(drive)) + clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset()) + client.SetDriveInterface(clientset.DirectpvLatest().DirectPVDrives()) + + driveMap, legacyDriveErrors, driveErrors, err := migrateDrives(context.TODO(), false) + if len(legacyDriveErrors) != 0 || len(driveErrors) != 0 || err != nil { + t.Fatalf("unexpected error; %v, %v, %v\n", legacyDriveErrors, driveErrors, err) + } + if len(driveMap) == 0 { + t.Fatalf("empty drive map\n") + } + + result, err := client.DriveClient().Get(context.TODO(), "08450612-7ab3-40f9-ab83-38645fba6d29", metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error; %v\n", err) + } + + expectedResult := types.NewDrive( + directpvtypes.DriveID("08450612-7ab3-40f9-ab83-38645fba6d29"), + types.DriveStatus{ + TotalCapacity: 536870912, + AllocatedCapacity: 272052224, + FreeCapacity: 264818688, + FSUUID: "08450612-7ab3-40f9-ab83-38645fba6d29", + Status: directpvtypes.DriveStatusReady, + Topology: map[string]string{ + "directpv.min.io/identity": "directpv-min-io", + "directpv.min.io/node": "c7", + "directpv.min.io/rack": "default", + "directpv.min.io/region": "default", + "directpv.min.io/zone": "default", + }, + }, + directpvtypes.NodeID("c7"), + directpvtypes.DriveName("vdb"), + directpvtypes.AccessTierDefault, + ) + expectedResult.SetMigratedLabel() + expectedResult.Spec.Relabel = true + expectedResult.AddVolumeFinalizer("pvc-c60680cc-c698-4dae-9f11-67611aeb563f") + expectedResult.AddVolumeFinalizer("pvc-bfcbb9a7-1781-4c05-8da1-4d087688a213") + expectedResult.AddVolumeFinalizer("pvc-4cf566ce-03cc-442a-b1ec-eb48897e3453") + expectedResult.AddVolumeFinalizer("pvc-7b6b5cd2-f9b4-4958-b8db-e071b2d1c5a1") + expectedResult.AddVolumeFinalizer("pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5") + expectedResult.AddVolumeFinalizer("pvc-1bace7a4-c575-429f-9e76-dcd3b14255c8") + expectedResult.AddVolumeFinalizer("pvc-bff0997f-b442-4a19-89cb-eb43c132c207") + expectedResult.AddVolumeFinalizer("pvc-aac3b633-9265-4288-9c97-4de0de36a546") + expectedResult.AddVolumeFinalizer("pvc-d745d7fa-2b64-4dfb-aece-4b664f4db939") + expectedResult.AddVolumeFinalizer("pvc-1b098369-faad-453b-80fe-d820e0f2da88") + expectedResult.AddVolumeFinalizer("pvc-8f074641-da31-4867-8d66-c6f65cfd64c9") + expectedResult.AddVolumeFinalizer("pvc-f1d4cdc5-0855-4c82-92f7-b62f90ac018e") + expectedResult.AddVolumeFinalizer("pvc-6cf26e90-2b01-421d-9986-e97b2a30bd81") + expectedResult.AddVolumeFinalizer("pvc-a7b6013e-9eb6-41f8-8d56-a94451f95587") + expectedResult.AddVolumeFinalizer("pvc-10f64e68-5f02-4939-a941-7a6f24ad7dc5") + expectedResult.AddVolumeFinalizer("pvc-6123a87f-8f4e-4e4f-991a-fdd23aadf158") + + if !reflect.DeepEqual(result, expectedResult) { + t.Fatalf("expected: %#+v; got: %#+v\n", expectedResult, result) + } +} + +func TestMigrateVolumes(t *testing.T) { + volume := &directv1beta5.DirectCSIVolume{ + TypeMeta: legacyclient.DirectCSIVolumeTypeMeta(), + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5", + Finalizers: []string{"direct.csi.min.io/pv-protection", "direct.csi.min.io/purge-protection"}, + Labels: map[string]string{ + "direct.csi.min.io/app": "minio-example", + "direct.csi.min.io/created-by": "directcsi-controller", + "direct.csi.min.io/drive": "08450612-7ab3-40f9-ab83-38645fba6d29", + "direct.csi.min.io/drive-path": "vdb", + "direct.csi.min.io/node": "c7", + "direct.csi.min.io/organization": "minio", + "direct.csi.min.io/pod.name": "minio-1", + "direct.csi.min.io/pod.namespace": "default", + "direct.csi.min.io/tenant": "tenant-1", + "direct.csi.min.io/version": "v1beta5", + }, + }, + Status: directv1beta5.DirectCSIVolumeStatus{ + AvailableCapacity: 16777216, + Conditions: []metav1.Condition{ + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIVolumeReasonInUse), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIVolumeConditionStaged), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIVolumeReasonInUse), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIVolumeConditionPublished), + }, + { + LastTransitionTime: metav1.Now(), + Reason: string(directv1beta5.DirectCSIVolumeReasonReady), + Status: metav1.ConditionTrue, + Type: string(directv1beta5.DirectCSIVolumeConditionReady), + }, + }, + ContainerPath: "/var/lib/kubelet/pods/52a3bbb9-30bd-429d-85b1-f1ada882e0ce/volumes/kubernetes.io~csi/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5/mount", + Drive: "08450612-7ab3-40f9-ab83-38645fba6d29", + HostPath: "/var/lib/direct-csi/mnt/08450612-7ab3-40f9-ab83-38645fba6d29/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5", + NodeName: "c7", + StagingPath: "/var/lib/kubelet/plugins/kubernetes.io/csi/pv/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5/globalmount", + TotalCapacity: 16777216, + }, + } + + legacyclient.SetVolumeClient(legacyclientsetfake.NewSimpleClientset(volume)) + clientset := types.NewExtFakeClientset(clientsetfake.NewSimpleClientset()) + client.SetVolumeInterface(clientset.DirectpvLatest().DirectPVVolumes()) + + legacyVolumeErrors, volumeErrors, err := migrateVolumes( + context.TODO(), + map[string]string{"08450612-7ab3-40f9-ab83-38645fba6d29": "a9908089-96dd-4e8b-8f06-0c0b5e391f39"}, + false, + ) + if len(legacyVolumeErrors) != 0 || len(volumeErrors) != 0 || err != nil { + t.Fatalf("unexpected error; %v, %v, %v\n", legacyVolumeErrors, volumeErrors, err) + } + + result, err := client.VolumeClient().Get(context.TODO(), "pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5", metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error; %v\n", err) + } + + expectedResult := types.NewVolume( + "pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5", + "a9908089-96dd-4e8b-8f06-0c0b5e391f39", + directpvtypes.NodeID("c7"), + directpvtypes.DriveID("a9908089-96dd-4e8b-8f06-0c0b5e391f39"), + directpvtypes.DriveName("vdb"), + 16777216, + ) + expectedResult.SetMigratedLabel() + expectedResult.Status.DataPath = "/var/lib/direct-csi/mnt/08450612-7ab3-40f9-ab83-38645fba6d29/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5" + expectedResult.Status.StagingTargetPath = "/var/lib/kubelet/plugins/kubernetes.io/csi/pv/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5/globalmount" + expectedResult.Status.TargetPath = "/var/lib/kubelet/pods/52a3bbb9-30bd-429d-85b1-f1ada882e0ce/volumes/kubernetes.io~csi/pvc-009bfc49-4a66-4055-9f19-bd039cc3b4f5/mount" + expectedResult.SetPodName("minio-1") + expectedResult.SetPodNS("default") + expectedResult.Labels["directpv.min.io/app"] = "minio-example" + expectedResult.Labels["directpv.min.io/organization"] = "minio" + expectedResult.Labels["directpv.min.io/tenant"] = "tenant-1" + expectedResult.Status.Status = directpvtypes.VolumeStatusReady + + if !reflect.DeepEqual(result, expectedResult) { + t.Fatalf("expected: %#+v; got: %#+v\n", expectedResult, result) + } + + // no fsuuid found error + legacyclient.SetVolumeClient(legacyclientsetfake.NewSimpleClientset(volume)) + legacyVolumeErrors, volumeErrors, err = migrateVolumes(context.TODO(), map[string]string{}, false) + if len(legacyVolumeErrors) == 0 && len(volumeErrors) == 0 && err == nil { + t.Fatalf("expected error; but succeeded\n") + } +} diff --git a/pkg/installer/storageclass.go b/pkg/installer/storageclass.go index bb6f4d238..479e47de7 100644 --- a/pkg/installer/storageclass.go +++ b/pkg/installer/storageclass.go @@ -25,44 +25,34 @@ import ( directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/consts" "github.com/minio/directpv/pkg/k8s" + legacyclient "github.com/minio/directpv/pkg/legacy/client" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" storagev1beta1 "k8s.io/api/storage/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" ) var errStorageClassVersionUnsupported = errors.New("unsupported StorageClass version found") -func createStorageClass(ctx context.Context, args *Args) error { - var gvk *schema.GroupVersionKind - if args.DryRun { - if args.KubeVersion.Major() >= 1 && args.KubeVersion.Minor() < 16 { - gvk = &schema.GroupVersionKind{Version: "v1beta1"} - } else { - gvk = &schema.GroupVersionKind{Version: "v1"} - } - } else { - var err error - if gvk, err = k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1"); err != nil { - return err - } +func doCreateStorageClass(ctx context.Context, args *Args, version string, legacy bool) error { + name := consts.Identity + if legacy { + name = legacyclient.Identity } allowExpansion := false - name := consts.StorageClassName allowTopologiesWithName := corev1.TopologySelectorTerm{ MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{ { Key: string(directpvtypes.TopologyDriverIdentity), - Values: []string{string(directpvtypes.ToLabelValue(consts.Identity))}, + Values: []string{consts.Identity}, }, }, } retainPolicy := corev1.PersistentVolumeReclaimDelete - switch gvk.Version { + switch version { case "v1": bindingMode := storagev1.VolumeBindingWaitForFirstConsumer storageClass := &storagev1.StorageClass{ @@ -146,20 +136,41 @@ func createStorageClass(ctx context.Context, args *Args) error { } } -func deleteStorageClass(ctx context.Context) error { - gvk, err := k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1") - if err != nil { +func createStorageClass(ctx context.Context, args *Args) error { + version := "v1" + switch { + case args.DryRun: + if args.KubeVersion.Major() >= 1 && args.KubeVersion.Minor() < 16 { + version = "v1beta1" + } + default: + gvk, err := k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1") + if err != nil { + return err + } + version = gvk.Version + } + + if err := doCreateStorageClass(ctx, args, version, false); err != nil { return err } - switch gvk.Version { + if args.Legacy { + return doCreateStorageClass(ctx, args, version, true) + } + + return nil +} + +func doDeleteStorageClass(ctx context.Context, version, name string) (err error) { + switch version { case "v1": err = k8s.KubeClient().StorageV1().StorageClasses().Delete( - ctx, consts.StorageClassName, metav1.DeleteOptions{}, + ctx, name, metav1.DeleteOptions{}, ) case "v1beta1": err = k8s.KubeClient().StorageV1beta1().StorageClasses().Delete( - ctx, consts.StorageClassName, metav1.DeleteOptions{}, + ctx, name, metav1.DeleteOptions{}, ) default: return errStorageClassVersionUnsupported @@ -171,3 +182,16 @@ func deleteStorageClass(ctx context.Context) error { return nil } + +func deleteStorageClass(ctx context.Context) error { + gvk, err := k8s.GetGroupVersionKind("storage.k8s.io", "CSIDriver", "v1", "v1beta1") + if err != nil { + return err + } + + if err = doDeleteStorageClass(ctx, gvk.Version, consts.StorageClassName); err != nil { + return err + } + + return doDeleteStorageClass(ctx, gvk.Version, legacyclient.Identity) +} diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index a162c5f96..b33d360b2 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -121,16 +121,6 @@ func IsCondition(conditions []metav1.Condition, ctype string, status metav1.Cond return false } -// IsConditionStatus checks whether type/status in conditions or not. -func IsConditionStatus(conditions []metav1.Condition, ctype string, status metav1.ConditionStatus) bool { - for i := range conditions { - if conditions[i].Type == ctype && conditions[i].Status == status { - return true - } - } - return false -} - // UpdateCondition updates type/status/reason/message of conditions matched by condition type. func UpdateCondition(conditions []metav1.Condition, ctype string, status metav1.ConditionStatus, reason, message string) { for i := range conditions { diff --git a/pkg/legacy/client/client.go b/pkg/legacy/client/client.go index ebce2a5f8..4eb52ece1 100644 --- a/pkg/legacy/client/client.go +++ b/pkg/legacy/client/client.go @@ -19,12 +19,25 @@ package client import ( "github.com/minio/directpv/pkg/k8s" directcsi "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta5" + typeddirectcsi "github.com/minio/directpv/pkg/legacy/clientset/typed/direct.csi.min.io/v1beta5" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/restmapper" "k8s.io/klog/v2" ) +var ( + initialized int32 + driveClient typeddirectcsi.DirectCSIDriveInterface + volumeClient typeddirectcsi.DirectCSIVolumeInterface +) + +// DirectCSI group and identity names. +const ( + GroupName = "direct.csi.min.io" + Identity = "direct-csi-min-io" +) + // DirectCSIVersionLabelKey is the version with group and version ... const DirectCSIVersionLabelKey = directcsi.Group + "/" + directcsi.Version @@ -69,3 +82,13 @@ func GetGroupKindVersions(group, kind string, versions ...string) (*schema.Group } return gvk, nil } + +// DriveClient gets latest versioned drive interface. +func DriveClient() typeddirectcsi.DirectCSIDriveInterface { + return driveClient +} + +// VolumeClient gets latest versioned volume interface. +func VolumeClient() typeddirectcsi.DirectCSIVolumeInterface { + return volumeClient +} diff --git a/pkg/legacy/client/fake.go b/pkg/legacy/client/fake.go new file mode 100644 index 000000000..c2f89246b --- /dev/null +++ b/pkg/legacy/client/fake.go @@ -0,0 +1,43 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package client + +import ( + "github.com/minio/directpv/pkg/k8s" + legacyclientsetfake "github.com/minio/directpv/pkg/legacy/clientset/fake" +) + +// FakeInit initializes fake clients. +func FakeInit() { + k8s.FakeInit() + + fakeClientset := legacyclientsetfake.NewSimpleClientset() + driveClient = fakeClientset.DirectV1beta5().DirectCSIDrives() + volumeClient = fakeClientset.DirectV1beta5().DirectCSIVolumes() +} + +// SetDriveClient sets drive interface from fake clientset. +// Note: To be used for writing test cases only +func SetDriveClient(clientset *legacyclientsetfake.Clientset) { + driveClient = clientset.DirectV1beta5().DirectCSIDrives() +} + +// SetVolumeClient sets volume interface from fake clientset. +// Note: To be used for writing test cases only +func SetVolumeClient(clientset *legacyclientsetfake.Clientset) { + volumeClient = clientset.DirectV1beta5().DirectCSIVolumes() +} diff --git a/pkg/legacy/client/init.go b/pkg/legacy/client/init.go new file mode 100644 index 000000000..acee6ef12 --- /dev/null +++ b/pkg/legacy/client/init.go @@ -0,0 +1,42 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package client + +import ( + "sync/atomic" + + "github.com/minio/directpv/pkg/k8s" + "k8s.io/klog/v2" +) + +// Init initializes legacy clients. +func Init() { + if atomic.AddInt32(&initialized, 1) != 1 { + return + } + + k8s.Init() + + var err error + if driveClient, err = DirectCSIDriveInterfaceForConfig(k8s.KubeConfig()); err != nil { + klog.Fatalf("unable to create new DirectCSI drive interface; %v", err) + } + + if volumeClient, err = DirectCSIVolumeInterfaceForConfig(k8s.KubeConfig()); err != nil { + klog.Fatalf("unable to create new DirectCSI volume interface; %v", err) + } +} diff --git a/pkg/legacy/client/interface.go b/pkg/legacy/client/interface.go index 3c7bc0c65..34fae095f 100644 --- a/pkg/legacy/client/interface.go +++ b/pkg/legacy/client/interface.go @@ -41,6 +41,34 @@ import ( "k8s.io/klog/v2" ) +// GetGroupVersion probes group and version of given resource kind. +func GetGroupVersion(kind string) (version, group string, err error) { + gvk, err := GetGroupKindVersions( + directcsi.Group, + kind, + directcsi.Version, + directv1beta4.Version, + directv1beta3.Version, + directv1beta2.Version, + directv1beta1.Version, + directv1alpha1.Version, + ) + if err != nil && !meta.IsNoMatchError(err) { + return "", "", err + } + + version = directcsi.Version + if gvk != nil { + version = gvk.Version + } + group = directcsi.Group + if gvk != nil { + group = gvk.Group + } + + return version, group, nil +} + // GetLatestDirectCSIRESTClient gets REST client of the latest direct-csi. func GetLatestDirectCSIRESTClient() rest.Interface { directClientset, err := clientset.NewForConfig(k8s.KubeConfig()) @@ -73,31 +101,16 @@ type directCSIInterface struct { } func directCSIInterfaceForConfig(config *rest.Config, kind, resource string) (*directCSIInterface, error) { - gvk, err := GetGroupKindVersions( - directcsi.Group, - kind, - directcsi.Version, - directv1beta4.Version, - directv1beta3.Version, - directv1beta2.Version, - directv1beta1.Version, - directv1alpha1.Version, - ) - if err != nil && !meta.IsNoMatchError(err) { + version, group, err := GetGroupVersion(kind) + if err != nil { return nil, err } + resourceInterface, err := dynamic.NewForConfig(config) if err != nil { return nil, err } - version := directcsi.Version - if gvk != nil { - version = gvk.Version - } - group := directcsi.Group - if gvk != nil { - group = gvk.Group - } + return &directCSIInterface{ resourceInterface: resourceInterface.Resource( schema.GroupVersionResource{ diff --git a/pkg/legacy/client/list.go b/pkg/legacy/client/list.go new file mode 100644 index 000000000..c709deb4f --- /dev/null +++ b/pkg/legacy/client/list.go @@ -0,0 +1,124 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package client + +import ( + "context" + + directv1beta5 "github.com/minio/directpv/pkg/legacy/apis/direct.csi.min.io/v1beta5" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ListDriveResult denotes list of drive result. +type ListDriveResult struct { + Drive directv1beta5.DirectCSIDrive + Err error +} + +// ListDrives returns channel to loop through drive items. +func ListDrives(ctx context.Context) <-chan ListDriveResult { + resultCh := make(chan ListDriveResult) + go func() { + defer close(resultCh) + + send := func(result ListDriveResult) bool { + select { + case <-ctx.Done(): + return false + case resultCh <- result: + return true + } + } + + options := metav1.ListOptions{Limit: 1000} + for { + result, err := DriveClient().List(ctx, options) + if err != nil { + if !apierrors.IsNotFound(err) { + send(ListDriveResult{Err: err}) + } + return + } + + for _, item := range result.Items { + switch item.Status.DriveStatus { + case directv1beta5.DriveStatusReady, directv1beta5.DriveStatusInUse: + if !send(ListDriveResult{Drive: item}) { + return + } + } + } + + if result.Continue == "" { + return + } + + options.Continue = result.Continue + } + }() + + return resultCh +} + +// ListVolumeResult denotes list of volume result. +type ListVolumeResult struct { + Volume directv1beta5.DirectCSIVolume + Err error +} + +// ListVolumes returns channel to loop through volume items. +func ListVolumes(ctx context.Context) <-chan ListVolumeResult { + resultCh := make(chan ListVolumeResult) + go func() { + defer close(resultCh) + + send := func(result ListVolumeResult) bool { + select { + case <-ctx.Done(): + return false + case resultCh <- result: + return true + } + } + + options := metav1.ListOptions{Limit: 1000} + for { + result, err := VolumeClient().List(ctx, options) + if err != nil { + if !apierrors.IsNotFound(err) { + send(ListVolumeResult{Err: err}) + } + return + } + + for _, item := range result.Items { + if !send(ListVolumeResult{Volume: item}) { + return + } + } + + if result.Continue == "" { + return + } + + options.Continue = result.Continue + } + }() + + return resultCh +} diff --git a/pkg/node/legacy_server.go b/pkg/node/legacy_server.go new file mode 100644 index 000000000..b4fc19cd4 --- /dev/null +++ b/pkg/node/legacy_server.go @@ -0,0 +1,54 @@ +// This file is part of MinIO DirectPV +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package node + +import ( + "context" + + "github.com/container-storage-interface/spec/lib/go/csi" + directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" +) + +// LegacyServer denotes legacy node server. +type LegacyServer struct { + Server +} + +// NewLegacyServer creates legacy node server. +func NewLegacyServer(nodeID directpvtypes.NodeID, rack, zone, region string) *LegacyServer { + return &LegacyServer{Server: newServer("direct-csi-min-io", nodeID, rack, zone, region)} +} + +// NodeGetInfo gets node information. +// reference: https://github.com/container-storage-interface/spec/blob/master/spec.md#nodegetinfo +func (server *LegacyServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + topology := &csi.Topology{ + Segments: map[string]string{ + "direct.csi.min.io/identity": server.identity, + "direct.csi.min.io/rack": server.rack, + "direct.csi.min.io/region": server.region, + "direct.csi.min.io/zone": server.zone, + "direct.csi.min.io/node": string(server.nodeID), + }, + } + + return &csi.NodeGetInfoResponse{ + NodeId: string(server.nodeID), + MaxVolumesPerNode: 100, + AccessibleTopology: topology, + }, nil +} diff --git a/pkg/node/server.go b/pkg/node/server.go index 3a0b9ead3..9a5c051d4 100644 --- a/pkg/node/server.go +++ b/pkg/node/server.go @@ -50,12 +50,8 @@ type Server struct { mkdir func(path string) error } -// NewServer creates node server. -func NewServer(ctx context.Context, - identity string, nodeID directpvtypes.NodeID, rack, zone, region string, - metricsPort int, -) (*Server, error) { - nodeServer := &Server{ +func newServer(identity string, nodeID directpvtypes.NodeID, rack, zone, region string) Server { + return Server{ nodeID: nodeID, identity: identity, rack: rack, @@ -75,10 +71,16 @@ func NewServer(ctx context.Context, return sys.Mkdir(dir, 0o755) }, } +} +// NewServer creates node server. +func NewServer(ctx context.Context, + identity string, nodeID directpvtypes.NodeID, rack, zone, region string, + metricsPort int, +) *Server { go metrics.ServeMetrics(ctx, nodeID, metricsPort) - - return nodeServer, nil + server := newServer(identity, nodeID, rack, zone, region) + return &server } // NodeGetInfo gets node information. diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 826964631..c8d2c954f 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -56,7 +56,8 @@ func WriteObject(writer io.Writer, obj interface{}) error { if _, err = writer.Write([]byte(y)); err != nil { return err } - return nil + _, err = writer.Write([]byte("---\n")) + return err } // SafeFile is used to write the yaml diff --git a/pkg/volume/list.go b/pkg/volume/list.go index 6e0b9f762..2ac5e5189 100644 --- a/pkg/volume/list.go +++ b/pkg/volume/list.go @@ -24,6 +24,7 @@ import ( "github.com/minio/directpv/pkg/k8s" "github.com/minio/directpv/pkg/types" "github.com/minio/directpv/pkg/utils" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -159,6 +160,10 @@ func (lister *Lister) List(ctx context.Context) <-chan ListVolumeResult { for { result, err := client.VolumeClient().List(ctx, options) if err != nil { + if apierrors.IsNotFound(err) && lister.ignoreNotFound { + break + } + send(ListVolumeResult{Err: err}) return } @@ -193,6 +198,10 @@ func (lister *Lister) List(ctx context.Context) <-chan ListVolumeResult { for _, volumeName := range lister.volumeNames { volume, err := client.VolumeClient().Get(ctx, volumeName, metav1.GetOptions{}) if err != nil { + if apierrors.IsNotFound(err) && lister.ignoreNotFound { + continue + } + send(ListVolumeResult{Err: err}) return }