Adds retry mechanism for Node resource #2870

Merged · 1 commit · Apr 8, 2022
222 changes: 201 additions & 21 deletions go-controller/pkg/ovn/master.go
@@ -3,15 +3,18 @@ package ovn
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/fields"
"math/rand"
"net"
"os"
"reflect"
"strings"
"sync"
"time"

kapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
@@ -1116,7 +1119,7 @@ func (oc *Controller) allocateNodeSubnets(node *kapi.Node) ([]*net.IPNet, []*net
}

func (oc *Controller) addNode(node *kapi.Node) ([]*net.IPNet, error) {
oc.clearInitialNodeNetworkUnavailableCondition(node, nil)
oc.clearInitialNodeNetworkUnavailableCondition(node)
hostSubnets, allocatedSubnets, err := oc.allocateNodeSubnets(node)
if err != nil {
return nil, err
@@ -1148,7 +1151,9 @@ func (oc *Controller) addNode(node *kapi.Node) ([]*net.IPNet, error) {
}

// delete stale chassis in SBDB if any
oc.deleteStaleNodeChassis(node)
if err = oc.deleteStaleNodeChassis(node); err != nil {
return nil, err
}

// If node annotation succeeds and subnets were allocated, update the used subnet count
if len(allocatedSubnets) > 0 {
@@ -1187,20 +1192,21 @@ func (oc *Controller) checkNodeChassisMismatch(node *kapi.Node) (bool, error) {
}

// delete stale chassis in SBDB if system-id of the specific node has changed.
func (oc *Controller) deleteStaleNodeChassis(node *kapi.Node) {
func (oc *Controller) deleteStaleNodeChassis(node *kapi.Node) error {
mismatch, err := oc.checkNodeChassisMismatch(node)
if err != nil {
klog.Errorf("Failed to check if there is any stale chassis for node %s in SBDB: %v", node.Name, err)
return fmt.Errorf("failed to check if there is any stale chassis for node %s in SBDB: %v", node.Name, err)
} else if mismatch {
klog.V(5).Infof("Node %s now has a new chassis ID, delete its stale chassis in SBDB", node.Name)
if err = libovsdbops.DeleteNodeChassis(oc.sbClient, node.Name); err != nil {
// Send an event and Log on failure
oc.recorder.Eventf(node, kapi.EventTypeWarning, "ErrorMismatchChassis",
"Node %s is now with a new chassis ID. Its stale chassis entry is still in the SBDB",
node.Name)
klog.Errorf("Node %s is now with a new chassis ID. Its stale chassis entry is still in the SBDB", node.Name)
return fmt.Errorf("node %s is now with a new chassis ID. Its stale chassis entry is still in the SBDB", node.Name)
}
}
return nil
}

func (oc *Controller) deleteNodeHostSubnet(nodeName string, subnet *net.IPNet) error {
@@ -1253,33 +1259,32 @@ func (oc *Controller) deleteNodeLogicalNetwork(nodeName string) error {
return nil
}

func (oc *Controller) deleteNode(nodeName string, hostSubnets []*net.IPNet) {
// Clean up as much as we can but don't hard error
func (oc *Controller) deleteNode(nodeName string, hostSubnets []*net.IPNet) error {
for _, hostSubnet := range hostSubnets {
if err := oc.deleteNodeHostSubnet(nodeName, hostSubnet); err != nil {
klog.Errorf("Error deleting node %s HostSubnet %v: %v", nodeName, hostSubnet, err)
} else {
util.UpdateUsedHostSubnetsCount(hostSubnet, &oc.v4HostSubnetsUsed, &oc.v6HostSubnetsUsed, false)
return fmt.Errorf("error deleting node %s HostSubnet %v: %v", nodeName, hostSubnet, err)
}
util.UpdateUsedHostSubnetsCount(hostSubnet, &oc.v4HostSubnetsUsed, &oc.v6HostSubnetsUsed, false)
}
// update metrics
metrics.RecordSubnetUsage(oc.v4HostSubnetsUsed, oc.v6HostSubnetsUsed)

if err := oc.deleteNodeLogicalNetwork(nodeName); err != nil {
klog.Errorf("Error deleting node %s logical network: %v", nodeName, err)
return fmt.Errorf("error deleting node %s logical network: %v", nodeName, err)
}

if err := oc.gatewayCleanup(nodeName); err != nil {
klog.Errorf("Failed to clean up node %s gateway: (%v)", nodeName, err)
return fmt.Errorf("failed to clean up node %s gateway: (%v)", nodeName, err)
}

if err := oc.joinSwIPManager.ReleaseJoinLRPIPs(nodeName); err != nil {
klog.Errorf("Failed to clean up GR LRP IPs for node %s: %v", nodeName, err)
return fmt.Errorf("failed to clean up GR LRP IPs for node %s: %v", nodeName, err)
}

if err := libovsdbops.DeleteNodeChassis(oc.sbClient, nodeName); err != nil {
klog.Errorf("Failed to remove the chassis associated with node %s in the OVN SB Chassis table: %v", nodeName, err)
return fmt.Errorf("failed to remove the chassis associated with node %s in the OVN SB Chassis table: %v", nodeName, err)
}
return nil
}

// OVN uses an overlay and doesn't need GCE Routes, we need to
@@ -1289,15 +1294,11 @@ func (oc *Controller) deleteNode(nodeName string, hostSubnets []*net.IPNet) {
// TODO: make upstream kubelet more flexible with overlays and GCE so this
// condition doesn't get added for network plugins that don't want it, and then
// we can remove this function.
func (oc *Controller) clearInitialNodeNetworkUnavailableCondition(origNode, newNode *kapi.Node) {
func (oc *Controller) clearInitialNodeNetworkUnavailableCondition(origNode *kapi.Node) {
// If it is not a Cloud Provider node, then nothing to do.
if origNode.Spec.ProviderID == "" {
return
}
// if newNode is not nil, then we are called from UpdateFunc()
if newNode != nil && reflect.DeepEqual(origNode.Status.Conditions, newNode.Status.Conditions) {
return
}

cleared := false
resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
@@ -1481,7 +1482,9 @@ func (oc *Controller) syncNodesRetriable(nodes []interface{}) error {
continue
}

oc.deleteNode(nodeSwitch.Name, subnets)
if err := oc.deleteNode(nodeSwitch.Name, subnets); err != nil {
Member:
hmm, I'm a tiny bit confused here. So the PR that @flavio-fernandes did to retry syncNodesRetriable, that's a different beast? It feels like we have parallel retries in different situations... since we now return an error when deleteNode fails, does that mean the deletion will be retried in syncWithRetry and not in our node retry?

Seems like it's the same for pods: a sort of startup-only, finite retry mechanism which is fatal, versus our new retry mechanism which is infinite...

Member:

Also, L1472 and L1466: are those worth retrying? Not sure why they are just warnings.

Contributor:

syncNodesRetriable makes the sync function of the node watcher execute again if it fails. Here we're making the addFunc, deleteFunc and updateFunc execute again if they fail. So yeah, it's a different beast!

Contributor Author:

correct

return fmt.Errorf("failed to delete node:%s, err:%v", nodeSwitch.Name, err)
}
//remove the node from the chassis map so we don't delete it twice
staleChassis.Delete(nodeSwitch.Name)
}
@@ -1491,3 +1494,180 @@ func (oc *Controller) syncNodesRetriable(nodes []interface{}) error {
}
return nil
}
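To make the distinction in the review thread above concrete, here is a minimal sketch of the two retry layers being compared: a startup-style sync retry that re-runs the whole sync function and treats persistent failure as fatal, versus the per-object retry cache this PR adds for nodes, which re-invokes only the handler for the object that failed and keeps trying indefinitely. All names below are illustrative, not the actual ovn-kubernetes API.

```go
package retrysketch

import (
	"fmt"
	"time"
)

// syncWithRetryOnce models the startup-style retry: the whole sync function is
// retried as a unit for a bounded time, and persistent failure is fatal for the caller.
func syncWithRetryOnce(name string, syncFn func() error) error {
	deadline := time.Now().Add(30 * time.Second)
	for {
		err := syncFn()
		if err == nil {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("%s: initial sync never succeeded: %v", name, err)
		}
		time.Sleep(time.Second)
	}
}

// retryCache models the new per-object mechanism: only the objects that failed
// are remembered, and each is re-processed on every pass until it succeeds.
type retryCache struct {
	failed map[string]func() error
}

func (r *retryCache) record(key string, handler func() error) { r.failed[key] = handler }

func (r *retryCache) iterate() {
	for key, handler := range r.failed {
		if err := handler(); err != nil {
			continue // keep the entry; it will be retried on the next pass
		}
		delete(r.failed, key) // success: drop it from the cache
	}
}
```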

// nodeSyncs structure contains flags for the different failures
// so that the retry logic can control which parts need to be retried
type nodeSyncs struct {
syncNode bool
syncClusterRouterPort bool
syncMgmtPort bool
syncGw bool
}
Member:
Same here, can you please add a comment explaining why we need this? I had to look at the whole code to see "oh, it's to retry only whichever parts needed to be retried" and to have a way to control invoking the individual parts within addUpdateNodeEvent. This is just a layer on top of the existing xFailed maps we have, to easily store whether an entry exists in the map or not, right?

Member:

nit: syncClusterRouterPort to stay consistent with nodeClusterRouterPortFailed

Contributor Author:

Well, those flags can also all be set to true on a fresh create. On an update failure or a retry iteration they are set based on the sync maps, as you indicated, to control which parts need to be retried.
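To summarize the exchange above: the nodeSyncs flags are a snapshot of which parts of node setup still need to run. On a fresh add everything is true; on a retry, each flag is derived from whether the corresponding xFailed map still holds an entry for the node. A self-contained sketch of that pattern (the controllerState type and helper names are hypothetical; the real code does this inline in iterateRetryNodes further down):

```go
package nodesyncsketch

import "sync"

// nodeSyncs mirrors the flags struct in the diff below: each flag says whether
// that part of node setup needs to run (again).
type nodeSyncs struct {
	syncNode              bool
	syncClusterRouterPort bool
	syncMgmtPort          bool
	syncGw                bool
}

// controllerState stands in for the relevant Controller fields (hypothetical
// reduced type, for illustration only).
type controllerState struct {
	addNodeFailed               sync.Map
	nodeClusterRouterPortFailed sync.Map
	mgmtPortFailed              sync.Map
	gatewaysFailed              sync.Map
}

// syncsForFreshAdd: a node seen for the first time needs every step.
func syncsForFreshAdd() *nodeSyncs {
	return &nodeSyncs{syncNode: true, syncClusterRouterPort: true, syncMgmtPort: true, syncGw: true}
}

// syncsForRetry: a node being retried only needs the steps that failed last
// time, i.e. the ones whose "failed" map still holds an entry for it.
func (c *controllerState) syncsForRetry(nodeName string) *nodeSyncs {
	_, nodeSync := c.addNodeFailed.Load(nodeName)
	_, clusterRtrSync := c.nodeClusterRouterPortFailed.Load(nodeName)
	_, mgmtSync := c.mgmtPortFailed.Load(nodeName)
	_, gwSync := c.gatewaysFailed.Load(nodeName)
	return &nodeSyncs{nodeSync, clusterRtrSync, mgmtSync, gwSync}
}
```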


func (oc *Controller) addUpdateNodeEvent(node *kapi.Node, nSyncs *nodeSyncs) error {
var hostSubnets []*net.IPNet
var errs []error
var err error

if noHostSubnet := noHostSubnet(node); noHostSubnet {
err := oc.lsManager.AddNoHostSubnetNode(node.Name)
if err != nil {
return fmt.Errorf("nodeAdd: error adding noHost subnet for node %s: %w", node.Name, err)
}
}

klog.Infof("Adding or Updating Node %q", node.Name)
if nSyncs.syncNode {
if hostSubnets, err = oc.addNode(node); err != nil {
oc.addNodeFailed.Store(node.Name, true)
oc.nodeClusterRouterPortFailed.Store(node.Name, true)
oc.mgmtPortFailed.Store(node.Name, true)
oc.gatewaysFailed.Store(node.Name, true)
return fmt.Errorf("nodeAdd: error creating subnet for node %s: %w", node.Name, err)
}
oc.addNodeFailed.Delete(node.Name)
Member:

Self note: this is to make sure we remove it from addNodeFailed if this was a retry case; for a fresh case this won't matter since we'll never have an entry in addNodeFailed.

Contributor Author:

Correct, this function, as the name indicates, handles both add and update.

}

if nSyncs.syncClusterRouterPort {
if err = oc.syncNodeClusterRouterPort(node, nil); err != nil {
errs = append(errs, err)
oc.nodeClusterRouterPortFailed.Store(node.Name, true)
} else {
oc.nodeClusterRouterPortFailed.Delete(node.Name)
}
}

if nSyncs.syncMgmtPort {
err := oc.syncNodeManagementPort(node, hostSubnets)
if err != nil {
errs = append(errs, err)
oc.mgmtPortFailed.Store(node.Name, true)
} else {
oc.mgmtPortFailed.Delete(node.Name)
}
}

// delete stale chassis in SBDB if any
if err := oc.deleteStaleNodeChassis(node); err != nil {
errs = append(errs, err)
}

oc.clearInitialNodeNetworkUnavailableCondition(node)

if nSyncs.syncGw {
err := oc.syncNodeGateway(node, nil)
if err != nil {
errs = append(errs, err)
oc.gatewaysFailed.Store(node.Name, true)
} else {
oc.gatewaysFailed.Delete(node.Name)
}
}

// ensure pods that already exist on this node have their logical ports created
options := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector("spec.nodeName", node.Name).String()}
pods, err := oc.client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
if err != nil {
klog.Errorf("Unable to list existing pods on node: %s, existing pods on this node may not function")
} else {
oc.addRetryPods(pods.Items)
oc.requestRetryPods()
}

if len(errs) == 0 {
return nil
}
return kerrors.NewAggregate(errs)
}

func (oc *Controller) deleteNodeEvent(node *kapi.Node) error {
klog.V(5).Infof("Deleting Node %q. Removing the node from "+
"various caches", node.Name)

nodeSubnets, _ := util.ParseNodeHostSubnetAnnotation(node)
Member:
We don't want to return an error here if the parse fails? It's an error, right? What's the use of passing nodeSubnets below if it's nil? Or is this intentional?

Contributor Author:

Doesn't that mean we have a bigger issue if we are at delete node and it has no subnet annotation? That is probably why the current code didn't check and proceeded with the node delete. I could add it, but that is a problem for node create, not delete, and retrying the delete won't help. WDYT?

if err := oc.deleteNode(node.Name, nodeSubnets); err != nil {
return err
}
oc.lsManager.DeleteNode(node.Name)
oc.addNodeFailed.Delete(node.Name)
oc.mgmtPortFailed.Delete(node.Name)
oc.gatewaysFailed.Delete(node.Name)
oc.nodeClusterRouterPortFailed.Delete(node.Name)
return nil
}

// iterateRetryNodes checks if any outstanding Nodes exists
// then tries to re-add them if so
// updateAll forces all nodes to be attempted to be retried regardless
func (oc *Controller) iterateRetryNodes(updateAll bool) {
oc.retryNodes.retryMutex.Lock()
defer oc.retryNodes.retryMutex.Unlock()
now := time.Now()
for nodeName, entry := range oc.retryNodes.entries {
if entry.ignore {
// neither addition nor deletion is being retried
continue
}
// check if we need to create
if entry.newObj != nil {
n := entry.newObj.(*kapi.Node)
kNode, err := oc.watchFactory.GetNode(n.Name)
if err != nil && errors.IsNotFound(err) {
klog.Infof("%s node not found in the informers cache, not going to retry node setup", nodeName)
entry.newObj = nil
} else {
entry.newObj = kNode
}
}

entry.backoffSec = entry.backoffSec * 2
if entry.backoffSec > 60 {
entry.backoffSec = 60
}
backoff := (entry.backoffSec * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
nTimer := entry.timeStamp.Add(backoff)
if updateAll || now.After(nTimer) {
klog.Infof("Node Retry: %s retry node setup", nodeName)

// check if we need to delete
if entry.oldObj != nil {
klog.Infof("Node Retry: Removing old Node for %s", nodeName)
node := entry.oldObj.(*kapi.Node)
if err := oc.deleteNodeEvent(node); err != nil {
klog.Infof("Node Retry delete failed for %s, will try again later: %v",
nodeName, err)
entry.timeStamp = time.Now()
continue
}
// successfully cleaned up old node, remove it from the retry cache
entry.oldObj = nil
}

// create new node if needed
if entry.newObj != nil {
klog.Infof("Node Retry: Creating new node for %s", nodeName)
_, nodeSync := oc.addNodeFailed.Load(nodeName)
_, clusterRtrSync := oc.nodeClusterRouterPortFailed.Load(nodeName)
_, mgmtSync := oc.mgmtPortFailed.Load(nodeName)
_, gwSync := oc.gatewaysFailed.Load(nodeName)
if err := oc.addUpdateNodeEvent(entry.newObj.(*kapi.Node),
&nodeSyncs{nodeSync,
clusterRtrSync,
mgmtSync,
gwSync}); err != nil {
klog.Infof("Node Retry create failed for %s, will try again later: %v",
nodeName, err)
entry.timeStamp = time.Now()
continue
}
// successfully created the node, remove it from the retry cache
entry.newObj = nil
}

klog.Infof("Node Retry successful for %s", nodeName)
oc.retryNodes.deleteRetryObj(nodeName, false)
} else {
klog.V(5).Infof("%s retry node not after timer yet, time: %s", nodeName, nTimer)
}
}
}
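Not shown in this diff is the code that feeds the retry cache and periodically drives iterateRetryNodes. A rough sketch of that wiring, assuming a periodic ticker plus an explicit "retry now" signal similar to the existing pod retry flow referenced above; the function and channel names here are illustrative, not the actual handlers:

```go
package noderetrysketch

import "time"

// runNodeRetryLoop is an illustrative driver: a periodic ticker walks the
// retry cache, and an explicit signal can force an immediate pass over every
// pending node (updateAll = true).
func runNodeRetryLoop(stopCh <-chan struct{}, retryNodesNow <-chan struct{}, iterate func(updateAll bool)) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			iterate(false) // periodic pass: only entries whose backoff window has elapsed
		case <-retryNodesNow:
			iterate(true) // forced pass: retry every pending node regardless of backoff
		case <-stopCh:
			return
		}
	}
}
```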