Skip to content

Commit

Permalink
Merge pull request #996 from feiskyer/cp1.23
Browse files Browse the repository at this point in the history
cherry-pick of #986 to release-1.23: reduce node LIST APIs in cloud-node-manager
  • Loading branch information
k8s-ci-robot committed Jan 12, 2022
2 parents 588f6fa + cb66d65 commit fda9c59
Show file tree
Hide file tree
Showing 6 changed files with 336 additions and 643 deletions.
25 changes: 14 additions & 11 deletions cmd/cloud-node-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -178,17 +179,6 @@ func (o *CloudNodeManagerOptions) ApplyTo(c *cloudnodeconfig.Config, userAgent s
return err
}

c.EventRecorder = createRecorder(c.Client, userAgent)
c.ClientBuilder = clientbuilder.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
c.VersionedClient = c.ClientBuilder.ClientOrDie("shared-informers")
// TODO(feiskyer): filter watch by node name whenever it's supported.
c.SharedInformers = informers.NewSharedInformerFactory(c.VersionedClient, resyncPeriod(c)())
c.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
c.UseInstanceMetadata = o.UseInstanceMetadata
c.CloudConfigFilePath = o.CloudConfigFilePath

// Default NodeName is hostname.
c.NodeName = strings.ToLower(o.NodeName)
if c.NodeName == "" {
Expand All @@ -200,6 +190,19 @@ func (o *CloudNodeManagerOptions) ApplyTo(c *cloudnodeconfig.Config, userAgent s
c.NodeName = strings.ToLower(hostname)
}

c.EventRecorder = createRecorder(c.Client, userAgent)
c.ClientBuilder = clientbuilder.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
c.VersionedClient = c.ClientBuilder.ClientOrDie("shared-informers")
// Only need to watch the node itself. There is no need to set up a watch on the nodes in the cluster.
c.SharedInformers = informers.NewSharedInformerFactoryWithOptions(c.VersionedClient, resyncPeriod(c)(), informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", c.NodeName).String()
}))
c.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
c.UseInstanceMetadata = o.UseInstanceMetadata
c.CloudConfigFilePath = o.CloudConfigFilePath

c.WindowsService = o.WindowsService

return nil
Expand Down
72 changes: 33 additions & 39 deletions pkg/nodemanager/nodemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -176,39 +175,30 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}) {

// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) {
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
ResourceVersion: "0",
FieldSelector: fields.OneTermEqualSelector("metadata.name", cnc.nodeName).String(),
})
node, err := cnc.nodeInformer.Lister().Get(cnc.nodeName)
if err != nil {
klog.Errorf("Error monitoring node status: %v", err)
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return
}

klog.Errorf("Error getting node %q from informer, err: %v", cnc.nodeName, err)
return
}

for i := range nodes.Items {
cnc.updateNodeAddress(ctx, &nodes.Items[i])
err = cnc.updateNodeAddress(ctx, node)
if err != nil {
klog.Errorf("Error reconciling node address for node %q, err: %v", node.Name, err)
}

for _, node := range nodes.Items {
err = cnc.reconcileNodeLabels(node.Name)
if err != nil {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
err = cnc.reconcileNodeLabels(node)
if err != nil {
klog.Errorf("Error reconciling node labels for node %q, err: %v", node.Name, err)
}
}

// reconcileNodeLabels reconciles node labels transitioning from beta to GA
func (cnc *CloudNodeController) reconcileNodeLabels(nodeName string) error {
node, err := cnc.nodeInformer.Lister().Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}

return err
}

func (cnc *CloudNodeController) reconcileNodeLabels(node *v1.Node) error {
if node.Labels == nil {
// Nothing to reconcile.
return nil
Expand Down Expand Up @@ -248,33 +238,32 @@ func (cnc *CloudNodeController) reconcileNodeLabels(nodeName string) error {
}

// updateNodeAddress updates the addresses of a single node
func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node) {
func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.Node) error {
// Do not process nodes that are still tainted
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil {
klog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
return
return nil
}

// A node that isn't present according to the cloud provider shouldn't have its address updated
exists, err := cnc.ensureNodeExistsByProviderID(ctx, node)
if err != nil {
// Continue to update the node address when it is unclear whether the node still exists
klog.Error(err)
klog.Warningf("ensureNodeExistsByProviderID (node %s) reported an error (%v), continue to update its address", node.Name, err)
} else if !exists {
klog.V(4).Infof("The node %s is no longer present according to the cloud provider, do not process.", node.Name)
return
return nil
}

nodeAddresses, err := cnc.getNodeAddressesByName(ctx, node)
if err != nil {
klog.Errorf("Error getting node addresses for node %q: %v", node.Name, err)
return
return fmt.Errorf("Error getting node addresses for node %q: %v", node.Name, err)
}

if len(nodeAddresses) == 0 {
klog.V(5).Infof("Skipping node address update for node %q since cloud provider did not return any", node.Name)
return
return nil
}

// Check if a hostname address exists in the cloud provided addresses
Expand All @@ -298,19 +287,21 @@ func (cnc *CloudNodeController) updateNodeAddress(ctx context.Context, node *v1.
// it can be found in the cloud as well (consistent with the behaviour in kubelet)
if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok {
if nodeIP == nil {
klog.Errorf("Specified Node IP not found in cloudprovider for node %q", node.Name)
return
return fmt.Errorf("specified Node IP %s not found in cloudprovider for node %q", nodeAddresses, node.Name)
}
}
if !nodeAddressesChangeDetected(node.Status.Addresses, nodeAddresses) {
return
return nil
}

newNode := node.DeepCopy()
newNode.Status.Addresses = nodeAddresses
_, _, err = PatchNodeStatus(cnc.kubeClient.CoreV1(), types.NodeName(node.Name), node, newNode)
if err != nil {
klog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
return fmt.Errorf("Error patching node with cloud ip addresses = [%v]", err)
}

return nil
}

// nodeModifier is used to carry changes to node objects across multiple attempts to update them
Expand Down Expand Up @@ -360,7 +351,7 @@ func (cnc *CloudNodeController) AddCloudNode(ctx context.Context, obj interface{
// initializeNode processes a node that was added to the cluster and cloud-initializes it if appropriate
func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Node) {
klog.Infof("Initializing node %s with cloud provider", node.Name)
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get node %s: %w", node.Name, err))
return
Expand Down Expand Up @@ -394,7 +385,7 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
})

err = clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
return err
}
Expand All @@ -403,14 +394,17 @@ func (cnc *CloudNodeController) initializeNode(ctx context.Context, node *v1.Nod
modify(curNode)
}

_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), curNode, metav1.UpdateOptions{})
_, err = cnc.kubeClient.CoreV1().Nodes().Update(ctx, curNode, metav1.UpdateOptions{})
if err != nil {
return err
}

// After adding, call updateNodeAddress to set the IP addresses provided by the cloud provider,
// so that users do not see any significant delay in IP addresses being filled into the node
cnc.updateNodeAddress(ctx, curNode)
err = cnc.updateNodeAddress(ctx, curNode)
if err != nil {
return err
}

klog.Infof("Successfully initialized node %s with cloud provider", node.Name)
return nil
Expand Down

0 comments on commit fda9c59

Please sign in to comment.