Skip to content

Commit

Permalink
EGW: Delete stale conntrack entries
Browse files Browse the repository at this point in the history
This commit adds logic to delete the conntrack entries
that contain src MAC address in the "labels" field when
using ECMP routes on the GR.

Logic:

1) annotate the namespace each time an exgw is added/deleted with list of ips
2) add new informer for namespace on node side checking only if gw ip annotation OR external-gws annotation changed
3) ovnkube node on namespace change, iterates through all the ips and initiates an arp request via ovnk and collects the MACs
4) once all the responses come back, we have all the known macs
5) we search for ct entries for any pod ip belonging to the namespace, if ct_label is loaded with a mac not in our list we flush it
6) we run the above in a goroutine as well set which will run every 5mins looping through all relevant namespaces.

Signed-off-by: Surya Seetharaman <suryaseetharaman.9@gmail.com>
(cherry picked from commit 5a3a8b8)
(cherry picked from commit c3f54d0)
  • Loading branch information
tssurya committed Jul 13, 2022
1 parent cd37c50 commit f78c8b6
Show file tree
Hide file tree
Showing 17 changed files with 584 additions and 59 deletions.
1 change: 1 addition & 0 deletions dist/templates/ovn-setup.yaml.j2
Expand Up @@ -73,6 +73,7 @@ rules:
- apiGroups:
- ""
resources:
- namespaces
- nodes
- pods
verbs: ["patch", "update"]
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/cni/helper_linux.go
Expand Up @@ -544,7 +544,7 @@ func (pr *PodRequest) deletePodConntrack() {
continue
}
}
err = util.DeleteConntrack(ip.Address.IP.String(), 0, "", netlink.ConntrackReplyAnyIP)
err = util.DeleteConntrack(ip.Address.IP.String(), 0, "", netlink.ConntrackReplyAnyIP, nil)
if err != nil {
klog.Errorf("Failed to delete Conntrack Entry for %s: %v", ip.Address.IP.String(), err)
continue
Expand Down
12 changes: 12 additions & 0 deletions go-controller/pkg/factory/factory.go
Expand Up @@ -300,7 +300,19 @@ func NewNodeWatchFactory(ovnClientset *util.OVNClientset, nodeName string) (*Wat
})
})

// For namespaces
wf.iFactory.InformerFor(&kapi.Namespace{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return v1coreinformers.NewNamespaceInformer(
c,
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
})

var err error
wf.informers[NamespaceType], err = newInformer(NamespaceType, wf.iFactory.Core().V1().Namespaces().Informer())
if err != nil {
return nil, err
}
wf.informers[PodType], err = newQueuedInformer(PodType, wf.iFactory.Core().V1().Pods().Informer(), wf.stopChan,
defaultNumEventQueues)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions go-controller/pkg/factory/types.go
Expand Up @@ -42,9 +42,14 @@ type NodeWatchFactory interface {
AddPodHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error)
RemovePodHandler(handler *Handler)

AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{}) error) (*Handler, error)
RemoveNamespaceHandler(handler *Handler)

NodeInformer() cache.SharedIndexInformer
LocalPodInformer() cache.SharedIndexInformer

GetPods(namespace string) ([]*kapi.Pod, error)
GetNamespaces() ([]*kapi.Namespace, error)
GetNode(name string) (*kapi.Node, error)
GetNodes() ([]*kapi.Node, error)
ListNodes(selector labels.Selector) ([]*kapi.Node, error)
Expand Down
4 changes: 2 additions & 2 deletions go-controller/pkg/node/gateway_shared_intf.go
Expand Up @@ -542,7 +542,7 @@ func (npw *nodePortWatcher) UpdateService(old, new *kapi.Service) {
func deleteConntrackForServiceVIP(svcVIPs []string, svcPorts []kapi.ServicePort, ns, name string) error {
for _, svcVIP := range svcVIPs {
for _, svcPort := range svcPorts {
err := util.DeleteConntrack(svcVIP, svcPort.Port, svcPort.Protocol, netlink.ConntrackOrigDstIP)
err := util.DeleteConntrack(svcVIP, svcPort.Port, svcPort.Protocol, netlink.ConntrackOrigDstIP, nil)
if err != nil {
return fmt.Errorf("failed to delete conntrack entry for service %s/%s with svcVIP %s, svcPort %d, protocol %s: %v",
ns, name, svcVIP, svcPort.Port, svcPort.Protocol, err)
Expand All @@ -564,7 +564,7 @@ func (npw *nodePortWatcher) deleteConntrackForService(service *kapi.Service) err
nodeIPs := npw.nodeIPManager.ListAddresses()
for _, nodeIP := range nodeIPs {
for _, svcPort := range service.Spec.Ports {
err := util.DeleteConntrack(nodeIP.String(), svcPort.NodePort, svcPort.Protocol, netlink.ConntrackOrigDstIP)
err := util.DeleteConntrack(nodeIP.String(), svcPort.NodePort, svcPort.Protocol, netlink.ConntrackOrigDstIP, nil)
if err != nil {
return fmt.Errorf("failed to delete conntrack entry for service %s/%s with nodeIP %s, nodePort %d, protocol %s: %v",
service.Namespace, service.Name, nodeIP, svcPort.Port, svcPort.Protocol, err)
Expand Down
120 changes: 117 additions & 3 deletions go-controller/pkg/node/node.go
Expand Up @@ -541,7 +541,16 @@ func (n *OvnNode) Start(ctx context.Context, wg *sync.WaitGroup) error {
go wait.Until(func() {
checkForStaleOVSInterfaces(n.name, n.watchFactory.(*factory.WatchFactory))
}, time.Minute, n.stopChan)
err := n.WatchEndpoints()
util.SetARPTimeout()
err := n.WatchNamespaces()
if err != nil {
return fmt.Errorf("failed to watch namespaces: %w", err)
}
// every minute cleanup stale conntrack entries if any
go wait.Until(func() {
n.checkAndDeleteStaleConntrackEntries()
}, time.Minute*1, n.stopChan)
err = n.WatchEndpoints()
if err != nil {
return fmt.Errorf("failed to watch endpoints: %w", err)
}
Expand Down Expand Up @@ -577,7 +586,7 @@ func (n *OvnNode) WatchEndpoints() error {
newEpAddressMap := buildEndpointAddressMap(epNew.Subsets)
for item := range buildEndpointAddressMap(epOld.Subsets) {
if _, ok := newEpAddressMap[item]; !ok && item.protocol == kapi.ProtocolUDP { // flush conntrack only for UDP
err := util.DeleteConntrack(item.ip, item.port, item.protocol, netlink.ConntrackReplyAnyIP)
err := util.DeleteConntrack(item.ip, item.port, item.protocol, netlink.ConntrackReplyAnyIP, nil)
if err != nil {
klog.Errorf("Failed to delete conntrack entry for %s: %v", item.ip, err)
}
Expand All @@ -588,7 +597,7 @@ func (n *OvnNode) WatchEndpoints() error {
ep := obj.(*kapi.Endpoints)
for item := range buildEndpointAddressMap(ep.Subsets) {
if item.protocol == kapi.ProtocolUDP { // flush conntrack only for UDP
err := util.DeleteConntrack(item.ip, item.port, item.protocol, netlink.ConntrackReplyAnyIP)
err := util.DeleteConntrack(item.ip, item.port, item.protocol, netlink.ConntrackReplyAnyIP, nil)
if err != nil {
klog.Errorf("Failed to delete conntrack entry for %s: %v", item.ip, err)
}
Expand All @@ -599,6 +608,111 @@ func (n *OvnNode) WatchEndpoints() error {
return err
}

func exGatewayPodsAnnotationsChanged(oldNs, newNs *kapi.Namespace) bool {
// In reality we only care about exgw pod deletions, however since the list of IPs is not expected to change
// that often, let's check for *any* changes to these annotations compared to their previous state and trigger
// the logic for checking if we need to delete any conntrack entries
return (oldNs.Annotations[util.ExternalGatewayPodIPsAnnotation] != newNs.Annotations[util.ExternalGatewayPodIPsAnnotation]) ||
(oldNs.Annotations[util.RoutingExternalGWsAnnotation] != newNs.Annotations[util.RoutingExternalGWsAnnotation])
}

func (n *OvnNode) checkAndDeleteStaleConntrackEntries() {
namespaces, err := n.watchFactory.GetNamespaces()
if err != nil {
klog.Errorf("Unable to get pods from informer: %v", err)
}
for _, namespace := range namespaces {
_, foundRoutingExternalGWsAnnotation := namespace.Annotations[util.RoutingExternalGWsAnnotation]
_, foundExternalGatewayPodIPsAnnotation := namespace.Annotations[util.ExternalGatewayPodIPsAnnotation]
if foundRoutingExternalGWsAnnotation || foundExternalGatewayPodIPsAnnotation {
pods, err := n.watchFactory.GetPods(namespace.Name)
if err != nil {
klog.Warningf("Unable to get pods from informer for namespace %s: %v", namespace.Name, err)
}
if len(pods) > 0 || err != nil {
// we only need to proceed if there is at least one pod in this namespace on this node
// OR if we couldn't fetch the pods for some reason at this juncture
n.checkAndDeleteStaleConntrackEntriesForNamespace(namespace)
}
}
}
}

func (n *OvnNode) checkAndDeleteStaleConntrackEntriesForNamespace(newNs *kapi.Namespace) {
// loop through all the IPs on the annotations; ARP for their MACs and form an allowlist
gatewayIPs := strings.Split(newNs.Annotations[util.ExternalGatewayPodIPsAnnotation], ",")
gatewayIPs = append(gatewayIPs, strings.Split(newNs.Annotations[util.RoutingExternalGWsAnnotation], ",")...)
var wg sync.WaitGroup
wg.Add(len(gatewayIPs))
validMACs := sync.Map{}
for _, gwIP := range gatewayIPs {
go func(gwIP string) {
defer wg.Done()
if len(gwIP) > 0 {
if hwAddr, err := util.GetMACAddressFromARP(net.ParseIP(gwIP)); err != nil {
klog.Errorf("Failed to lookup hardware address for gatewayIP %s: %v", gwIP, err)
} else if len(hwAddr) > 0 {
// we need to reverse the mac before passing it to the conntrack filter since OVN saves the MAC in the following format
// +------------------------------------------------------------ +
// | 128 ... 112 ... 96 ... 80 ... 64 ... 48 ... 32 ... 16 ... 0|
// +------------------+-------+--------------------+-------------|
// | | UNUSED| MAC ADDRESS | UNUSED |
// +------------------+-------+--------------------+-------------+
for i, j := 0, len(hwAddr)-1; i < j; i, j = i+1, j-1 {
hwAddr[i], hwAddr[j] = hwAddr[j], hwAddr[i]
}
validMACs.Store(gwIP, []byte(hwAddr))
}
}
}(gwIP)
}
wg.Wait()

validNextHopMACs := [][]byte{}
validMACs.Range(func(key interface{}, value interface{}) bool {
validNextHopMACs = append(validNextHopMACs, value.([]byte))
return true
})
// Handle corner case where there are 0 IPs on the annotations OR none of the ARPs were successful; i.e allowMACList={empty}.
// This means we *need to* pass a label > 128 bits that will not match on any conntrack entry labels for these pods.
// That way any remaining entries with labels having MACs set will get purged.
if len(validNextHopMACs) == 0 {
validNextHopMACs = append(validNextHopMACs, []byte("does-not-contain-anything"))
}

pods, err := n.watchFactory.GetPods(newNs.Name)
if err != nil {
klog.Errorf("Unable to get pods from informer: %v", err)
}
for _, pod := range pods {
pod := pod
podIPs, err := util.GetAllPodIPs(pod)
if err != nil {
klog.Errorf("Unable to fetch IP for pod %s/%s: %v", pod.Namespace, pod.Name, err)
}
for _, podIP := range podIPs { // flush conntrack only for UDP
// for this pod, we check if the conntrack entry has a label that is not in the provided allowlist of MACs
// only caveat here is we assume egressGW served pods shouldn't have conntrack entries with other labels set
err := util.DeleteConntrack(podIP.String(), 0, kapi.ProtocolUDP, netlink.ConntrackOrigDstIP, validNextHopMACs)
if err != nil {
klog.Errorf("Failed to delete conntrack entry for pod %s: %v", podIP.String(), err)
}
}
}
}

func (n *OvnNode) WatchNamespaces() error {
_, err := n.watchFactory.AddNamespaceHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
oldNs, newNs := old.(*kapi.Namespace), new.(*kapi.Namespace)
if exGatewayPodsAnnotationsChanged(oldNs, newNs) {
n.checkAndDeleteStaleConntrackEntriesForNamespace(newNs)
}
},
}, nil)
return err
}

// validateVTEPInterfaceMTU checks if the MTU of the interface that has ovn-encap-ip is big
// enough to carry the `config.Default.MTU` and the Geneve header. If the MTU is not big
// enough, it will return an error
Expand Down

0 comments on commit f78c8b6

Please sign in to comment.