Skip to content

Commit

Permalink
Merge pull request #89809 from jsafrane/automated-cherry-pick-of-#895…
Browse files Browse the repository at this point in the history
…89-upstream-release-1.18

Automated cherry pick of #89589: Wait for APIServer 'ok' forever during CSINode
  • Loading branch information
k8s-ci-robot committed Apr 17, 2020
2 parents 7b6b53c + 4860841 commit e97c570
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
57 changes: 44 additions & 13 deletions pkg/volume/csi/csi_plugin.go
Expand Up @@ -17,15 +17,14 @@ limitations under the License.
package csi

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"context"

"k8s.io/klog"

api "k8s.io/api/core/v1"
Expand Down Expand Up @@ -227,10 +226,10 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {

if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) &&
utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// This function prevents Kubelet from posting Ready status until CSINodeInfo
// This function prevents Kubelet from posting Ready status until CSINode
// is both installed and initialized
if err := initializeCSINode(host); err != nil {
return errors.New(log("failed to initialize CSINodeInfo: %v", err))
return errors.New(log("failed to initialize CSINode: %v", err))
}
}

Expand All @@ -240,21 +239,28 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
func initializeCSINode(host volume.VolumeHost) error {
kvh, ok := host.(volume.KubeletVolumeHost)
if !ok {
klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINodeInfo initialization, not running on kubelet")
klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet")
return nil
}
kubeClient := host.GetKubeClient()
if kubeClient == nil {
// Kubelet running in standalone mode. Skip CSINodeInfo initialization
klog.Warning("Skipping CSINodeInfo initialization, kubelet running in standalone mode")
// Kubelet running in standalone mode. Skip CSINode initialization
klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode")
return nil
}

kvh.SetKubeletError(errors.New("CSINodeInfo is not yet initialized"))
kvh.SetKubeletError(errors.New("CSINode is not yet initialized"))

go func() {
defer utilruntime.HandleCrash()

// First wait indefinitely to talk to Kube APIServer
nodeName := host.GetNodeName()
err := waitForAPIServerForever(kubeClient, nodeName)
if err != nil {
klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err)
}

// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
// after max retry steps.
initBackoff := wait.Backoff{
Expand All @@ -263,12 +269,12 @@ func initializeCSINode(host volume.VolumeHost) error {
Factor: 6.0,
Jitter: 0.1,
}
err := wait.ExponentialBackoff(initBackoff, func() (bool, error) {
klog.V(4).Infof("Initializing migrated drivers on CSINodeInfo")
err = wait.ExponentialBackoff(initBackoff, func() (bool, error) {
klog.V(4).Infof("Initializing migrated drivers on CSINode")
err := nim.InitializeCSINodeWithAnnotation()
if err != nil {
kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINodeInfo: %v", err))
klog.Errorf("Failed to initialize CSINodeInfo: %v", err)
kvh.SetKubeletError(fmt.Errorf("Failed to initialize CSINode: %v", err))
klog.Errorf("Failed to initialize CSINode: %v", err)
return false, nil
}

Expand All @@ -282,7 +288,7 @@ func initializeCSINode(host volume.VolumeHost) error {
// using CSI for all Migrated volume plugins. Then all the CSINode initialization
// code can be dropped from Kubelet.
// Kill the Kubelet process and allow it to restart to retry initialization
klog.Fatalf("Failed to initialize CSINodeInfo after retrying")
klog.Fatalf("Failed to initialize CSINode after retrying: %v", err)
}
}()
return nil
Expand Down Expand Up @@ -914,3 +920,28 @@ func highestSupportedVersion(versions []string) (*utilversion.Version, error) {
}
return highestSupportedVersion, nil
}

// waitForAPIServerForever waits forever to get a CSINode instance as a proxy
// for a healthy APIServer
func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
var lastErr error
err := wait.PollImmediateInfinite(time.Second, func() (bool, error) {
// Get a CSINode from API server to make sure 1) kubelet can reach API server
// and 2) it has enough permissions. Kubelet may have restricted permissions
// when it's bootstrapping TLS.
// https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
_, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), meta.GetOptions{})
if lastErr == nil || apierrors.IsNotFound(lastErr) {
// API server contacted
return true, nil
}
klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr)
return false, nil
})
if err != nil {
// In theory this is unreachable, but just in case:
return fmt.Errorf("%v: %v", err, lastErr)
}

return nil
}
8 changes: 4 additions & 4 deletions pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
Expand Up @@ -397,16 +397,16 @@ func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
return goerrors.New("error getting CSI client")
}

var updateErrs []error
var lastErr error
err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
if err := nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); err != nil {
updateErrs = append(updateErrs, err)
if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil {
klog.V(2).Infof("Failed to publish CSINode: %v", lastErr)
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/volume/testing/testing.go
Expand Up @@ -1870,7 +1870,7 @@ func (f *fakeVolumeHost) WaitForCacheSync() error {
}

func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
f.mux.Lock()
defer f.mux.Unlock()
return f.kubeletErr == nil, nil
Expand Down

0 comments on commit e97c570

Please sign in to comment.