Skip to content

Commit

Permalink
networkpolicy: use the informer's pod cache rather than having our own
Browse files Browse the repository at this point in the history
Especially, we were previously copying all of the pods rather than
just keeping pointers to the objects in the cache (probably a leftover
from very old pre-shared-informer code).

This may also fix leaks when pods are deleted and recreated, since
informers apparently compress events based on namespace+name, not UID,
so a delete+recreate would be compressed to an update, and we'd never
get a delete for the old UID.
  • Loading branch information
danwinship authored and openshift-cherrypick-robot committed Dec 4, 2020
1 parent da1c28f commit 2b936ba
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 27 deletions.
50 changes: 24 additions & 26 deletions pkg/network/node/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ type networkPolicyPlugin struct {
namespaces map[uint32]*npNamespace
// nsMatchCache caches matches for namespaceSelectors; see selectNamespaceInternal
nsMatchCache map[string]*npCacheEntry

pods map[ktypes.UID]corev1.Pod
}

// npNamespace tracks NetworkPolicy-related data for a Namespace
Expand Down Expand Up @@ -88,7 +86,6 @@ func NewNetworkPolicyPlugin() osdnPolicy {
return &networkPolicyPlugin{
namespaces: make(map[uint32]*npNamespace),
namespacesByName: make(map[string]*npNamespace),
pods: make(map[ktypes.UID]corev1.Pod),

nsMatchCache: make(map[string]*npCacheEntry),
}
Expand Down Expand Up @@ -401,13 +398,17 @@ func (np *networkPolicyPlugin) selectPodsFromNamespaces(nsLabelSel, podLabelSel
return nil
}

namespaces := np.selectNamespacesInternal(nsSel)
for _, pod := range np.pods {
vnid, exists := namespaces[pod.Namespace]
if exists && podSel.Matches(labels.Set(pod.Labels)) {
nsLister := np.node.kubeInformers.Core().V1().Pods().Lister()
for namespace, vnid := range np.selectNamespacesInternal(nsSel) {
pods, err := nsLister.Pods(namespace).List(podSel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", namespace, err))
continue
}
for _, pod := range pods {
peerFlows = append(peerFlows, fmt.Sprintf("reg0=%d, ip, nw_src=%s, ", vnid, pod.Status.PodIP))
}

}

return peerFlows
Expand Down Expand Up @@ -437,10 +438,15 @@ func (np *networkPolicyPlugin) selectPods(npns *npNamespace, lsel *metav1.LabelS
utilruntime.HandleError(fmt.Errorf("ValidateNetworkPolicy() failure! Invalid PodSelector: %v", err))
return ips
}
for _, pod := range np.pods {
if (npns.name == pod.Namespace) && sel.Matches(labels.Set(pod.Labels)) {
ips = append(ips, pod.Status.PodIP)
}

pods, err := np.node.kubeInformers.Core().V1().Pods().Lister().Pods(npns.name).List(sel)
if err != nil {
// Shouldn't happen
utilruntime.HandleError(fmt.Errorf("Could not find matching pods in namespace %q: %v", npns.name, err))
return ips
}
for _, pod := range pods {
ips = append(ips, pod.Status.PodIP)
}
return ips
}
Expand Down Expand Up @@ -641,7 +647,7 @@ func (np *networkPolicyPlugin) watchPods() {
np.node.kubeInformers.Core().V1().Pods().Informer().AddEventHandler(funcs)
}

func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventType watch.EventType) {
func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, old interface{}, eventType watch.EventType) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", eventType, getPodFullName(pod))

Expand All @@ -654,34 +660,26 @@ func (np *networkPolicyPlugin) handleAddOrUpdatePod(obj, _ interface{}, eventTyp
return
}

// We don't want to grab np.Lock for every Pod.Status change...
// But it's safe to look up oldPod without locking here because no other
// threads modify this map.
oldPod, podExisted := np.pods[pod.UID]
if podExisted && oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
if old != nil {
oldPod := old.(*corev1.Pod)
if oldPod.Status.PodIP == pod.Status.PodIP && reflect.DeepEqual(oldPod.Labels, pod.Labels) {
return
}
}

np.lock.Lock()
defer np.lock.Unlock()

np.pods[pod.UID] = *pod
np.refreshNetworkPolicies(refreshForPods)
}

func (np *networkPolicyPlugin) handleDeletePod(obj interface{}) {
pod := obj.(*corev1.Pod)
klog.V(5).Infof("Watch %s event for Pod %q", watch.Deleted, getPodFullName(pod))

_, podExisted := np.pods[pod.UID]
if !podExisted {
return
}

np.lock.Lock()
defer np.lock.Unlock()

delete(np.pods, pod.UID)
np.refreshNetworkPolicies(refreshForPods)
}

Expand Down
1 change: 0 additions & 1 deletion pkg/network/node/networkpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func newTestNPP() *networkPolicyPlugin {

namespaces: make(map[uint32]*npNamespace),
namespacesByName: make(map[string]*npNamespace),
pods: make(map[ktypes.UID]corev1.Pod),
nsMatchCache: make(map[string]*npCacheEntry),
}
np.vnids = newNodeVNIDMap(np, nil)
Expand Down

0 comments on commit 2b936ba

Please sign in to comment.