Skip to content

Commit

Permalink
networkpolicy: do policy recalculation in the bfr thread to avoid blo…
Browse files Browse the repository at this point in the history
…cking

In large clusters, recalculating networkpolicies after pod/namespace
changes may take a lot of effort. Additionally, in some cases we may
end up unnecessarily recalculating multiple times before pushing
changes to OVS. Fix this by moving the recalculating step into the
BoundedFrequencyRunner's thread, doing it just before we push the
updates to OVS.
  • Loading branch information
danwinship committed Dec 1, 2020
1 parent 5f25cd0 commit 7aae913
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 51 deletions.
82 changes: 54 additions & 28 deletions pkg/network/node/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ type npNamespace struct {
name string
vnid uint32
inUse bool
dirty bool

// mustRecalculate is true if we need to recalculate policy .flows/.selectedIPs
mustRecalculate bool
// mustSync is true if we need to push updated flows to OVS
mustSync bool

labels map[string]string
policies map[ktypes.UID]*npPolicy
Expand All @@ -64,7 +68,8 @@ type npNamespace struct {
type npPolicy struct {
policy networkingv1.NetworkPolicy
watchesNamespaces bool
watchesPods bool
watchesAllPods bool
watchesOwnPods bool

flows []string
selectedIPs []string
Expand All @@ -76,13 +81,6 @@ type npCacheEntry struct {
matches map[string]uint32
}

type refreshForType string

const (
refreshForPods refreshForType = "pods"
refreshForNamespaces refreshForType = "namespaces"
)

func NewNetworkPolicyPlugin() osdnPolicy {
return &networkPolicyPlugin{
namespaces: make(map[uint32]*npNamespace),
Expand Down Expand Up @@ -198,7 +196,7 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *networkv1.NetNamespace) {
npns.gotNetNamespace = true
if npns.gotNamespace {
np.updateMatchCache(npns)
np.refreshNetworkPolicies(refreshForNamespaces)
np.refreshNamespaceNetworkPolicies()
}
}

Expand Down Expand Up @@ -246,8 +244,8 @@ func (np *networkPolicyPlugin) GetMulticastEnabled(vnid uint32) bool {
}

func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
if !npns.dirty {
npns.dirty = true
if !npns.mustSync {
npns.mustSync = true
np.runner.Run()
}
}
Expand All @@ -260,22 +258,40 @@ func (np *networkPolicyPlugin) syncNamespaceImmediately(npns *npNamespace) {
}
}

// This is the entry point for the BoundedFrequencyRunner
func (np *networkPolicyPlugin) syncFlows() {
np.lock.Lock()
defer np.lock.Unlock()

np.recalculate()

// Push internal data to OVS (for namespaces that have changed)
otx := np.node.oc.NewTransaction()
for _, npns := range np.namespaces {
if npns.dirty {
if npns.mustSync {
np.generateNamespaceFlows(otx, npns)
npns.dirty = false
npns.mustSync = false
}
}
if err := otx.Commit(); err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing OVS flows: %v", err))
}
}

// Update internal data to reflect recent pod/namespace changes
func (np *networkPolicyPlugin) recalculate() {
for _, npns := range np.namespaces {
if npns.mustRecalculate {
for _, npp := range npns.policies {
if np.updateNetworkPolicy(npns, &npp.policy) {
npns.mustSync = true
}
}
npns.mustRecalculate = false
}
}
}

func (np *networkPolicyPlugin) generateNamespaceFlows(otx ovs.Transaction, npns *npNamespace) {
klog.V(5).Infof("syncNamespace %d", npns.vnid)
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
Expand Down Expand Up @@ -483,7 +499,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net

var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesPods = true
npp.watchesOwnPods = true
npp.selectedIPs = np.selectPods(npns, &policy.Spec.PodSelector)
for _, ip := range npp.selectedIPs {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
Expand Down Expand Up @@ -531,7 +547,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
// The PodSelector is empty, meaning it selects all pods in this namespace
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ", npns.vnid))
} else {
npp.watchesPods = true
npp.watchesOwnPods = true
for _, ip := range np.selectPods(npns, peer.PodSelector) {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", npns.vnid, ip))
}
Expand All @@ -546,7 +562,7 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *net
}
} else {
npp.watchesNamespaces = true
npp.watchesPods = true
npp.watchesAllPods = true
peerFlows = append(peerFlows, np.selectPodsFromNamespaces(peer.NamespaceSelector, peer.PodSelector)...)
}

Expand Down Expand Up @@ -678,7 +694,7 @@ func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventT
np.lock.Lock()
defer np.lock.Unlock()

np.refreshNetworkPolicies(refreshForPods)
np.refreshPodNetworkPolicies(pod)
}

func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
Expand All @@ -688,7 +704,7 @@ func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
np.lock.Lock()
defer np.lock.Unlock()

np.refreshNetworkPolicies(refreshForPods)
np.refreshPodNetworkPolicies(pod)
}

func (np *networkPolicyPlugin) watchNamespaces() {
Expand Down Expand Up @@ -717,7 +733,7 @@ func (np *networkPolicyPlugin) handleAddOrUpdateNamespace(obj, _ interface{}, ev
npns.gotNamespace = true
if npns.gotNetNamespace {
np.updateMatchCache(npns)
np.refreshNetworkPolicies(refreshForNamespaces)
np.refreshNamespaceNetworkPolicies()
}
}

Expand All @@ -742,18 +758,28 @@ func (np *networkPolicyPlugin) handleDeleteNamespace(obj interface{}) {
np.updateMatchCache(npns)
}

func (np *networkPolicyPlugin) refreshNetworkPolicies(refreshFor refreshForType) {
func (np *networkPolicyPlugin) refreshNamespaceNetworkPolicies() {
for _, npns := range np.namespaces {
changed := false
for _, npp := range npns.policies {
if ((refreshFor == refreshForNamespaces) && npp.watchesNamespaces) ||
((refreshFor == refreshForPods) && npp.watchesPods) {
if np.updateNetworkPolicy(npns, &npp.policy) {
changed = true
}
if npp.watchesNamespaces {
npns.mustRecalculate = true
}
}
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
}

func (np *networkPolicyPlugin) refreshPodNetworkPolicies(pod *corev1.Pod) {
podNs := np.namespacesByName[pod.Namespace]
for _, npns := range np.namespaces {
for _, npp := range npns.policies {
if (npp.watchesOwnPods && npns == podNs) || npp.watchesAllPods {
npns.mustRecalculate = true
}
}
if changed && npns.inUse {
if npns.mustRecalculate && npns.inUse {
np.syncNamespace(npns)
}
}
Expand Down

0 comments on commit 7aae913

Please sign in to comment.