Skip to content

Commit

Permalink
[CSI 3.0.3] Cross-port telemetry enhancements, CSI Migration bug fix …
Browse files Browse the repository at this point in the history
…related to AttachVolumes and also the nodes cache fix during attach/detach. (#2561)

* Determine cluster distribution server version & type using kubernetes API server version (#2404)

* Determine cluster distribution server version & type using kubernetes API server version

* Exit the container if we are not able to fetch the k8s client

* break the loop as soon as we find the distribution type

* skip adding node to cache during attach or detach operation (#2546)

* Return volume path as part of ListVolumes response for migrated volumes (#2558)

---------

Co-authored-by: Divyen Patel <divyenp@vmware.com>
Co-authored-by: Aditya Kulkarni <adkulkarni@vmware.com>
  • Loading branch information
3 people committed Sep 26, 2023
1 parent fd47ba2 commit 0878d14
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 9 deletions.
47 changes: 47 additions & 0 deletions cmd/syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import (
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/prometheus/client_golang/prometheus/promhttp"
cnstypes "github.com/vmware/govmomi/cns/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/node"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
Expand Down Expand Up @@ -216,6 +219,50 @@ func initSyncerComponents(ctx context.Context, clusterFlavor cnstypes.CnsCluster
log.Errorf("failed to initialize nodeManager. Error: %+v", err)
os.Exit(1)
}
if configInfo.Cfg.Global.ClusterDistribution == "" {
config, err := rest.InClusterConfig()
if err != nil {
log.Errorf("failed to get InClusterConfig: %v", err)
os.Exit(1)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Errorf("failed to create kubernetes client with err: %v", err)
os.Exit(1)
}

// Get the version info for the Kubernetes API server
versionInfo, err := clientset.Discovery().ServerVersion()
if err != nil {
log.Errorf("failed to fetch versionInfo with err: %v", err)
os.Exit(1)
}

// Extract the version string from the version info
version := versionInfo.GitVersion
var ClusterDistNameToServerVersion = map[string]string{
"gke": "Anthos",
"racher": "Rancher",
"rke": "Rancher",
"docker": "DockerEE",
"dockeree": "DockerEE",
"openshift": "Openshift",
"wcp": "Supervisor",
"vmware": "TanzuKubernetesCluster",
"nativek8s": "VanillaK8S",
}
distributionUnknown := true
for distServerVersion, distName := range ClusterDistNameToServerVersion {
if strings.Contains(version, distServerVersion) {
configInfo.Cfg.Global.ClusterDistribution = distName
distributionUnknown = false
break
}
}
if distributionUnknown {
configInfo.Cfg.Global.ClusterDistribution = ClusterDistNameToServerVersion["nativek8s"]
}
}
}
go func() {
if err := manager.InitCnsOperator(ctx, clusterFlavor, configInfo, coInitParams); err != nil {
Expand Down
27 changes: 26 additions & 1 deletion pkg/apis/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

migrationconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/migration/config"

migrationv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/migration/v1alpha1"
Expand Down Expand Up @@ -67,9 +68,13 @@ type VolumeMigrationService interface {
GetVolumeID(ctx context.Context, volumeSpec *VolumeSpec, registerIfNotFound bool) (string, error)

// GetVolumePath returns VolumePath for a given VolumeID.
// Returns an error if not able to retrieve VolumePath.
// It will also create a CnsVSphereVolumeMigration CR if it's not present.
GetVolumePath(ctx context.Context, volumeID string) (string, error)

// GetVolumePathFromMigrationServiceCache checks the in-memory cache for a volumeID.
// A cache hit means that the volume is a migrated in-tree volume.
GetVolumePathFromMigrationServiceCache(ctx context.Context, volumeID string) (string, error)

// DeleteVolumeInfo helps delete mapping of volumePath to VolumeID for
// specified volumeID.
DeleteVolumeInfo(ctx context.Context, volumeID string) error
Expand Down Expand Up @@ -362,6 +367,26 @@ func (volumeMigration *volumeMigration) GetVolumePath(ctx context.Context, volum
return fileBackingInfo.FilePath, nil
}

// GetVolumePathFromMigrationServiceCache does a reverse lookup in the in-memory
// volumePathToVolumeID cache: it walks the cached entries and returns the
// volume path stored for the given volumeID. A cache hit means that the volume
// is a migrated in-tree volume. common.ErrNotFound is returned when no
// non-empty path is cached for volumeID.
func (volumeMigration *volumeMigration) GetVolumePathFromMigrationServiceCache(ctx context.Context,
	volumeID string) (string, error) {
	log := logger.GetLogger(ctx)
	cachedPath := ""
	// The cache is keyed by volume path, so every entry has to be inspected
	// until one whose value equals volumeID is found.
	matchVolumeID := func(path, id interface{}) bool {
		if id.(string) != volumeID {
			// Not the entry we are looking for; keep iterating.
			return true
		}
		cachedPath = path.(string)
		log.Infof("Found VolumePath %v for VolumeID: %q in the cache", cachedPath, volumeID)
		// Stop iterating at the first match.
		return false
	}
	volumeMigration.volumePathToVolumeID.Range(matchVolumeID)
	if cachedPath == "" {
		return "", common.ErrNotFound
	}
	return cachedPath, nil
}

// saveVolumeInfo helps create CR for given cnsVSphereVolumeMigration. This func
// also update local cache with supplied cnsVSphereVolumeMigration, after
// successful creation of CR
Expand Down
1 change: 0 additions & 1 deletion pkg/common/cns-lib/node/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ func (m *defaultManager) GetNodeByNameOrUUID(
log.Errorf("failed to get node UUID from node: %q. Err: %v", nodeNameOrUUID, err)
return nil, err
}
m.nodeNameToUUID.Store(nodeNameOrUUID, k8snodeUUID)
return m.GetNode(ctx, k8snodeUUID, nil)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/common/unittestcommon/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,8 @@ func (c *FakeK8SOrchestrator) GetCSINodeTopologyInstanceByName(nodeName string)
item interface{}, exists bool, err error) {
return nil, false, nil
}

// GetPVNameFromCSIVolumeID is the fake orchestrator's implementation of the
// PV-name-by-volumeID lookup. It ignores volumeID and always returns an empty
// name together with false.
func (c *FakeK8SOrchestrator) GetPVNameFromCSIVolumeID(volumeID string) (string, bool) {
return "", false
}
3 changes: 3 additions & 0 deletions pkg/csi/service/common/commonco/coagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type COCommonInterface interface {
GetCSINodeTopologyInstancesList() []interface{}
// GetCSINodeTopologyInstanceByName fetches the CSINodeTopology instance for a given node name in the cluster.
GetCSINodeTopologyInstanceByName(nodeName string) (item interface{}, exists bool, err error)
// GetPVNameFromCSIVolumeID retrieves the pv name from the volumeID.
// This method will not return pv name in case of in-tree migrated volumes
GetPVNameFromCSIVolumeID(volumeID string) (string, bool)
}

// GetContainerOrchestratorInterface returns orchestrator object for a given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,3 +1544,8 @@ func (c *K8sOrchestrator) CreateConfigMap(ctx context.Context, name string, name

return nil
}

// GetPVNameFromCSIVolumeID looks up volumeID in the orchestrator's
// volumeIDToNameMap and returns both values of that lookup unchanged: the PV
// name and the accompanying boolean.
func (c *K8sOrchestrator) GetPVNameFromCSIVolumeID(volumeID string) (string, bool) {
	pvName, ok := c.volumeIDToNameMap.get(volumeID)
	return pvName, ok
}
23 changes: 16 additions & 7 deletions pkg/csi/service/vanilla/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1804,7 +1804,7 @@ func (c *controller) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequ
createVolumeInternal := func() (
*csi.CreateVolumeResponse, string, error) {
log.Infof("CreateVolume: called with args %+v", *req)
//TODO: If the err is returned by invoking CNS API, then faultType should be
// TODO: If the err is returned by invoking CNS API, then faultType should be
// populated by the underlying layer.
// If the request fails validation, "csi.fault.InvalidArgument" will be returned.
// If the request fails because an object is not found, "csi.fault.NotFound" will be returned.
Expand Down Expand Up @@ -1884,7 +1884,7 @@ func (c *controller) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequ
deleteVolumeInternal := func() (
*csi.DeleteVolumeResponse, string, error) {
log.Infof("DeleteVolume: called with args: %+v", *req)
//TODO: If the err is returned by invoking CNS API, then faultType should be
// TODO: If the err is returned by invoking CNS API, then faultType should be
// populated by the underlying layer.
// If the request fails validation, "csi.fault.InvalidArgument" will be returned.
// If the request fails because an object is not found, "csi.fault.NotFound" will be returned.
Expand Down Expand Up @@ -2024,7 +2024,7 @@ func (c *controller) ControllerPublishVolume(ctx context.Context, req *csi.Contr
controllerPublishVolumeInternal := func() (
*csi.ControllerPublishVolumeResponse, string, error) {
log.Infof("ControllerPublishVolume: called with args %+v", *req)
//TODO: If the err is returned by invoking CNS API, then faultType should be
// TODO: If the err is returned by invoking CNS API, then faultType should be
// populated by the underlying layer.
// If the request fails validation, "csi.fault.InvalidArgument" will be returned.
// If the request fails because an object is not found, "csi.fault.NotFound" will be returned.
Expand Down Expand Up @@ -2174,7 +2174,7 @@ func (c *controller) ControllerUnpublishVolume(ctx context.Context, req *csi.Con
*csi.ControllerUnpublishVolumeResponse, string, error) {
var faultType string
log.Infof("ControllerUnpublishVolume: called with args %+v", *req)
//TODO: If the err is returned by invoking CNS API, then faultType should be
// TODO: If the err is returned by invoking CNS API, then faultType should be
// populated by the underlying layer.
// If the request fails validation, "csi.fault.InvalidArgument" will be returned.
// If the request fails because an object is not found, "csi.fault.NotFound" will be returned.
Expand Down Expand Up @@ -2488,6 +2488,7 @@ func (c *controller) ListVolumes(ctx context.Context, req *csi.ListVolumesReques
querySelection := cnstypes.CnsQuerySelection{
Names: []string{
string(cnstypes.QuerySelectionNameTypeVolumeType),
string(cnstypes.QuerySelectionNameTypeVolumeName),
},
}
// For multi-VC configuration, query volumes from all vCenters
Expand Down Expand Up @@ -2636,9 +2637,17 @@ func (c *controller) processQueryResultsListVolumes(ctx context.Context, startin
nodeVMUUID, found := volumeIDToNodeUUIDMap[blockVolID]
if found {
volCounter += 1
//Populate csi.Volume info for the given volume
volumeId := blockVolID
migratedVolumePath, err := volumeMigrationService.GetVolumePathFromMigrationServiceCache(ctx, blockVolID)
if err != nil && err == common.ErrNotFound {
log.Debugf("volumeID: %v not found in migration service in-memory cache "+
"so it's not a migrated in-tree volume", blockVolID)
} else if migratedVolumePath != "" {
volumeId = migratedVolumePath
}
// Populate csi.Volume info for the given volume
blockVolumeInfo := &csi.Volume{
VolumeId: blockVolID,
VolumeId: volumeId,
}
// Getting published nodes
volStatus := &csi.ListVolumesResponse_VolumeStatus{
Expand Down Expand Up @@ -3199,7 +3208,7 @@ func queryAllVolumeSnapshotsForMultiVC(ctx context.Context, c *controller, token
CNSSnapshotsForListSnapshots = snapQueryEntries
CNSVolumeDetailsMap = cnsVolumeDetailsMap
} else {
//fetch snapshots
// fetch snapshots
snapQueryEntries, volumeDetails, err := getSnapshotsAndSourceVolumeDetails(ctx, vCenterManager,
c.manager.VolumeManager, c.manager.VcenterConfig.Host)
if err != nil {
Expand Down

0 comments on commit 0878d14

Please sign in to comment.