Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1904455: NetworkPolicy performance fixes, v2 #249

Merged
merged 2 commits into from Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
174 changes: 106 additions & 68 deletions pkg/network/node/networkpolicy.go
Expand Up @@ -27,6 +27,7 @@ import (
networkv1 "github.com/openshift/api/network/v1"
"github.com/openshift/library-go/pkg/network/networkutils"
"github.com/openshift/sdn/pkg/network/common"
"github.com/openshift/sdn/pkg/network/node/ovs"
)

type networkPolicyPlugin struct {
Expand All @@ -43,16 +44,18 @@ type networkPolicyPlugin struct {
namespaces map[uint32]*npNamespace
// nsMatchCache caches matches for namespaceSelectors; see selectNamespaceInternal
nsMatchCache map[string]*npCacheEntry

pods map[ktypes.UID]corev1.Pod
}

// npNamespace tracks NetworkPolicy-related data for a Namespace
type npNamespace struct {
name string
vnid uint32
inUse bool
dirty bool

// mustRecalculate is true if we need to recalculate policy .flows/.selectedIPs
mustRecalculate bool
// mustSync is true if we need to push updated flows to OVS
mustSync bool

labels map[string]string
policies map[ktypes.UID]*npPolicy
Expand All @@ -65,7 +68,8 @@ type npNamespace struct {
type npPolicy struct {
policy networkingv1.NetworkPolicy
watchesNamespaces bool
watchesPods bool
watchesAllPods bool
watchesOwnPods bool

flows []string
selectedIPs []string
Expand All @@ -77,18 +81,10 @@ type npCacheEntry struct {
matches map[string]uint32
}

type refreshForType string

const (
refreshForPods refreshForType = "pods"
refreshForNamespaces refreshForType = "namespaces"
)

func NewNetworkPolicyPlugin() osdnPolicy {
return &networkPolicyPlugin{
namespaces: make(map[uint32]*npNamespace),
namespacesByName: make(map[string]*npNamespace),
pods: make(map[ktypes.UID]corev1.Pod),

nsMatchCache: make(map[string]*npCacheEntry),
}
Expand Down Expand Up @@ -200,7 +196,7 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *networkv1.NetNamespace) {
npns.gotNetNamespace = true
if npns.gotNamespace {
np.updateMatchCache(npns)
np.refreshNetworkPolicies(refreshForNamespaces)
np.refreshNamespaceNetworkPolicies()
}
}

Expand All @@ -223,9 +219,8 @@ func (np *networkPolicyPlugin) DeleteNetNamespace(netns *networkv1.NetNamespace)

if npns.inUse {
npns.inUse = false
// We call syncNamespaceFlows() not syncNamespace() because it
// needs to happen before we forget about the namespace.
np.syncNamespaceFlows(npns)
// This needs to happen before we forget about the namespace.
np.syncNamespaceImmediately(npns)
}
delete(np.namespaces, netns.NetID)
npns.gotNetNamespace = false
Expand All @@ -249,27 +244,56 @@ func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
}

func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
if !npns.dirty {
npns.dirty = true
if !npns.mustSync {
npns.mustSync = true
np.runner.Run()
}
}

func (np *networkPolicyPlugin) syncNamespaceImmediately(npns *npNamespace) {
otx := np.node.oc.NewTransaction()
np.generateNamespaceFlows(otx, npns)
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for namespace %q: %v", npns.name, err))
}
}

// This is the entry point for the BoundedFrequencyRunner
func (np *networkPolicyPlugin) syncFlows() {
np.lock.Lock()
defer np.lock.Unlock()

np.recalculate()

// Push internal data to OVS (for namespaces that have changed)
otx := np.node.oc.NewTransaction()
for _, npns := range np.namespaces {
if npns.dirty {
np.syncNamespaceFlows(npns)
npns.dirty = false
if npns.mustSync {
np.generateNamespaceFlows(otx, npns)
npns.mustSync = false
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows: %v", err))
}
}

func (np *networkPolicyPlugin) syncNamespaceFlows(npns *npNamespace) {
// Update internal data to reflect recent pod/namespace changes
func (np *networkPolicyPlugin) recalculate() {
for _, npns := range np.namespaces {
if npns.mustRecalculate {
for _, npp := range npns.policies {
if np.updateNetworkPolicy(npns, &npp.policy) {
npns.mustSync = true
}
}
npns.mustRecalculate = false
}
}
}

func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns *npNamespace) {
klog.V(5).Infof("syncNamespace %d", npns.vnid)
otx := np.node.oc.NewTransaction()
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
if npns.inUse {
allPodsSelected := false
Expand Down Expand Up @@ -307,9 +331,6 @@ func (np *networkPolicyPlugin) syncNamespaceFlows(npns *npNamespace) {
otx.AddFlow("table=80, priority=50, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows for VNID: %v", err))
}
}

func (np *networkPolicyPlugin) EnsureVNIDRules(vnid uint32) {
Expand Down Expand Up @@ -401,13 +422,19 @@ func (np *networkPolicyPlugin) selectPodsFromNamespaces(nsLabelSel, podLabelSel
return nil
}

namespaces := np.selectNamespacesInternal(nsSel)
for _, pod := range np.pods {
vnid, exists := namespaces[pod.Namespace]
if exists && podSel.Matches(labels.Set(pod.Labels)) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", vnid, pod.Status.PodIP))
nsLister := np.node.kubeInformers.Core().V1().Pods().Lister()
for namespace, vnid := range np.selectNamespacesInternal(nsSel) {
pods, err := nsLister.Pods(namespace).List(podSel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", namespace, err))
continue
}
for _, pod := range pods {
if isOnPodNetwork(pod) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", vnid, pod.Status.PodIP))
}
}

}

return peerFlows
Expand Down Expand Up @@ -437,8 +464,15 @@ func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *metav1.LabelS
utilruntime.HandleError(fmt.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err))
return ips
}
for _, pod := range np.pods {
if (npns.name == pod.Namespace) && sel.Matches(labels.Set(pod.Labels)) {

pods, err := np.node.kubeInformers.Core().V1().Pods().Lister().Pods(npns.name).List(sel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", npns.name, err))
return ips
}
for _, pod := range pods {
if isOnPodNetwork(pod) {
ips = append(ips, pod.Status.PodIP)
}
}
Expand Down Expand Up @@ -469,7 +503,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net

var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesPods = true
npp.watchesOwnPods = true
npp.selectedIPs = np.selectPods(npns, &policy.Spec.PodSelector)
for _, ip := range npp.selectedIPs {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
Expand Down Expand Up @@ -517,7 +551,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
// The PodSelector is empty, meaning it selects all pods in this namespace
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ", npns.vnid))
} else {
npp.watchesPods = true
npp.watchesOwnPods = true
for _, ip := range np.selectPods(npns, peer.PodSelector) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", npns.vnid, ip))
}
Expand All @@ -532,7 +566,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
}
} else {
npp.watchesNamespaces = true
npp.watchesPods = true
npp.watchesAllPods = true
peerFlows = append(peerFlows, np.selectPodsFromNamespaces(peer.NamespaceSelector, peer.PodSelector)...)
}

Expand Down Expand Up @@ -641,48 +675,42 @@ func (np *networkPolicyPlugin) watchPods() {
np.node.kubeInformers.Core().V1().Pods().Informer().AddEventHandler(funcs)
}

func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventType watch.EventType) {
func isOnPodNetwork(pod *corev1.Pod) bool {
if pod.Spec.HostNetwork {
return false
}
return pod.Status.PodIP != ""
}

func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventType watch.EventType) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", eventType, getPodFullName(pod))

// Ignore pods with HostNetwork=true, SDN is not involved in this case
if pod.Spec.SecurityContext != nil && pod.Spec.HostNetwork {
return
}
if pod.Status.PodIP == "" {
klog.V(5).Infof("PodIP is not set for pod %q; ignoring", getPodFullName(pod))
if !isOnPodNetwork(pod) {
return
}

// We don't want to grab np.Lock for every Pod.Status change...
// But it's safe to look up oldPod without locking here because no other
// threads modify this map.
oldPod, podExisted := np.pods[pod.UID]
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
if old != nil {
oldPod := old.(*corev1.Pod)
if oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}
}

np.lock.Lock()
defer np.lock.Unlock()

np.pods[pod.UID] = *pod
np.refreshNetworkPolicies(refreshForPods)
np.refreshPodNetworkPolicies(pod)
}

func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", watch.Deleted, getPodFullName(pod))

_, podExisted := np.pods[pod.UID]
if !podExisted {
return
}

np.lock.Lock()
defer np.lock.Unlock()

delete(np.pods, pod.UID)
np.refreshNetworkPolicies(refreshForPods)
np.refreshPodNetworkPolicies(pod)
}

func (np *networkPolicyPlugin) watchNamespaces() {
Expand Down Expand Up @@ -711,7 +739,7 @@ func (np *networkPolicyPlugin) handleAddOrUpdateNamespace(obj, _ interface{}, ev
npns.gotNamespace = true
if npns.gotNetNamespace {
np.updateMatchCache(npns)
np.refreshNetworkPolicies(refreshForNamespaces)
np.refreshNamespaceNetworkPolicies()
}
}

Expand All @@ -736,18 +764,28 @@ func (np *networkPolicyPlugin) handleDeleteNamespace(obj interface{}) {
np.updateMatchCache(npns)
}

func (np *networkPolicyPlugin) refreshNetworkPolicies(refreshFor refreshForType) {
func (np *networkPolicyPlugin) refreshNamespaceNetworkPolicies() {
for _, npns := range np.namespaces {
changed := false
for _, npp := range npns.policies {
if ((refreshFor == refreshForNamespaces) && npp.watchesNamespaces) ||
((refreshFor == refreshForPods) && npp.watchesPods) {
if np.updateNetworkPolicy(npns, &npp.policy) {
changed = true
}
if npp.watchesNamespaces {
npns.mustRecalculate = true
}
}
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
}

func (np *networkPolicyPlugin) refreshPodNetworkPolicies(pod *corev1.Pod) {
podNs := np.namespacesByName[pod.Namespace]
for _, npns := range np.namespaces {
for _, npp := range npns.policies {
if (npp.watchesOwnPods && npns == podNs) || npp.watchesAllPods {
npns.mustRecalculate = true
}
}
if changed && npns.inUse {
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
Expand Down