From 028d6ba537478c884d6276457faaec9c203ef45b Mon Sep 17 00:00:00 2001 From: Casey Callendrello Date: Mon, 7 Jun 2021 16:07:09 +0200 Subject: [PATCH] Network Policy: Process local pods in bulk This adds a factory sync function for network policy that bulk-sets all pods on policy create. This means that creating a network policy that selects a large number of pods is much more efficient. Additionally, this means that ovn-kube startup time for large policies should be much faster. Signed-off-by: Casey Callendrello --- go-controller/pkg/ovn/ovn.go | 1 + go-controller/pkg/ovn/policy.go | 198 ++++++++++++++++++++++++++------ 2 files changed, 163 insertions(+), 36 deletions(-) diff --git a/go-controller/pkg/ovn/ovn.go b/go-controller/pkg/ovn/ovn.go index bc4b4f66e8f..b57c1425a5c 100644 --- a/go-controller/pkg/ovn/ovn.go +++ b/go-controller/pkg/ovn/ovn.go @@ -301,6 +301,7 @@ func (oc *Controller) Run(wg *sync.WaitGroup, nodeName string) error { oc.WatchPods() + // WatchNetworkPolicy depends on WatchPods and WatchNamespaces oc.WatchNetworkPolicy() if config.OVNKubernetesFeature.EnableEgressIP { diff --git a/go-controller/pkg/ovn/policy.go b/go-controller/pkg/ovn/policy.go index cbe67a6bd6e..9ceaac89265 100644 --- a/go-controller/pkg/ovn/policy.go +++ b/go-controller/pkg/ovn/policy.go @@ -566,8 +566,12 @@ func podDeleteAllowMulticastPolicy(ovnNBClient goovn.Client, ns string, portInfo return deleteFromPortGroup(ovnNBClient, hashedPortGroup(ns), portInfo) } +// localPodAddDefaultDeny ensures ports (i.e. pods) are in the correct +// default-deny portgroups. Whether or not pods are in default-deny depends +// on whether or not any policies select this pod, so there is a reference +// count to ensure we don't accidentally open up a pod. func (oc *Controller) localPodAddDefaultDeny(nsInfo *namespaceInfo, - policy *knet.NetworkPolicy, portInfo *lpInfo) { + policy *knet.NetworkPolicy, ports ...*lpInfo) { oc.lspMutex.Lock() // Default deny rule. @@ -581,89 +585,140 @@ func (oc *Controller) localPodAddDefaultDeny(nsInfo *namespaceInfo, // the PolicyTypes has 'egress' in it, we add a default // egress deny rule. - addIngressDeny := false - addEgressDeny := false + addIngressPorts := []*lpInfo{} + addEgressPorts := []*lpInfo{} // Handle condition 1 above. if !(len(policy.Spec.PolicyTypes) == 1 && policy.Spec.PolicyTypes[0] == knet.PolicyTypeEgress) { - if oc.lspIngressDenyCache[portInfo.name] == 0 { - addIngressDeny = true + for _, portInfo := range ports { + // if this is the first NP referencing this pod, then we + // need to add it to the port group. + if oc.lspIngressDenyCache[portInfo.name] == 0 { + addIngressPorts = append(addIngressPorts, portInfo) + } + + // increment the reference count. + oc.lspIngressDenyCache[portInfo.name]++ } - oc.lspIngressDenyCache[portInfo.name]++ } // Handle condition 2 above. if (len(policy.Spec.PolicyTypes) == 1 && policy.Spec.PolicyTypes[0] == knet.PolicyTypeEgress) || len(policy.Spec.Egress) > 0 || len(policy.Spec.PolicyTypes) == 2 { - if oc.lspEgressDenyCache[portInfo.name] == 0 { - addEgressDeny = true + for _, portInfo := range ports { + if oc.lspEgressDenyCache[portInfo.name] == 0 { + // again, reference count is 0, so add to port + addEgressPorts = append(addEgressPorts, portInfo) + } + + // bump reference count + oc.lspEgressDenyCache[portInfo.name]++ } - oc.lspEgressDenyCache[portInfo.name]++ } // we're done with the lsp cache - release the lock before transacting oc.lspMutex.Unlock() - if addIngressDeny { - if err := addToPortGroup(oc.ovnNBClient, nsInfo.portGroupIngressDenyName, portInfo); err != nil { - klog.Warningf("Failed to add port %s to ingress deny ACL: %v", portInfo.name, err) + // Generate a single OVN transaction that adds all ports to the + // appropriate port groups. + commands := make([]*goovn.OvnCommand, 0, len(addIngressPorts)+len(addEgressPorts)) + + for _, portInfo := range addIngressPorts { + cmd, err := oc.ovnNBClient.PortGroupAddPort(nsInfo.portGroupIngressDenyName, portInfo.uuid) + if err != nil { + klog.Warningf("Failed to create command: add port %s to ingress deny portgroup %s: %v", + portInfo.name, nsInfo.portGroupIngressDenyName, err) + continue } + commands = append(commands, cmd) } - if addEgressDeny { - if err := addToPortGroup(oc.ovnNBClient, nsInfo.portGroupEgressDenyName, portInfo); err != nil { - klog.Warningf("Failed to add port %s to egress deny ACL: %v", portInfo.name, err) + for _, portInfo := range addEgressPorts { + cmd, err := oc.ovnNBClient.PortGroupAddPort(nsInfo.portGroupEgressDenyName, portInfo.uuid) + if err != nil { + klog.Warningf("Failed to create command: add port %s to egress deny portgroup %s: %v", + portInfo.name, nsInfo.portGroupEgressDenyName, err) + continue } + commands = append(commands, cmd) } + err := oc.ovnNBClient.Execute(commands...) + if err != nil { + klog.Warningf("Failed to execute add-to-default-deny-portgroup transaction: %v", err) + } } +// localPodDelDefaultDeny decrements a pod's policy reference count and removes a pod +// from the default-deny portgroups if the reference count for the pod is 0 func (oc *Controller) localPodDelDefaultDeny( - np *networkPolicy, nsInfo *namespaceInfo, portInfo *lpInfo) { + np *networkPolicy, nsInfo *namespaceInfo, ports ...*lpInfo) { oc.lspMutex.Lock() - deleteFromIngress := false - deleteFromEgress := false + delIngressPorts := []*lpInfo{} + delEgressPorts := []*lpInfo{} // Remove port from ingress deny port-group for [Ingress] and [ingress,egress] PolicyTypes // If NOT [egress] PolicyType if !(len(np.policyTypes) == 1 && np.policyTypes[0] == knet.PolicyTypeEgress) { - if oc.lspIngressDenyCache[portInfo.name] > 0 { - oc.lspIngressDenyCache[portInfo.name]-- - if oc.lspIngressDenyCache[portInfo.name] == 0 { - deleteFromIngress = true - delete(oc.lspIngressDenyCache, portInfo.name) + for _, portInfo := range ports { + if oc.lspIngressDenyCache[portInfo.name] > 0 { + oc.lspIngressDenyCache[portInfo.name]-- + if oc.lspIngressDenyCache[portInfo.name] == 0 { + delIngressPorts = append(delIngressPorts, portInfo) + delete(oc.lspIngressDenyCache, portInfo.name) + } } } } + // Remove port from egress deny port group for [egress] and [ingress,egress] PolicyTypes // if [egress] PolicyType OR there are any egress rules OR [ingress,egress] PolicyType if (len(np.policyTypes) == 1 && np.policyTypes[0] == knet.PolicyTypeEgress) || len(np.egressPolicies) > 0 || len(np.policyTypes) == 2 { - if oc.lspEgressDenyCache[portInfo.name] > 0 { - oc.lspEgressDenyCache[portInfo.name]-- - if oc.lspEgressDenyCache[portInfo.name] == 0 { - deleteFromEgress = true - delete(oc.lspEgressDenyCache, portInfo.name) + for _, portInfo := range ports { + if oc.lspEgressDenyCache[portInfo.name] > 0 { + oc.lspEgressDenyCache[portInfo.name]-- + if oc.lspEgressDenyCache[portInfo.name] == 0 { + delEgressPorts = append(delEgressPorts, portInfo) + delete(oc.lspEgressDenyCache, portInfo.name) + } } } } oc.lspMutex.Unlock() - if deleteFromIngress { - if err := deleteFromPortGroup(oc.ovnNBClient, nsInfo.portGroupIngressDenyName, portInfo); err != nil { - klog.Warningf("Failed to remove port %s from ingress deny ACL: %v", portInfo.name, err) + commands := make([]*goovn.OvnCommand, 0, len(delIngressPorts)+len(delEgressPorts)) + + for _, portInfo := range delIngressPorts { + cmd, err := oc.ovnNBClient.PortGroupRemovePort(nsInfo.portGroupIngressDenyName, portInfo.uuid) + if err != nil { + klog.Warningf("Failed to create command: remove port %s from ingress deny portgroup %s: %v", + portInfo.name, nsInfo.portGroupIngressDenyName, err) + continue } + commands = append(commands, cmd) } - if deleteFromEgress { - if err := deleteFromPortGroup(oc.ovnNBClient, nsInfo.portGroupEgressDenyName, portInfo); err != nil { - klog.Warningf("Failed to remove port %s from egress deny ACL: %v", portInfo.name, err) + for _, portInfo := range delEgressPorts { + cmd, err := oc.ovnNBClient.PortGroupRemovePort(nsInfo.portGroupEgressDenyName, portInfo.uuid) + if err != nil { + klog.Warningf("Failed to create command: remove port %s from egress deny portgroup %s: %v", + portInfo.name, nsInfo.portGroupEgressDenyName, err) + continue } + commands = append(commands, cmd) } + err := oc.ovnNBClient.Execute(commands...) + if err != nil { + klog.Warningf("Failed to execute add-to-default-deny-portgroup transaction: %v", err) + } } +// handleLocalPodSelectorAddFunc adds a new pod to an existing NetworkPolicy +// +// THIS MUST BE KEPT CONSISTENT WITH handleLocalPodSelectorSetPods! func (oc *Controller) handleLocalPodSelectorAddFunc( policy *knet.NetworkPolicy, np *networkPolicy, nsInfo *namespaceInfo, obj interface{}) { @@ -708,6 +763,72 @@ func (oc *Controller) handleLocalPodSelectorAddFunc( np.localPods.Store(logicalPort, portInfo) } +// handleLocalPodSelectorSetPods is a more efficient way of +// bulk-setting the local pods in a newly-created network policy +// +// THIS MUST BE KEPT CONSISTENT WITH AddPod! +func (oc *Controller) handleLocalPodSelectorSetPods( + policy *knet.NetworkPolicy, np *networkPolicy, nsInfo *namespaceInfo, + objs []interface{}) { + + // Take the write lock since this is called once and we will want to bulk-update + // localPods + np.Lock() + defer np.Unlock() + if np.deleted { + return + } + + klog.Infof("Setting NetworkPolicy %s/%s to have %d local pods...", + np.namespace, np.name, len(objs)) + + // get list of pods and their logical ports to add + // theoretically this should never filter any pods but it's always good to be + // paranoid. + portsToAdd := make([]*lpInfo, 0, len(objs)) + for _, obj := range objs { + pod := obj.(*kapi.Pod) + + if pod.Spec.NodeName == "" { + continue + } + + portInfo, err := oc.logicalPortCache.get(podLogicalPortName(pod)) + // pod is not yet handled + // no big deal, we'll get the update when it is. + if err != nil { + continue + } + + // this pod is somehow already added to this policy, then skip + if _, ok := np.localPods.Load(portInfo.name); ok { + continue + } + + portsToAdd = append(portsToAdd, portInfo) + } + + // add all ports to default deny + oc.localPodAddDefaultDeny(nsInfo, policy, portsToAdd...) + + if np.portGroupUUID == "" { + return + } + + err := setPortGroup(oc.ovnNBClient, np.portGroupName, portsToAdd...) + if err != nil { + klog.Errorf("Failed to set ports in PortGroup for network policy %s/%s: %v", np.namespace, np.name, err) + } + + for _, portInfo := range portsToAdd { + np.localPods.Store(portInfo.name, portInfo) + } + + klog.Infof("Done setting NetworkPolicy %s/%s local pods", + np.namespace, np.name) + +} + func (oc *Controller) handleLocalPodSelectorDelFunc( policy *knet.NetworkPolicy, np *networkPolicy, nsInfo *namespaceInfo, obj interface{}) { @@ -765,7 +886,9 @@ func (oc *Controller) handleLocalPodSelector( UpdateFunc: func(oldObj, newObj interface{}) { oc.handleLocalPodSelectorAddFunc(policy, np, nsInfo, newObj) }, - }, nil) + }, func(objs []interface{}) { + oc.handleLocalPodSelectorSetPods(policy, np, nsInfo, objs) + }) np.podHandlerList = append(np.podHandlerList, h) } @@ -971,12 +1094,15 @@ func (oc *Controller) destroyNetworkPolicy(np *networkPolicy, nsInfo *namespaceI np.deleted = true oc.shutdownHandlers(np) + ports := []*lpInfo{} np.localPods.Range(func(_, value interface{}) bool { portInfo := value.(*lpInfo) - oc.localPodDelDefaultDeny(np, nsInfo, portInfo) + ports = append(ports, portInfo) return true }) + oc.localPodDelDefaultDeny(np, nsInfo, ports...) + if len(nsInfo.networkPolicies) == 0 { err := deletePortGroup(oc.ovnNBClient, nsInfo.portGroupIngressDenyName) if err != nil {