Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase timeout of csi-provisioner and csi attacher #93

Merged
merged 3 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/kubevirt-csi-driver/kubevirt-csi-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
)

var (
endpoint = flag.String("endpoint", "unix:/csi/csi.sock", "CSI endpoint")
nodeName = flag.String("node-name", "", "The node name - the node this pods runs on")
infraClusterNamespace = flag.String("infra-cluster-namespace", "", "The infra-cluster namespace")
infraClusterKubeconfig = flag.String("infra-cluster-kubeconfig", "", "the infra-cluster kubeconfig file. If not set, defaults to in cluster config.")
infraClusterLabels = flag.String("infra-cluster-labels", "", "The infra-cluster labels to use when creating resources in infra cluster. 'name=value' fields separated by a comma")
endpoint = flag.String("endpoint", "unix:/csi/csi.sock", "CSI endpoint")
nodeName = flag.String("node-name", "", "The node name - the node this pods runs on")
infraClusterNamespace = flag.String("infra-cluster-namespace", "", "The infra-cluster namespace")
infraClusterKubeconfig = flag.String("infra-cluster-kubeconfig", "", "the infra-cluster kubeconfig file. If not set, defaults to in cluster config.")
infraClusterLabels = flag.String("infra-cluster-labels", "", "The infra-cluster labels to use when creating resources in infra cluster. 'name=value' fields separated by a comma")
// infraStorageClassEnforcement = flag.String("infra-storage-class-enforcement", "", "A string encoded yaml that represents the policy of enforcing which infra storage classes are allowed in persistentVolume of type kubevirt")
infraStorageClassEnforcement = os.Getenv("INFRA_STORAGE_CLASS_ENFORCEMENT")

Expand Down
26 changes: 15 additions & 11 deletions deploy/controller-infra/base/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ spec:
- "--tenant-cluster-kubeconfig=/var/run/secrets/tenantcluster/value"
- "--run-node-service=false"
- "--run-controller-service=true"
- --v=5
- "--v=5"
ports:
- name: healthz
containerPort: 10301
Expand Down Expand Up @@ -76,10 +76,12 @@ spec:
- name: csi-provisioner
image: quay.io/openshift/origin-csi-external-provisioner:latest
args:
- --csi-address=$(ADDRESS)
- --default-fstype=ext4
- --kubeconfig=/var/run/secrets/tenantcluster/value
- --v=5
- "--csi-address=$(ADDRESS)"
- "--default-fstype=ext4"
- "--kubeconfig=/var/run/secrets/tenantcluster/value"
- "--v=5"
- "--timeout=3m"
- "--retry-interval-max=1m"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
Expand All @@ -91,9 +93,11 @@ spec:
- name: csi-attacher
image: quay.io/openshift/origin-csi-external-attacher:latest
args:
- --csi-address=$(ADDRESS)
- --kubeconfig=/var/run/secrets/tenantcluster/value
- --v=5
- "--csi-address=$(ADDRESS)"
- "--kubeconfig=/var/run/secrets/tenantcluster/value"
- "--v=5"
- "--timeout=3m"
- "--retry-interval-max=1m"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
Expand All @@ -109,9 +113,9 @@ spec:
- name: csi-liveness-probe
image: quay.io/openshift/origin-csi-livenessprobe:latest
args:
- --csi-address=/csi/csi.sock
- --probe-timeout=3s
- --health-port=10301
- "--csi-address=/csi/csi.sock"
- "--probe-timeout=3s"
- "--health-port=10301"
volumeMounts:
- name: socket-dir
mountPath: /csi
Expand Down
13 changes: 7 additions & 6 deletions deploy/tenant/base/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ spec:
- "--node-name=$(KUBE_NODE_NAME)"
- "--run-node-service=true"
- "--run-controller-service=false"
- "--v=5"
env:
- name: KUBE_NODE_NAME
valueFrom:
Expand Down Expand Up @@ -203,9 +204,9 @@ spec:
- name: csi-node-driver-registrar
image: quay.io/openshift/origin-csi-node-driver-registrar:latest
args:
- --csi-address=$(ADDRESS)
- --kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)
- --v=5
- "--csi-address=$(ADDRESS)"
- "--kubelet-registration-path=$(DRIVER_REG_SOCK_PATH)"
- "--v=5"
lifecycle:
preStop:
exec:
Expand All @@ -227,9 +228,9 @@ spec:
- name: csi-liveness-probe
image: quay.io/openshift/origin-csi-livenessprobe:latest
args:
- --csi-address=/csi/csi.sock
- --probe-timeout=3s
- --health-port=10300
- "--csi-address=/csi/csi.sock"
- "--probe-timeout=3s"
- "--health-port=10300"
volumeMounts:
- name: plugin-dir
mountPath: /csi
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubevirt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *client) RemoveVolumeFromVM(namespace string, vmName string, hotPlugRequ
// EnsureVolumeAvailable checks to make sure the volume is available in the node before returning, polling once per second for up to the given timeout
func (c *client) EnsureVolumeAvailable(namespace, vmName, volumeName string, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, func() (done bool, err error) {
vmi, err := c.virtClient.VirtualMachineInstance(namespace).Get(context.TODO(), vmName, &metav1.GetOptions{})
vmi, err := c.GetVirtualMachine(namespace, vmName)
if err != nil {
return false, err
}
Expand All @@ -85,7 +85,7 @@ func (c *client) EnsureVolumeAvailable(namespace, vmName, volumeName string, tim
// EnsureVolumeRemoved checks to make sure the volume has been removed from the node before returning, polling once per second for up to the given timeout
func (c *client) EnsureVolumeRemoved(namespace, vmName, volumeName string, timeout time.Duration) error {
return wait.PollImmediate(time.Second, timeout, func() (done bool, err error) {
vmi, err := c.virtClient.VirtualMachineInstance(namespace).Get(context.TODO(), vmName, &metav1.GetOptions{})
vmi, err := c.GetVirtualMachine(namespace, vmName)
if err != nil {
return false, err
}
Expand Down
113 changes: 83 additions & 30 deletions pkg/service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
kubevirtv1 "kubevirt.io/api/core/v1"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -195,7 +196,7 @@ func (c *ControllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, err
}
dvName := req.VolumeId
klog.Infof("Removing data volume with %s", dvName)
klog.V(3).Infof("Removing data volume with %s", dvName)

err := c.virtClient.DeleteDataVolume(c.infraClusterNamespace, dvName)
if err != nil {
Expand Down Expand Up @@ -231,7 +232,7 @@ func (c *ControllerService) ControllerPublishVolume(
}
dvName := req.GetVolumeId()

klog.Infof("Attaching DataVolume %s to Node ID %s", dvName, req.NodeId)
klog.V(3).Infof("Attaching DataVolume %s to Node ID %s", dvName, req.NodeId)

// Get VM name
vmName, err := c.getVMNameByCSINodeID(req.NodeId)
Expand All @@ -247,7 +248,7 @@ func (c *ControllerService) ControllerPublishVolume(
bus := req.VolumeContext[busParameter]

// hotplug DataVolume to VM
klog.Infof("Start attaching DataVolume %s to VM %s. Volume name: %s. Serial: %s. Bus: %s", dvName, vmName, dvName, serial, bus)
klog.V(3).Infof("Start attaching DataVolume %s to VM %s. Volume name: %s. Serial: %s. Bus: %s", dvName, vmName, dvName, serial, bus)

addVolumeOptions := &kubevirtv1.AddVolumeOptions{
Name: dvName,
Expand All @@ -266,32 +267,58 @@ func (c *ControllerService) ControllerPublishVolume(
},
}

volumeFound := false
vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err := wait.ExponentialBackoff(wait.Backoff{
Duration: time.Second,
Steps: 5,
Factor: 2,
Cap: time.Second * 30,
}, func() (bool, error) {
if err := c.addVolumeToVm(dvName, vmName, addVolumeOptions); err != nil {
klog.Infof("failed adding volume %s to VM %s, retrying, err: %v", dvName, vmName, err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}

// Ensure that the csi-attacher and csi-provisioner --timeout values are > the timeout specified here so we don't get
// odd failures with detaching volumes.
err = c.virtClient.EnsureVolumeAvailable(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Errorf("volume %s failed to be ready in time (2m) in VM %s, %v", dvName, vmName, err)
return nil, err
}

klog.V(3).Infof("Successfully attached volume %s to VM %s", dvName, vmName)
return &csi.ControllerPublishVolumeResponse{}, nil
}

func (c *ControllerService) isVolumeAttached(dvName, vmName string) (bool, error) {
vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err != nil {
return false, err
}
for _, volumeStatus := range vm.Status.VolumeStatus {
if volumeStatus.Name == dvName {
volumeFound = true
break
return true, nil
}
}
return false, nil
}

func (c *ControllerService) addVolumeToVm(dvName, vmName string, addVolumeOptions *kubevirtv1.AddVolumeOptions) error {
volumeFound, err := c.isVolumeAttached(dvName, vmName)
if err != nil {
return err
}
if !volumeFound {
err = c.virtClient.AddVolumeToVM(c.infraClusterNamespace, vmName, addVolumeOptions)
if err != nil {
klog.Errorf("failed adding volume %s to VM %s, %v", dvName, vmName, err)
return nil, err
return err
}
}

err = c.virtClient.EnsureVolumeAvailable(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Errorf("volume %s failed to be ready in time in VM %s, %v", dvName, vmName, err)
return nil, err
}

return &csi.ControllerPublishVolumeResponse{}, nil
return nil
}

func (c *ControllerService) validateControllerUnpublishVolumeRequest(req *csi.ControllerUnpublishVolumeRequest) error {
Expand All @@ -314,19 +341,44 @@ func (c *ControllerService) ControllerUnpublishVolume(ctx context.Context, req *
return nil, err
}
dvName := req.VolumeId
klog.Infof("Detaching DataVolume %s from Node ID %s", dvName, req.NodeId)
klog.V(3).Infof("Detaching DataVolume %s from Node ID %s", dvName, req.NodeId)

// Get VM name
vmName, err := c.getVMNameByCSINodeID(req.NodeId)
if err != nil {
return nil, err
}

vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err := wait.ExponentialBackoff(wait.Backoff{
Duration: time.Second,
Steps: 5,
Factor: 2,
Cap: time.Second * 30,
}, func() (bool, error) {
if err := c.removeVolumeFromVm(dvName, vmName); err != nil {
klog.Infof("failed removing volume %s from VM %s, err: %v", dvName, vmName, err)
return false, nil
}
return true, nil
}); err != nil {
return nil, err
}

err = c.virtClient.EnsureVolumeRemoved(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Error("failed getting virtual machine " + vmName)
klog.Errorf("volume %s failed to be removed in time (2m) from VM %s, %v", dvName, vmName, err)
return nil, err
}

klog.V(3).Infof("Successfully unpublished volume %s from VM %s", dvName, vmName)
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

func (c *ControllerService) removeVolumeFromVm(dvName, vmName string) error {
vm, err := c.virtClient.GetVirtualMachine(c.infraClusterNamespace, vmName)
if err != nil {
return err
}
removePossible := false
for _, volumeStatus := range vm.Status.VolumeStatus {
if volumeStatus.HotplugVolume != nil && volumeStatus.Name == dvName {
Expand All @@ -337,17 +389,10 @@ func (c *ControllerService) ControllerUnpublishVolume(ctx context.Context, req *
// Detach DataVolume from VM
err = c.virtClient.RemoveVolumeFromVM(c.infraClusterNamespace, vmName, &kubevirtv1.RemoveVolumeOptions{Name: dvName})
if err != nil {
klog.Error("failed removing volume " + dvName + " from VM " + vmName)
return nil, err
return err
}
}
err = c.virtClient.EnsureVolumeRemoved(c.infraClusterNamespace, vmName, dvName, time.Minute*2)
if err != nil {
klog.Errorf("volume %s failed to be removed in time from VM %s, %v", dvName, vmName, err)
return nil, err
}

return &csi.ControllerUnpublishVolumeResponse{}, nil
return nil
}

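// ValidateVolumeCapabilities confirms the requested capabilities when every capability is a mount type and the DataVolume exists in the infra cluster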
Expand All @@ -359,19 +404,27 @@ func (c *ControllerService) ValidateVolumeCapabilities(ctx context.Context, req
if len(req.VolumeCapabilities) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "volumeCapabilities not provided for %s", req.VolumeId)
}

klog.V(3).Info("Calling volume capabilities")
for _, cap := range req.GetVolumeCapabilities() {
if cap.GetMount() == nil {
return nil, status.Error(codes.InvalidArgument, "mount type is undefined")
}
}
dvName := req.GetVolumeId()
klog.V(3).Infof("DataVolume name %s", dvName)
if _, err := c.virtClient.GetDataVolume(c.infraClusterNamespace, dvName); errors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "volume %s not found", req.GetVolumeId())
} else if err != nil {
return nil, err
}

klog.V(5).Info("Returning capabilities %v", &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),
VolumeCapabilities: req.GetVolumeCapabilities(),
Parameters: req.GetParameters(),
},
})
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),
Expand Down
Loading