Skip to content

Commit

Permalink
Merge pull request #2123 from MartinForReal/shafan/migrate
Browse files Browse the repository at this point in the history
Refactor: move flags definition to config functions
  • Loading branch information
andyzhangx committed Dec 26, 2023
2 parents f80e4c7 + 40ed357 commit ee47913
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 160 deletions.
53 changes: 14 additions & 39 deletions pkg/azuredisk/azuredisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,40 +52,6 @@ import (
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
)

// DriverOptions defines driver parameters specified in driver deployment
type DriverOptions struct {
NodeID string
DriverName string
VolumeAttachLimit int64
EnablePerfOptimization bool
CloudConfigSecretName string
CloudConfigSecretNamespace string
CustomUserAgent string
UserAgentSuffix string
UseCSIProxyGAInterface bool
EnableDiskOnlineResize bool
AllowEmptyCloudConfig bool
EnableListVolumes bool
EnableListSnapshots bool
SupportZone bool
GetNodeInfoFromLabels bool
EnableDiskCapacityCheck bool
DisableUpdateCache bool
EnableTrafficManager bool
TrafficManagerPort int64
AttachDetachInitialDelayInMs int64
VMSSCacheTTLInSeconds int64
VMType string
EnableWindowsHostProcess bool
GetNodeIDFromIMDS bool
EnableOtelTracing bool
WaitForSnapshotReady bool
CheckDiskLUNCollision bool
Kubeconfig string
Endpoint string
DisableAVSetNodes bool
}

// CSIDriver defines the interface for a CSI driver.
type CSIDriver interface {
csi.ControllerServer
Expand Down Expand Up @@ -183,7 +149,10 @@ func newDriverV1(options *DriverOptions) *Driver {
driver.volumeLocks = volumehelper.NewVolumeLocks()
driver.ioHandler = azureutils.NewOSIOHandler()
driver.hostUtil = hostutil.NewHostUtil()

if driver.NodeID == "" {
// nodeid is not needed in controller component
klog.Warning("nodeid is empty")
}
topologyKey = fmt.Sprintf("topology.%s/zone", driver.Name)

cache, err := azcache.NewTimedCache(5*time.Minute, func(key string) (interface{}, error) {
Expand Down Expand Up @@ -294,14 +263,20 @@ func (d *Driver) Run(ctx context.Context) error {
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
})
grpcInterceptor := grpc.UnaryInterceptor(csicommon.LogGRPC)
if d.enableOtelTracing {
grpcInterceptor = grpc.ChainUnaryInterceptor(csicommon.LogGRPC, otelgrpc.UnaryServerInterceptor())
}
opts := []grpc.ServerOption{
grpcInterceptor,
}

if d.enableOtelTracing {
exporter, err := InitOtelTracing()
if err != nil {
klog.Fatalf("Failed to initialize otel tracing: %v", err)
}
// Exporter will flush traces on shutdown
defer func() {
if err := exporter.Shutdown(context.Background()); err != nil {
klog.Errorf("Could not shutdown otel exporter: %v", err)
}
}()
opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
}

Expand Down
100 changes: 100 additions & 0 deletions pkg/azuredisk/azuredisk_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package azuredisk

import (
"flag"

consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
)

// DriverOptions defines driver parameters specified in driver deployment
type DriverOptions struct {
// Common options
NodeID string
DriverName string
VolumeAttachLimit int64
EnablePerfOptimization bool
CloudConfigSecretName string
CloudConfigSecretNamespace string
CustomUserAgent string
UserAgentSuffix string
UseCSIProxyGAInterface bool
EnableOtelTracing bool

//only used in v1
EnableDiskOnlineResize bool
AllowEmptyCloudConfig bool
EnableListVolumes bool
EnableListSnapshots bool
SupportZone bool
GetNodeInfoFromLabels bool
EnableDiskCapacityCheck bool
DisableUpdateCache bool
EnableTrafficManager bool
TrafficManagerPort int64
AttachDetachInitialDelayInMs int64
VMSSCacheTTLInSeconds int64
VMType string
EnableWindowsHostProcess bool
GetNodeIDFromIMDS bool
WaitForSnapshotReady bool
CheckDiskLUNCollision bool
Kubeconfig string
Endpoint string
DisableAVSetNodes bool
}

func (o *DriverOptions) AddFlags() *flag.FlagSet {
if o == nil {
return nil
}
fs := flag.NewFlagSet("", flag.ExitOnError)
fs.StringVar(&o.NodeID, "nodeid", "", "node id")
fs.StringVar(&o.DriverName, "drivername", consts.DefaultDriverName, "name of the driver")
fs.Int64Var(&o.VolumeAttachLimit, "volume-attach-limit", -1, "maximum number of attachable volumes per node")
fs.BoolVar(&o.EnablePerfOptimization, "enable-perf-optimization", false, "boolean flag to enable disk perf optimization")
fs.StringVar(&o.CloudConfigSecretName, "cloud-config-secret-name", "azure-cloud-provider", "cloud config secret name")
fs.StringVar(&o.CloudConfigSecretNamespace, "cloud-config-secret-namespace", "kube-system", "cloud config secret namespace")
fs.StringVar(&o.CustomUserAgent, "custom-user-agent", "", "custom userAgent")
fs.StringVar(&o.UserAgentSuffix, "user-agent-suffix", "", "userAgent suffix")
fs.BoolVar(&o.UseCSIProxyGAInterface, "use-csiproxy-ga-interface", true, "boolean flag to enable csi-proxy GA interface on Windows")
fs.BoolVar(&o.EnableOtelTracing, "enable-otel-tracing", false, "If set, enable opentelemetry tracing for the driver. The tracing is disabled by default. Configure the exporter endpoint with OTEL_EXPORTER_OTLP_ENDPOINT and other env variables, see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration.")
//only used in v1
fs.BoolVar(&o.EnableDiskOnlineResize, "enable-disk-online-resize", true, "boolean flag to enable disk online resize")
fs.BoolVar(&o.AllowEmptyCloudConfig, "allow-empty-cloud-config", true, "Whether allow running driver without cloud config")
fs.BoolVar(&o.EnableListVolumes, "enable-list-volumes", false, "boolean flag to enable ListVolumes on controller")
fs.BoolVar(&o.EnableListSnapshots, "enable-list-snapshots", false, "boolean flag to enable ListSnapshots on controller")
fs.BoolVar(&o.SupportZone, "support-zone", true, "boolean flag to get zone info in NodeGetInfo")
fs.BoolVar(&o.GetNodeInfoFromLabels, "get-node-info-from-labels", false, "boolean flag to get zone info from node labels in NodeGetInfo")
fs.BoolVar(&o.EnableDiskCapacityCheck, "enable-disk-capacity-check", false, "boolean flag to enable volume capacity check in CreateVolume")
fs.BoolVar(&o.DisableUpdateCache, "disable-update-cache", false, "boolean flag to disable update cache during disk attach/detach")
fs.BoolVar(&o.EnableTrafficManager, "enable-traffic-manager", false, "boolean flag to enable traffic manager")
fs.Int64Var(&o.TrafficManagerPort, "traffic-manager-port", 7788, "default traffic manager port")
fs.Int64Var(&o.AttachDetachInitialDelayInMs, "attach-detach-initial-delay-ms", 1000, "initial delay in milliseconds for batch disk attach/detach")
fs.Int64Var(&o.VMSSCacheTTLInSeconds, "vmss-cache-ttl-seconds", -1, "vmss cache TTL in seconds (600 by default)")
fs.StringVar(&o.VMType, "vm-type", "", "type of agent node. available values: vmss, standard")
fs.BoolVar(&o.EnableWindowsHostProcess, "enable-windows-host-process", false, "enable windows host process")
fs.BoolVar(&o.GetNodeIDFromIMDS, "get-nodeid-from-imds", false, "boolean flag to get NodeID from IMDS")
fs.BoolVar(&o.WaitForSnapshotReady, "wait-for-snapshot-ready", true, "boolean flag to wait for snapshot ready when creating snapshot in same region")
fs.BoolVar(&o.CheckDiskLUNCollision, "check-disk-lun-collision", false, "boolean flag to check disk lun collisio before attaching disk")
fs.StringVar(&o.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
fs.BoolVar(&o.DisableAVSetNodes, "disable-avset-nodes", false, "disable DisableAvailabilitySetNodes in cloud config for controller")
fs.StringVar(&o.Endpoint, "endpoint", "unix://tmp/csi.sock", "CSI endpoint")

return fs
}
37 changes: 37 additions & 0 deletions pkg/azuredisk/azuredisk_option_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package azuredisk

import (
"flag"
"reflect"
"testing"
)

func TestDriverOptions_AddFlags(t *testing.T) {
o := &DriverOptions{}
typeInfo := reflect.TypeOf(*o)

got := o.AddFlags()
count := 0
got.VisitAll(func(f *flag.Flag) {
count++
})
if count != typeInfo.NumField() {
t.Errorf("DriverOptions.AddFlags() = %v, want %v", count, typeInfo.NumField())
}
}
68 changes: 34 additions & 34 deletions pkg/azuredisk/azuredisk_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,67 +86,56 @@ func newDriverV2(options *DriverOptions) *DriverV2 {
driver.endpoint = options.Endpoint

topologyKey = fmt.Sprintf("topology.%s/zone", driver.Name)
return &driver
}

// Run driver initialization
func (d *DriverV2) Run(ctx context.Context) error {
versionMeta, err := GetVersionYAML(d.Name)
if err != nil {
klog.Fatalf("%v", err)
}
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)

userAgent := GetUserAgent(d.Name, d.customUserAgent, d.userAgentSuffix)
userAgent := GetUserAgent(driver.Name, driver.customUserAgent, driver.userAgentSuffix)
klog.V(2).Infof("driver userAgent: %s", userAgent)

cloud, err := azureutils.GetCloudProvider(context.Background(), d.kubeconfig, d.cloudConfigSecretName, d.cloudConfigSecretNamespace,
userAgent, d.allowEmptyCloudConfig, d.enableTrafficManager, d.trafficManagerPort)
cloud, err := azureutils.GetCloudProvider(context.Background(), driver.kubeconfig, driver.cloudConfigSecretName, driver.cloudConfigSecretNamespace,
userAgent, driver.allowEmptyCloudConfig, driver.enableTrafficManager, driver.trafficManagerPort)
if err != nil {
klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
}
d.cloud = cloud
driver.cloud = cloud

if d.cloud != nil {
if d.vmType != "" {
klog.V(2).Infof("override VMType(%s) in cloud config as %s", d.cloud.VMType, d.vmType)
d.cloud.VMType = d.vmType
if driver.cloud != nil {
if driver.vmType != "" {
klog.V(2).Infof("override VMType(%s) in cloud config as %s", driver.cloud.VMType, driver.vmType)
driver.cloud.VMType = driver.vmType
}

if d.NodeID == "" {
if driver.NodeID == "" {
// Disable UseInstanceMetadata for controller to mitigate a timeout issue using IMDS
// https://github.com/kubernetes-sigs/azuredisk-csi-driver/issues/168
klog.V(2).Infof("disable UseInstanceMetadata for controller")
d.cloud.Config.UseInstanceMetadata = false
driver.cloud.Config.UseInstanceMetadata = false

if d.cloud.VMType == consts.VMTypeStandard && d.cloud.DisableAvailabilitySetNodes {
klog.V(2).Infof("set DisableAvailabilitySetNodes as false since VMType is %s", d.cloud.VMType)
d.cloud.DisableAvailabilitySetNodes = false
if driver.cloud.VMType == consts.VMTypeStandard && driver.cloud.DisableAvailabilitySetNodes {
klog.V(2).Infof("set DisableAvailabilitySetNodes as false since VMType is %s", driver.cloud.VMType)
driver.cloud.DisableAvailabilitySetNodes = false
}

if d.cloud.VMType == consts.VMTypeVMSS && !d.cloud.DisableAvailabilitySetNodes && d.disableAVSetNodes {
if driver.cloud.VMType == consts.VMTypeVMSS && !driver.cloud.DisableAvailabilitySetNodes && driver.disableAVSetNodes {
klog.V(2).Infof("DisableAvailabilitySetNodes for controller since current VMType is vmss")
d.cloud.DisableAvailabilitySetNodes = true
driver.cloud.DisableAvailabilitySetNodes = true
}
klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VMType: %s, PrimaryScaleSetName: %s, PrimaryAvailabilitySetName: %s, DisableAvailabilitySetNodes: %v", d.cloud.Cloud, d.cloud.Location, d.cloud.ResourceGroup, d.cloud.VMType, d.cloud.PrimaryScaleSetName, d.cloud.PrimaryAvailabilitySetName, d.cloud.DisableAvailabilitySetNodes)
klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VMType: %s, PrimaryScaleSetName: %s, PrimaryAvailabilitySetName: %s, DisableAvailabilitySetNodes: %v", driver.cloud.Cloud, driver.cloud.Location, driver.cloud.ResourceGroup, driver.cloud.VMType, driver.cloud.PrimaryScaleSetName, driver.cloud.PrimaryAvailabilitySetName, driver.cloud.DisableAvailabilitySetNodes)
}
}

d.deviceHelper = optimization.NewSafeDeviceHelper()
driver.deviceHelper = optimization.NewSafeDeviceHelper()

if d.getPerfOptimizationEnabled() {
d.nodeInfo, err = optimization.NewNodeInfo(context.TODO(), d.getCloud(), d.NodeID)
if driver.getPerfOptimizationEnabled() {
driver.nodeInfo, err = optimization.NewNodeInfo(context.TODO(), driver.getCloud(), driver.NodeID)
if err != nil {
klog.Errorf("Failed to get node info. Error: %v", err)
}
}

d.mounter, err = mounter.NewSafeMounter(d.enableWindowsHostProcess, d.useCSIProxyGAInterface)
driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.useCSIProxyGAInterface)
if err != nil {
klog.Fatalf("Failed to get safe mounter. Error: %v", err)
}

d.AddControllerServiceCapabilities(
driver.AddControllerServiceCapabilities(
[]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
Expand All @@ -158,19 +147,30 @@ func (d *DriverV2) Run(ctx context.Context) error {
csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
})
d.AddVolumeCapabilityAccessModes(
driver.AddVolumeCapabilityAccessModes(
[]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
})
d.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
driver.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
})
return &driver
}

// Run driver initialization
func (d *DriverV2) Run(ctx context.Context) error {
versionMeta, err := GetVersionYAML(d.Name)
if err != nil {
klog.Fatalf("%v", err)
}
klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)

grpcInterceptor := grpc.UnaryInterceptor(csicommon.LogGRPC)
if d.enableOtelTracing {
grpcInterceptor = grpc.ChainUnaryInterceptor(csicommon.LogGRPC, otelgrpc.UnaryServerInterceptor())
Expand Down
Loading

0 comments on commit ee47913

Please sign in to comment.