Skip to content

Commit

Permalink
Merge pull request openshift#241 from danwinship/revert-networkpolicy…
Browse files Browse the repository at this point in the history
…-perf-fixes-46

Bug 1915007: Revert NetworkPolicy performance fixes (4.6)
  • Loading branch information
openshift-merge-robot committed Jan 11, 2021
2 parents adcf1b2 + be75f6f commit 7106dab
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 143 deletions.
158 changes: 63 additions & 95 deletions pkg/network/node/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
networkv1 "github.com/openshift/api/network/v1"
"github.com/openshift/library-go/pkg/network/networkutils"
"github.com/openshift/sdn/pkg/network/common"
"github.com/openshift/sdn/pkg/network/node/ovs"
)

type networkPolicyPlugin struct {
Expand All @@ -44,18 +43,16 @@ type networkPolicyPlugin struct {
namespaces map[uint32]*npNamespace
// nsMatchCache caches matches for namespaceSelectors; see selectNamespaceInternal
nsMatchCache map[string]*npCacheEntry

pods map[ktypes.UID]corev1.Pod
}

// npNamespace tracks NetworkPolicy-related data for a Namespace
type npNamespace struct {
name string
vnid uint32
inUse bool

// mustRecalculate is true if we need to recalculate policy .flows/.selectedIPs
mustRecalculate bool
// mustSync is true if we need to push updated flows to OVS
mustSync bool
dirty bool

labels map[string]string
policies map[ktypes.UID]*npPolicy
Expand All @@ -68,8 +65,7 @@ type npNamespace struct {
type npPolicy struct {
policy networkingv1.NetworkPolicy
watchesNamespaces bool
watchesAllPods bool
watchesOwnPods bool
watchesPods bool

flows []string
selectedIPs []string
Expand All @@ -81,10 +77,18 @@ type npCacheEntry struct {
matches map[string]uint32
}

type refreshForType string

const (
refreshForPods refreshForType = "pods"
refreshForNamespaces refreshForType = "namespaces"
)

func NewNetworkPolicyPlugin() osdnPolicy {
return &networkPolicyPlugin{
namespaces: make(map[uint32]*npNamespace),
namespacesByName: make(map[string]*npNamespace),
pods: make(map[ktypes.UID]corev1.Pod),

nsMatchCache: make(map[string]*npCacheEntry),
}
Expand Down Expand Up @@ -196,7 +200,7 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *networkv1.NetNamespace) {
npns.gotNetNamespace = true
if npns.gotNamespace {
np.updateMatchCache(npns)
np.refreshNamespaceNetworkPolicies()
np.refreshNetworkPolicies(refreshForNamespaces)
}
}

Expand All @@ -219,8 +223,9 @@ func (np *networkPolicyPlugin) DeleteNetNamespace(netns *networkv1.NetNamespace)

if npns.inUse {
npns.inUse = false
// This needs to happen before we forget about the namespace.
np.syncNamespaceImmediately(npns)
// We call syncNamespaceFlows() not syncNamespace() because it
// needs to happen before we forget about the namespace.
np.syncNamespaceFlows(npns)
}
delete(np.namespaces, netns.NetID)
npns.gotNetNamespace = false
Expand All @@ -244,56 +249,27 @@ func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
}

func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
if !npns.mustSync {
npns.mustSync = true
if !npns.dirty {
npns.dirty = true
np.runner.Run()
}
}

func (np *networkPolicyPlugin) syncNamespaceImmediately(npns *npNamespace) {
otx := np.node.oc.NewTransaction()
np.generateNamespaceFlows(otx, npns)
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for namespace %q: %v", npns.name, err))
}
}

// This is the entry point for the BoundedFrequencyRunner
func (np *networkPolicyPlugin) syncFlows() {
np.lock.Lock()
defer np.lock.Unlock()

np.recalculate()

// Push internal data to OVS (for namespaces that have changed)
otx := np.node.oc.NewTransaction()
for _, npns := range np.namespaces {
if npns.mustSync {
np.generateNamespaceFlows(otx, npns)
npns.mustSync = false
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows: %v", err))
}
}

// Update internal data to reflect recent pod/namespace changes
func (np *networkPolicyPlugin) recalculate() {
for _, npns := range np.namespaces {
if npns.mustRecalculate {
for _, npp := range npns.policies {
if np.updateNetworkPolicy(npns, &npp.policy) {
npns.mustSync = true
}
}
npns.mustRecalculate = false
if npns.dirty {
np.syncNamespaceFlows(npns)
npns.dirty = false
}
}
}

func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns *npNamespace) {
func (np *networkPolicyPlugin) syncNamespaceFlows(npns *npNamespace) {
klog.V(5).Infof("syncNamespace %d", npns.vnid)
otx := np.node.oc.NewTransaction()
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
if npns.inUse {
allPodsSelected := false
Expand Down Expand Up @@ -331,6 +307,9 @@ func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns
otx.AddFlow("table=80, priority=50, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for VNID: %v", err))
}
}

func (np *networkPolicyPlugin) EnsureVNIDRules(vnid uint32) {
Expand Down Expand Up @@ -422,17 +401,13 @@ func (np *networkPolicyPlugin) selectPodsFromNamespaces(nsLabelSel, podLabelSel
return nil
}

nsLister := np.node.kubeInformers.Core().V1().Pods().Lister()
for namespace, vnid := range np.selectNamespacesInternal(nsSel) {
pods, err := nsLister.Pods(namespace).List(podSel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", namespace, err))
continue
}
for _, pod := range pods {
namespaces := np.selectNamespacesInternal(nsSel)
for _, pod := range np.pods {
vnid, exists := namespaces[pod.Namespace]
if exists && podSel.Matches(labels.Set(pod.Labels)) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", vnid, pod.Status.PodIP))
}

}

return peerFlows
Expand Down Expand Up @@ -462,15 +437,10 @@ func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *metav1.LabelS
utilruntime.HandleError(fmt.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err))
return ips
}

pods, err := np.node.kubeInformers.Core().V1().Pods().Lister().Pods(npns.name).List(sel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", npns.name, err))
return ips
}
for _, pod := range pods {
ips = append(ips, pod.Status.PodIP)
for _, pod := range np.pods {
if (npns.name == pod.Namespace) && sel.Matches(labels.Set(pod.Labels)) {
ips = append(ips, pod.Status.PodIP)
}
}
return ips
}
Expand Down Expand Up @@ -499,7 +469,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net

var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesOwnPods = true
npp.watchesPods = true
npp.selectedIPs = np.selectPods(npns, &policy.Spec.PodSelector)
for _, ip := range npp.selectedIPs {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
Expand Down Expand Up @@ -547,7 +517,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
// The PodSelector is empty, meaning it selects all pods in this namespace
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ", npns.vnid))
} else {
npp.watchesOwnPods = true
npp.watchesPods = true
for _, ip := range np.selectPods(npns, peer.PodSelector) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", npns.vnid, ip))
}
Expand All @@ -562,7 +532,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
}
} else {
npp.watchesNamespaces = true
npp.watchesAllPods = true
npp.watchesPods = true
peerFlows = append(peerFlows, np.selectPodsFromNamespaces(peer.NamespaceSelector, peer.PodSelector)...)
}

Expand Down Expand Up @@ -671,7 +641,7 @@ func (np *networkPolicyPlugin) watchPods() {
np.node.kubeInformers.Core().V1().Pods().Informer().AddEventHandler(funcs)
}

func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventType watch.EventType) {
func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventType watch.EventType) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", eventType, getPodFullName(pod))

Expand All @@ -684,27 +654,35 @@ func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventT
return
}

if old != nil {
oldPod := old.(*corev1.Pod)
if oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}
// We don't want to grab np.Lock for every Pod.Status change...
// But it's safe to look up oldPod without locking here because no other
// threads modify this map.
oldPod, podExisted := np.pods[pod.UID]
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}

np.lock.Lock()
defer np.lock.Unlock()

np.refreshPodNetworkPolicies(pod)
np.pods[pod.UID] = *pod
np.refreshNetworkPolicies(refreshForPods)
}

func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", watch.Deleted, getPodFullName(pod))

_, podExisted := np.pods[pod.UID]
if !podExisted {
return
}

np.lock.Lock()
defer np.lock.Unlock()

np.refreshPodNetworkPolicies(pod)
delete(np.pods, pod.UID)
np.refreshNetworkPolicies(refreshForPods)
}

func (np *networkPolicyPlugin) watchNamespaces() {
Expand Down Expand Up @@ -733,7 +711,7 @@ func (np *networkPolicyPlugin) handleAddOrUpdateNamespace(obj, _ interface{}, ev
npns.gotNamespace = true
if npns.gotNetNamespace {
np.updateMatchCache(npns)
np.refreshNamespaceNetworkPolicies()
np.refreshNetworkPolicies(refreshForNamespaces)
}
}

Expand All @@ -758,28 +736,18 @@ func (np *networkPolicyPlugin) handleDeleteNamespace(obj interface{}) {
np.updateMatchCache(npns)
}

func (np *networkPolicyPlugin) refreshNamespaceNetworkPolicies() {
func (np *networkPolicyPlugin) refreshNetworkPolicies(refreshFor refreshForType) {
for _, npns := range np.namespaces {
changed := false
for _, npp := range npns.policies {
if npp.watchesNamespaces {
npns.mustRecalculate = true
}
}
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
}

func (np *networkPolicyPlugin) refreshPodNetworkPolicies(pod *corev1.Pod) {
podNs := np.namespacesByName[pod.Namespace]
for _, npns := range np.namespaces {
for _, npp := range npns.policies {
if (npp.watchesOwnPods && npns == podNs) || npp.watchesAllPods {
npns.mustRecalculate = true
if ((refreshFor == refreshForNamespaces) && npp.watchesNamespaces) ||
((refreshFor == refreshForPods) && npp.watchesPods) {
if np.updateNetworkPolicy(npns, &npp.policy) {
changed = true
}
}
}
if npns.mustRecalculate && npns.inUse {
if changed && npns.inUse {
np.syncNamespace(npns)
}
}
Expand Down

0 comments on commit 7106dab

Please sign in to comment.