Skip to content

Commit

Permalink
Merge pull request #226 from danwinship/networkpolicy-perf
Browse files Browse the repository at this point in the history
Bug 1896958: NetworkPolicy performance (pod caching)
  • Loading branch information
openshift-merge-robot committed Dec 3, 2020
2 parents 060eaa2 + 7aae913 commit 9075977
Show file tree
Hide file tree
Showing 239 changed files with 19,490 additions and 149 deletions.
158 changes: 95 additions & 63 deletions pkg/network/node/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
networkv1 "github.com/openshift/api/network/v1"
"github.com/openshift/library-go/pkg/network/networkutils"
"github.com/openshift/sdn/pkg/network/common"
"github.com/openshift/sdn/pkg/network/node/ovs"
)

type networkPolicyPlugin struct {
Expand All @@ -43,16 +44,18 @@ type networkPolicyPlugin struct {
namespaces map[uint32]*npNamespace
// nsMatchCache caches matches for namespaceSelectors; see selectNamespaceInternal
nsMatchCache map[string]*npCacheEntry

pods map[ktypes.UID]corev1.Pod
}

// npNamespace tracks NetworkPolicy-related data for a Namespace
type npNamespace struct {
name string
vnid uint32
inUse bool
dirty bool

// mustRecalculate is true if we need to recalculate policy .flows/.selectedIPs
mustRecalculate bool
// mustSync is true if we need to push updated flows to OVS
mustSync bool

labels map[string]string
policies map[ktypes.UID]*npPolicy
Expand All @@ -65,7 +68,8 @@ type npNamespace struct {
type npPolicy struct {
policy networkingv1.NetworkPolicy
watchesNamespaces bool
watchesPods bool
watchesAllPods bool
watchesOwnPods bool

flows []string
selectedIPs []string
Expand All @@ -77,18 +81,10 @@ type npCacheEntry struct {
matches map[string]uint32
}

type refreshForType string

const (
refreshForPods refreshForType = "pods"
refreshForNamespaces refreshForType = "namespaces"
)

func NewNetworkPolicyPlugin() osdnPolicy {
return &networkPolicyPlugin{
namespaces: make(map[uint32]*npNamespace),
namespacesByName: make(map[string]*npNamespace),
pods: make(map[ktypes.UID]corev1.Pod),

nsMatchCache: make(map[string]*npCacheEntry),
}
Expand Down Expand Up @@ -200,7 +196,7 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *networkv1.NetNamespace) {
npns.gotNetNamespace = true
if npns.gotNamespace {
np.updateMatchCache(npns)
np.refreshNetworkPolicies(refreshForNamespaces)
np.refreshNamespaceNetworkPolicies()
}
}

Expand All @@ -223,9 +219,8 @@ func (np *networkPolicyPlugin) DeleteNetNamespace(netns *networkv1.NetNamespace)

if npns.inUse {
npns.inUse = false
// We call syncNamespaceFlows() not syncNamespace() because it
// needs to happen before we forget about the namespace.
np.syncNamespaceFlows(npns)
// This needs to happen before we forget about the namespace.
np.syncNamespaceImmediately(npns)
}
delete(np.namespaces, netns.NetID)
npns.gotNetNamespace = false
Expand All @@ -249,27 +244,56 @@ func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
}

func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
if !npns.dirty {
npns.dirty = true
if !npns.mustSync {
npns.mustSync = true
np.runner.Run()
}
}

func (np *networkPolicyPlugin) syncNamespaceImmediately(npns *npNamespace) {
otx := np.node.oc.NewTransaction()
np.generateNamespaceFlows(otx, npns)
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for namespace %q: %v", npns.name, err))
}
}

// This is the entry point for the BoundedFrequencyRunner
func (np *networkPolicyPlugin) syncFlows() {
np.lock.Lock()
defer np.lock.Unlock()

np.recalculate()

// Push internal data to OVS (for namespaces that have changed)
otx := np.node.oc.NewTransaction()
for _, npns := range np.namespaces {
if npns.mustSync {
np.generateNamespaceFlows(otx, npns)
npns.mustSync = false
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows: %v", err))
}
}

// Update internal data to reflect recent pod/namespace changes
func (np *networkPolicyPlugin) recalculate() {
for _, npns := range np.namespaces {
if npns.dirty {
np.syncNamespaceFlows(npns)
npns.dirty = false
if npns.mustRecalculate {
for _, npp := range npns.policies {
if np.updateNetworkPolicy(npns, &npp.policy) {
npns.mustSync = true
}
}
npns.mustRecalculate = false
}
}
}

func (np *networkPolicyPlugin) syncNamespaceFlows(npns *npNamespace) {
func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns *npNamespace) {
klog.V(5).Infof("syncNamespace %d", npns.vnid)
otx := np.node.oc.NewTransaction()
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
if npns.inUse {
allPodsSelected := false
Expand Down Expand Up @@ -307,9 +331,6 @@ func (np *networkPolicyPlugin) syncNamespaceFlows(npns *npNamespace) {
otx.AddFlow("table=80, priority=50, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for VNID: %v", err))
}
}

func (np *networkPolicyPlugin) EnsureVNIDRules(vnid uint32) {
Expand Down Expand Up @@ -401,13 +422,17 @@ func (np *networkPolicyPlugin) selectPodsFromNamespaces(nsLabelSel, podLabelSel
return nil
}

namespaces := np.selectNamespacesInternal(nsSel)
for _, pod := range np.pods {
vnid, exists := namespaces[pod.Namespace]
if exists && podSel.Matches(labels.Set(pod.Labels)) {
nsLister := np.node.kubeInformers.Core().V1().Pods().Lister()
for namespace, vnid := range np.selectNamespacesInternal(nsSel) {
pods, err := nsLister.Pods(namespace).List(podSel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", namespace, err))
continue
}
for _, pod := range pods {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", vnid, pod.Status.PodIP))
}

}

return peerFlows
Expand Down Expand Up @@ -437,10 +462,15 @@ func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *metav1.LabelS
utilruntime.HandleError(fmt.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err))
return ips
}
for _, pod := range np.pods {
if (npns.name == pod.Namespace) && sel.Matches(labels.Set(pod.Labels)) {
ips = append(ips, pod.Status.PodIP)
}

pods, err := np.node.kubeInformers.Core().V1().Pods().Lister().Pods(npns.name).List(sel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", npns.name, err))
return ips
}
for _, pod := range pods {
ips = append(ips, pod.Status.PodIP)
}
return ips
}
Expand Down Expand Up @@ -469,7 +499,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net

var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesPods = true
npp.watchesOwnPods = true
npp.selectedIPs = np.selectPods(npns, &policy.Spec.PodSelector)
for _, ip := range npp.selectedIPs {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
Expand Down Expand Up @@ -517,7 +547,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
// The PodSelector is empty, meaning it selects all pods in this namespace
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ", npns.vnid))
} else {
npp.watchesPods = true
npp.watchesOwnPods = true
for _, ip := range np.selectPods(npns, peer.PodSelector) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", npns.vnid, ip))
}
Expand All @@ -532,7 +562,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
}
} else {
npp.watchesNamespaces = true
npp.watchesPods = true
npp.watchesAllPods = true
peerFlows = append(peerFlows, np.selectPodsFromNamespaces(peer.NamespaceSelector, peer.PodSelector)...)
}

Expand Down Expand Up @@ -641,7 +671,7 @@ func (np *networkPolicyPlugin) watchPods() {
np.node.kubeInformers.Core().V1().Pods().Informer().AddEventHandler(funcs)
}

func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventType watch.EventType) {
func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventType watch.EventType) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", eventType, getPodFullName(pod))

Expand All @@ -654,35 +684,27 @@ func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventTyp
return
}

// We don't want to grab np.Lock for every Pod.Status change...
// But it's safe to look up oldPod without locking here because no other
// threads modify this map.
oldPod, podExisted := np.pods[pod.UID]
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
if old != nil {
oldPod := old.(*corev1.Pod)
if oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}
}

np.lock.Lock()
defer np.lock.Unlock()

np.pods[pod.UID] = *pod
np.refreshNetworkPolicies(refreshForPods)
np.refreshPodNetworkPolicies(pod)
}

func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", watch.Deleted, getPodFullName(pod))

_, podExisted := np.pods[pod.UID]
if !podExisted {
return
}

np.lock.Lock()
defer np.lock.Unlock()

delete(np.pods, pod.UID)
np.refreshNetworkPolicies(refreshForPods)
np.refreshPodNetworkPolicies(pod)
}

func (np *networkPolicyPlugin) watchNamespaces() {
Expand Down Expand Up @@ -711,7 +733,7 @@ func (np *networkPolicyPlugin) handleAddOrUpdateNamespace(obj, _ interface{}, ev
npns.gotNamespace = true
if npns.gotNetNamespace {
np.updateMatchCache(npns)
np.refreshNetworkPolicies(refreshForNamespaces)
np.refreshNamespaceNetworkPolicies()
}
}

Expand All @@ -736,18 +758,28 @@ func (np *networkPolicyPlugin) handleDeleteNamespace(obj interface{}) {
np.updateMatchCache(npns)
}

func (np *networkPolicyPlugin) refreshNetworkPolicies(refreshFor refreshForType) {
func (np *networkPolicyPlugin) refreshNamespaceNetworkPolicies() {
for _, npns := range np.namespaces {
changed := false
for _, npp := range npns.policies {
if ((refreshFor == refreshForNamespaces) && npp.watchesNamespaces) ||
((refreshFor == refreshForPods) && npp.watchesPods) {
if np.updateNetworkPolicy(npns, &npp.policy) {
changed = true
}
if npp.watchesNamespaces {
npns.mustRecalculate = true
}
}
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
}

func (np *networkPolicyPlugin) refreshPodNetworkPolicies(pod *corev1.Pod) {
podNs := np.namespacesByName[pod.Namespace]
for _, npns := range np.namespaces {
for _, npp := range npns.policies {
if (npp.watchesOwnPods && npns == podNs) || npp.watchesAllPods {
npns.mustRecalculate = true
}
}
if changed && npns.inUse {
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
Expand Down

0 comments on commit 9075977

Please sign in to comment.