Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1915007: Revert NetworkPolicy performance fixes (4.6) #241

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
158 changes: 63 additions & 95 deletions pkg/network/node/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
networkv1 "github.com/openshift/api/network/v1"
"github.com/openshift/library-go/pkg/network/networkutils"
"github.com/openshift/sdn/pkg/network/common"
"github.com/openshift/sdn/pkg/network/node/ovs"
)

type networkPolicyPlugin struct {
Expand All @@ -44,18 +43,16 @@ type networkPolicyPlugin struct {
namespaces map[uint32]*npNamespace
// nsMatchCache caches matches for namespaceSelectors; see selectNamespaceInternal
nsMatchCache map[string]*npCacheEntry

pods map[ktypes.UID]corev1.Pod
}

// npNamespace tracks NetworkPolicy-related data for a Namespace
type npNamespace struct {
name string
vnid uint32
inUse bool

// mustRecalculate is true if we need to recalculate policy .flows/.selectedIPs
mustRecalculate bool
// mustSync is true if we need to push updated flows to OVS
mustSync bool
dirty bool

labels map[string]string
policies map[ktypes.UID]*npPolicy
Expand All @@ -68,8 +65,7 @@ type npNamespace struct {
type npPolicy struct {
policy networkingv1.NetworkPolicy
watchesNamespaces bool
watchesAllPods bool
watchesOwnPods bool
watchesPods bool

flows []string
selectedIPs []string
Expand All @@ -81,10 +77,18 @@ type npCacheEntry struct {
matches map[string]uint32
}

type refreshForType string

const (
refreshForPods refreshForType = "pods"
refreshForNamespaces refreshForType = "namespaces"
)

func NewNetworkPolicyPlugin() osdnPolicy {
return &networkPolicyPlugin{
namespaces: make(map[uint32]*npNamespace),
namespacesByName: make(map[string]*npNamespace),
pods: make(map[ktypes.UID]corev1.Pod),

nsMatchCache: make(map[string]*npCacheEntry),
}
Expand Down Expand Up @@ -196,7 +200,7 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *networkv1.NetNamespace) {
npns.gotNetNamespace = true
if npns.gotNamespace {
np.updateMatchCache(npns)
np.refreshNamespaceNetworkPolicies()
np.refreshNetworkPolicies(refreshForNamespaces)
}
}

Expand All @@ -219,8 +223,9 @@ func (np *networkPolicyPlugin) DeleteNetNamespace(netns *networkv1.NetNamespace)

if npns.inUse {
npns.inUse = false
// This needs to happen before we forget about the namespace.
np.syncNamespaceImmediately(npns)
// We call syncNamespaceFlows() not syncNamespace() because it
// needs to happen before we forget about the namespace.
np.syncNamespaceFlows(npns)
}
delete(np.namespaces, netns.NetID)
npns.gotNetNamespace = false
Expand All @@ -244,56 +249,27 @@ func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
}

func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
if !npns.mustSync {
npns.mustSync = true
if !npns.dirty {
npns.dirty = true
np.runner.Run()
}
}

func (np *networkPolicyPlugin) syncNamespaceImmediately(npns *npNamespace) {
otx := np.node.oc.NewTransaction()
np.generateNamespaceFlows(otx, npns)
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for namespace %q: %v", npns.name, err))
}
}

// This is the entry point for the BoundedFrequencyRunner
func (np *networkPolicyPlugin) syncFlows() {
np.lock.Lock()
defer np.lock.Unlock()

np.recalculate()

// Push internal data to OVS (for namespaces that have changed)
otx := np.node.oc.NewTransaction()
for _, npns := range np.namespaces {
if npns.mustSync {
np.generateNamespaceFlows(otx, npns)
npns.mustSync = false
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows: %v", err))
}
}

// Update internal data to reflect recent pod/namespace changes
func (np *networkPolicyPlugin) recalculate() {
for _, npns := range np.namespaces {
if npns.mustRecalculate {
for _, npp := range npns.policies {
if np.updateNetworkPolicy(npns, &npp.policy) {
npns.mustSync = true
}
}
npns.mustRecalculate = false
if npns.dirty {
np.syncNamespaceFlows(npns)
npns.dirty = false
}
}
}

func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns *npNamespace) {
func (np *networkPolicyPlugin) syncNamespaceFlows(npns *npNamespace) {
klog.V(5).Infof("syncNamespace %d", npns.vnid)
otx := np.node.oc.NewTransaction()
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
if npns.inUse {
allPodsSelected := false
Expand Down Expand Up @@ -331,6 +307,9 @@ func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns
otx.AddFlow("table=80, priority=50, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for VNID: %v", err))
}
}

func (np *networkPolicyPlugin) EnsureVNIDRules(vnid uint32) {
Expand Down Expand Up @@ -422,17 +401,13 @@ func (np *networkPolicyPlugin) selectPodsFromNamespaces(nsLabelSel, podLabelSel
return nil
}

nsLister := np.node.kubeInformers.Core().V1().Pods().Lister()
for namespace, vnid := range np.selectNamespacesInternal(nsSel) {
pods, err := nsLister.Pods(namespace).List(podSel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", namespace, err))
continue
}
for _, pod := range pods {
namespaces := np.selectNamespacesInternal(nsSel)
for _, pod := range np.pods {
vnid, exists := namespaces[pod.Namespace]
if exists && podSel.Matches(labels.Set(pod.Labels)) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", vnid, pod.Status.PodIP))
}

}

return peerFlows
Expand Down Expand Up @@ -462,15 +437,10 @@ func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *metav1.LabelS
utilruntime.HandleError(fmt.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err))
return ips
}

pods, err := np.node.kubeInformers.Core().V1().Pods().Lister().Pods(npns.name).List(sel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", npns.name, err))
return ips
}
for _, pod := range pods {
ips = append(ips, pod.Status.PodIP)
for _, pod := range np.pods {
if (npns.name == pod.Namespace) && sel.Matches(labels.Set(pod.Labels)) {
ips = append(ips, pod.Status.PodIP)
}
}
return ips
}
Expand Down Expand Up @@ -499,7 +469,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net

var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesOwnPods = true
npp.watchesPods = true
npp.selectedIPs = np.selectPods(npns, &policy.Spec.PodSelector)
for _, ip := range npp.selectedIPs {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
Expand Down Expand Up @@ -547,7 +517,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
// The PodSelector is empty, meaning it selects all pods in this namespace
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ", npns.vnid))
} else {
npp.watchesOwnPods = true
npp.watchesPods = true
for _, ip := range np.selectPods(npns, peer.PodSelector) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", npns.vnid, ip))
}
Expand All @@ -562,7 +532,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
}
} else {
npp.watchesNamespaces = true
npp.watchesAllPods = true
npp.watchesPods = true
peerFlows = append(peerFlows, np.selectPodsFromNamespaces(peer.NamespaceSelector, peer.PodSelector)...)
}

Expand Down Expand Up @@ -671,7 +641,7 @@ func (np *networkPolicyPlugin) watchPods() {
np.node.kubeInformers.Core().V1().Pods().Informer().AddEventHandler(funcs)
}

func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventType watch.EventType) {
func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventType watch.EventType) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", eventType, getPodFullName(pod))

Expand All @@ -684,27 +654,35 @@ func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventT
return
}

if old != nil {
oldPod := old.(*corev1.Pod)
if oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}
// We don't want to grab np.Lock for every Pod.Status change...
// But it's safe to look up oldPod without locking here because no other
// threads modify this map.
oldPod, podExisted := np.pods[pod.UID]
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}

np.lock.Lock()
defer np.lock.Unlock()

np.refreshPodNetworkPolicies(pod)
np.pods[pod.UID] = *pod
np.refreshNetworkPolicies(refreshForPods)
}

func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", watch.Deleted, getPodFullName(pod))

_, podExisted := np.pods[pod.UID]
if !podExisted {
return
}

np.lock.Lock()
defer np.lock.Unlock()

np.refreshPodNetworkPolicies(pod)
delete(np.pods, pod.UID)
np.refreshNetworkPolicies(refreshForPods)
}

func (np *networkPolicyPlugin) watchNamespaces() {
Expand Down Expand Up @@ -733,7 +711,7 @@ func (np *networkPolicyPlugin) handleAddOrUpdateNamespace(obj, _ interface{}, ev
npns.gotNamespace = true
if npns.gotNetNamespace {
np.updateMatchCache(npns)
np.refreshNamespaceNetworkPolicies()
np.refreshNetworkPolicies(refreshForNamespaces)
}
}

Expand All @@ -758,28 +736,18 @@ func (np *networkPolicyPlugin) handleDeleteNamespace(obj interface{}) {
np.updateMatchCache(npns)
}

func (np *networkPolicyPlugin) refreshNamespaceNetworkPolicies() {
func (np *networkPolicyPlugin) refreshNetworkPolicies(refreshFor refreshForType) {
for _, npns := range np.namespaces {
changed := false
for _, npp := range npns.policies {
if npp.watchesNamespaces {
npns.mustRecalculate = true
}
}
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
}

func (np *networkPolicyPlugin) refreshPodNetworkPolicies(pod *corev1.Pod) {
podNs := np.namespacesByName[pod.Namespace]
for _, npns := range np.namespaces {
for _, npp := range npns.policies {
if (npp.watchesOwnPods && npns == podNs) || npp.watchesAllPods {
npns.mustRecalculate = true
if ((refreshFor == refreshForNamespaces) && npp.watchesNamespaces) ||
((refreshFor == refreshForPods) && npp.watchesPods) {
if np.updateNetworkPolicy(npns, &npp.policy) {
changed = true
}
}
}
if npns.mustRecalculate && npns.inUse {
if changed && npns.inUse {
np.syncNamespace(npns)
}
}
Expand Down