Skip to content

Commit

Permalink
Network Policy: Process local pods in bulk
Browse files Browse the repository at this point in the history
This adds a factory sync function for network policy that bulk-sets all
pods on policy create. This means that creating a network policy that
selects a large number of pods is much more efficient.

Additionally, this means that ovn-kube startup time for large policies
should be much faster.

Signed-off-by: Casey Callendrello <cdc@redhat.com>
  • Loading branch information
squeed committed Jun 7, 2021
1 parent ced8a1f commit 028d6ba
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 36 deletions.
1 change: 1 addition & 0 deletions go-controller/pkg/ovn/ovn.go
Expand Up @@ -301,6 +301,7 @@ func (oc *Controller) Run(wg *sync.WaitGroup, nodeName string) error {

oc.WatchPods()

// WatchNetworkPolicy depends on WatchPods and WatchNamespaces
oc.WatchNetworkPolicy()

if config.OVNKubernetesFeature.EnableEgressIP {
Expand Down
198 changes: 162 additions & 36 deletions go-controller/pkg/ovn/policy.go
Expand Up @@ -566,8 +566,12 @@ func podDeleteAllowMulticastPolicy(ovnNBClient goovn.Client, ns string, portInfo
return deleteFromPortGroup(ovnNBClient, hashedPortGroup(ns), portInfo)
}

// localPodAddDefaultDeny ensures ports (i.e. pods) are in the correct
// default-deny portgroups. Whether or not pods are in default-deny depends
// on whether or not any policies select this pod, so there is a reference
// count to ensure we don't accidentally open up a pod.
func (oc *Controller) localPodAddDefaultDeny(nsInfo *namespaceInfo,
policy *knet.NetworkPolicy, portInfo *lpInfo) {
policy *knet.NetworkPolicy, ports ...*lpInfo) {
oc.lspMutex.Lock()

// Default deny rule.
Expand All @@ -581,89 +585,140 @@ func (oc *Controller) localPodAddDefaultDeny(nsInfo *namespaceInfo,
// the PolicyTypes has 'egress' in it, we add a default
// egress deny rule.

addIngressDeny := false
addEgressDeny := false
addIngressPorts := []*lpInfo{}
addEgressPorts := []*lpInfo{}

// Handle condition 1 above.
if !(len(policy.Spec.PolicyTypes) == 1 && policy.Spec.PolicyTypes[0] == knet.PolicyTypeEgress) {
if oc.lspIngressDenyCache[portInfo.name] == 0 {
addIngressDeny = true
for _, portInfo := range ports {
// if this is the first NP referencing this pod, then we
// need to add it to the port group.
if oc.lspIngressDenyCache[portInfo.name] == 0 {
addIngressPorts = append(addIngressPorts, portInfo)
}

// increment the reference count.
oc.lspIngressDenyCache[portInfo.name]++
}
oc.lspIngressDenyCache[portInfo.name]++
}

// Handle condition 2 above.
if (len(policy.Spec.PolicyTypes) == 1 && policy.Spec.PolicyTypes[0] == knet.PolicyTypeEgress) ||
len(policy.Spec.Egress) > 0 || len(policy.Spec.PolicyTypes) == 2 {
if oc.lspEgressDenyCache[portInfo.name] == 0 {
addEgressDeny = true
for _, portInfo := range ports {
if oc.lspEgressDenyCache[portInfo.name] == 0 {
// again, reference count is 0, so add to port
addEgressPorts = append(addEgressPorts, portInfo)
}

// bump reference count
oc.lspEgressDenyCache[portInfo.name]++
}
oc.lspEgressDenyCache[portInfo.name]++
}

// we're done with the lsp cache - release the lock before transacting
oc.lspMutex.Unlock()

if addIngressDeny {
if err := addToPortGroup(oc.ovnNBClient, nsInfo.portGroupIngressDenyName, portInfo); err != nil {
klog.Warningf("Failed to add port %s to ingress deny ACL: %v", portInfo.name, err)
// Generate a single OVN transaction that adds all ports to the
// appropriate port groups.
commands := make([]*goovn.OvnCommand, 0, len(addIngressPorts)+len(addEgressPorts))

for _, portInfo := range addIngressPorts {
cmd, err := oc.ovnNBClient.PortGroupAddPort(nsInfo.portGroupIngressDenyName, portInfo.uuid)
if err != nil {
klog.Warningf("Failed to create command: add port %s to ingress deny portgroup %s: %v",
portInfo.name, nsInfo.portGroupIngressDenyName, err)
continue
}
commands = append(commands, cmd)
}

if addEgressDeny {
if err := addToPortGroup(oc.ovnNBClient, nsInfo.portGroupEgressDenyName, portInfo); err != nil {
klog.Warningf("Failed to add port %s to egress deny ACL: %v", portInfo.name, err)
for _, portInfo := range addEgressPorts {
cmd, err := oc.ovnNBClient.PortGroupAddPort(nsInfo.portGroupEgressDenyName, portInfo.uuid)
if err != nil {
klog.Warningf("Failed to create command: add port %s to egress deny portgroup %s: %v",
portInfo.name, nsInfo.portGroupEgressDenyName, err)
continue
}
commands = append(commands, cmd)
}

err := oc.ovnNBClient.Execute(commands...)
if err != nil {
klog.Warningf("Failed to execute add-to-default-deny-portgroup transaction: %v", err)
}
}

// localPodDelDefaultDeny decrements a pod's policy reference count and removes a pod
// from the default-deny portgroups if the reference count for the pod is 0
func (oc *Controller) localPodDelDefaultDeny(
np *networkPolicy, nsInfo *namespaceInfo, portInfo *lpInfo) {
np *networkPolicy, nsInfo *namespaceInfo, ports ...*lpInfo) {
oc.lspMutex.Lock()

deleteFromIngress := false
deleteFromEgress := false
delIngressPorts := []*lpInfo{}
delEgressPorts := []*lpInfo{}

// Remove port from ingress deny port-group for [Ingress] and [ingress,egress] PolicyTypes
// If NOT [egress] PolicyType
if !(len(np.policyTypes) == 1 && np.policyTypes[0] == knet.PolicyTypeEgress) {
if oc.lspIngressDenyCache[portInfo.name] > 0 {
oc.lspIngressDenyCache[portInfo.name]--
if oc.lspIngressDenyCache[portInfo.name] == 0 {
deleteFromIngress = true
delete(oc.lspIngressDenyCache, portInfo.name)
for _, portInfo := range ports {
if oc.lspIngressDenyCache[portInfo.name] > 0 {
oc.lspIngressDenyCache[portInfo.name]--
if oc.lspIngressDenyCache[portInfo.name] == 0 {
delIngressPorts = append(delIngressPorts, portInfo)
delete(oc.lspIngressDenyCache, portInfo.name)
}
}
}
}

// Remove port from egress deny port group for [egress] and [ingress,egress] PolicyTypes
// if [egress] PolicyType OR there are any egress rules OR [ingress,egress] PolicyType
if (len(np.policyTypes) == 1 && np.policyTypes[0] == knet.PolicyTypeEgress) ||
len(np.egressPolicies) > 0 || len(np.policyTypes) == 2 {
if oc.lspEgressDenyCache[portInfo.name] > 0 {
oc.lspEgressDenyCache[portInfo.name]--
if oc.lspEgressDenyCache[portInfo.name] == 0 {
deleteFromEgress = true
delete(oc.lspEgressDenyCache, portInfo.name)
for _, portInfo := range ports {
if oc.lspEgressDenyCache[portInfo.name] > 0 {
oc.lspEgressDenyCache[portInfo.name]--
if oc.lspEgressDenyCache[portInfo.name] == 0 {
delEgressPorts = append(delEgressPorts, portInfo)
delete(oc.lspEgressDenyCache, portInfo.name)
}
}
}
}
oc.lspMutex.Unlock()

if deleteFromIngress {
if err := deleteFromPortGroup(oc.ovnNBClient, nsInfo.portGroupIngressDenyName, portInfo); err != nil {
klog.Warningf("Failed to remove port %s from ingress deny ACL: %v", portInfo.name, err)
commands := make([]*goovn.OvnCommand, 0, len(delIngressPorts)+len(delEgressPorts))

for _, portInfo := range delIngressPorts {
cmd, err := oc.ovnNBClient.PortGroupRemovePort(nsInfo.portGroupIngressDenyName, portInfo.uuid)
if err != nil {
klog.Warningf("Failed to create command: remove port %s from ingress deny portgroup %s: %v",
portInfo.name, nsInfo.portGroupIngressDenyName, err)
continue
}
commands = append(commands, cmd)
}

if deleteFromEgress {
if err := deleteFromPortGroup(oc.ovnNBClient, nsInfo.portGroupEgressDenyName, portInfo); err != nil {
klog.Warningf("Failed to remove port %s from egress deny ACL: %v", portInfo.name, err)
for _, portInfo := range delEgressPorts {
cmd, err := oc.ovnNBClient.PortGroupRemovePort(nsInfo.portGroupEgressDenyName, portInfo.uuid)
if err != nil {
klog.Warningf("Failed to create command: remove port %s from egress deny portgroup %s: %v",
portInfo.name, nsInfo.portGroupEgressDenyName, err)
continue
}
commands = append(commands, cmd)
}

err := oc.ovnNBClient.Execute(commands...)
if err != nil {
klog.Warningf("Failed to execute add-to-default-deny-portgroup transaction: %v", err)
}
}

// handleLocalPodSelectorAddFunc adds a new pod to an existing NetworkPolicy
//
// THIS MUST BE KEPT CONSISTENT WITH handleLocalPodSelectorSetPods!
func (oc *Controller) handleLocalPodSelectorAddFunc(
policy *knet.NetworkPolicy, np *networkPolicy, nsInfo *namespaceInfo,
obj interface{}) {
Expand Down Expand Up @@ -708,6 +763,72 @@ func (oc *Controller) handleLocalPodSelectorAddFunc(
np.localPods.Store(logicalPort, portInfo)
}

// handleLocalPodSelectorSetPods is a more efficient way of
// bulk-setting the local pods in a newly-created network policy
//
// THIS MUST BE KEPT CONSISTENT WITH AddPod!
func (oc *Controller) handleLocalPodSelectorSetPods(
policy *knet.NetworkPolicy, np *networkPolicy, nsInfo *namespaceInfo,
objs []interface{}) {

// Take the write lock since this is called once and we will want to bulk-update
// localPods
np.Lock()
defer np.Unlock()
if np.deleted {
return
}

klog.Infof("Setting NetworkPolicy %s/%s to have %d local pods...",
np.namespace, np.name, len(objs))

// get list of pods and their logical ports to add
// theoretically this should never filter any pods but it's always good to be
// paranoid.
portsToAdd := make([]*lpInfo, 0, len(objs))
for _, obj := range objs {
pod := obj.(*kapi.Pod)

if pod.Spec.NodeName == "" {
continue
}

portInfo, err := oc.logicalPortCache.get(podLogicalPortName(pod))
// pod is not yet handled
// no big deal, we'll get the update when it is.
if err != nil {
continue
}

// this pod is somehow already added to this policy, then skip
if _, ok := np.localPods.Load(portInfo.name); ok {
continue
}

portsToAdd = append(portsToAdd, portInfo)
}

// add all ports to default deny
oc.localPodAddDefaultDeny(nsInfo, policy, portsToAdd...)

if np.portGroupUUID == "" {
return
}

err := setPortGroup(oc.ovnNBClient, np.portGroupName, portsToAdd...)
if err != nil {
klog.Errorf("Failed to set ports in PortGroup for network policy %s/%s: %v", np.namespace, np.name, err)
}

for _, portInfo := range portsToAdd {
np.localPods.Store(portInfo.name, portInfo)
}

klog.Infof("Done setting NetworkPolicy %s/%s local pods",
np.namespace, np.name)

}

func (oc *Controller) handleLocalPodSelectorDelFunc(
policy *knet.NetworkPolicy, np *networkPolicy, nsInfo *namespaceInfo,
obj interface{}) {
Expand Down Expand Up @@ -765,7 +886,9 @@ func (oc *Controller) handleLocalPodSelector(
UpdateFunc: func(oldObj, newObj interface{}) {
oc.handleLocalPodSelectorAddFunc(policy, np, nsInfo, newObj)
},
}, nil)
}, func(objs []interface{}) {
oc.handleLocalPodSelectorSetPods(policy, np, nsInfo, objs)
})

np.podHandlerList = append(np.podHandlerList, h)
}
Expand Down Expand Up @@ -971,12 +1094,15 @@ func (oc *Controller) destroyNetworkPolicy(np *networkPolicy, nsInfo *namespaceI
np.deleted = true
oc.shutdownHandlers(np)

ports := []*lpInfo{}
np.localPods.Range(func(_, value interface{}) bool {
portInfo := value.(*lpInfo)
oc.localPodDelDefaultDeny(np, nsInfo, portInfo)
ports = append(ports, portInfo)
return true
})

oc.localPodDelDefaultDeny(np, nsInfo, ports...)

if len(nsInfo.networkPolicies) == 0 {
err := deletePortGroup(oc.ovnNBClient, nsInfo.portGroupIngressDenyName)
if err != nil {
Expand Down

0 comments on commit 028d6ba

Please sign in to comment.