Skip to content

Commit 7084651

Browse files
authoredJul 14, 2023
fix sync period handling, headless svc (#15)
Fix policy event handler to refer resourceRevision do not include headless service in the endpoints policyendpoints manager cleanup
1 parent 13c5352 commit 7084651

File tree

8 files changed

+158
-28
lines changed

8 files changed

+158
-28
lines changed
 

‎internal/eventhandlers/policy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (h *enqueueRequestForPolicyEvent) Update(_ context.Context, e event.UpdateE
6161
newPolicy := e.ObjectNew.(*networking.NetworkPolicy)
6262

6363
h.logger.V(1).Info("Handling update event", "policy", k8s.NamespacedName(newPolicy))
64-
if oldPolicy.Generation != newPolicy.Generation && equality.Semantic.DeepEqual(oldPolicy.Spec, newPolicy.Spec) &&
64+
if !equality.Semantic.DeepEqual(newPolicy.ResourceVersion, oldPolicy.ResourceVersion) && equality.Semantic.DeepEqual(oldPolicy.Spec, newPolicy.Spec) &&
6565
equality.Semantic.DeepEqual(oldPolicy.DeletionTimestamp.IsZero(), newPolicy.DeletionTimestamp.IsZero()) {
6666
return
6767
}

‎pkg/k8s/service_utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,11 @@ func LookupListenPortFromPodSpec(svc *corev1.Service, pod *corev1.Pod, port ints
4242
}
4343
return 0, errors.Errorf("unable to find listener port for port %s on service %s", port.String(), NamespacedName(svc))
4444
}
45+
46+
// IsServiceHeadless returns true if the service is headless
47+
func IsServiceHeadless(svc *corev1.Service) bool {
48+
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
49+
return true
50+
}
51+
return false
52+
}

‎pkg/k8s/service_utils_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -305,3 +305,42 @@ func Test_LookupListenPortFromPodSpec(t *testing.T) {
305305
})
306306
}
307307
}
308+
309+
func Test_IsServiceHeadless(t *testing.T) {
310+
tests := []struct {
311+
name string
312+
svc *corev1.Service
313+
want bool
314+
}{
315+
{
316+
name: "headless service",
317+
svc: &corev1.Service{
318+
Spec: corev1.ServiceSpec{
319+
ClusterIP: corev1.ClusterIPNone,
320+
},
321+
},
322+
want: true,
323+
},
324+
{
325+
name: "empty IP",
326+
svc: &corev1.Service{
327+
Spec: corev1.ServiceSpec{},
328+
},
329+
want: true,
330+
},
331+
{
332+
name: "some cluster IP",
333+
svc: &corev1.Service{
334+
Spec: corev1.ServiceSpec{
335+
ClusterIP: "10.100.0.209",
336+
},
337+
},
338+
},
339+
}
340+
for _, tt := range tests {
341+
t.Run(tt.name, func(t *testing.T) {
342+
assert.Equal(t, tt.want, IsServiceHeadless(tt.svc))
343+
})
344+
}
345+
346+
}

‎pkg/policyendpoints/manager.go

+8-25
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/sha256"
66
"encoding/hex"
7+
"golang.org/x/exp/maps"
78
"strconv"
89

910
"github.com/go-logr/logr"
@@ -67,7 +68,7 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki
6768
if err != nil {
6869
return err
6970
}
70-
m.logger.V(1).Info("Got policy endpoints lists", "create", len(createList), "update", len(updateList), "delete", len(deleteList))
71+
m.logger.Info("Got policy endpoints lists", "create", len(createList), "update", len(updateList), "delete", len(deleteList))
7172
for _, policyEndpoint := range createList {
7273
if err := m.k8sClient.Create(ctx, &policyEndpoint); err != nil {
7374
return err
@@ -139,31 +140,28 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
139140
// Go over the existing endpoints, and remove entries that are no longer needed
140141
var modifiedEndpoints []policyinfo.PolicyEndpoint
141142
var potentialDeletes []policyinfo.PolicyEndpoint
142-
usedIngressRuleKeys := sets.Set[string]{}
143-
usedEgressRulesKeys := sets.Set[string]{}
144-
usedPodEndpoints := sets.Set[policyinfo.PodEndpoint]{}
145143
for i := range existingPolicyEndpoints {
146144
ingEndpointList := make([]policyinfo.EndpointInfo, 0, len(existingPolicyEndpoints[i].Spec.Ingress))
147145
for _, ingRule := range existingPolicyEndpoints[i].Spec.Ingress {
148146
ruleKey := m.getEndpointInfoKey(ingRule)
149147
if _, exists := ingressEndpointsMap[ruleKey]; exists {
150148
ingEndpointList = append(ingEndpointList, ingRule)
151-
usedIngressRuleKeys.Insert(ruleKey)
149+
delete(ingressEndpointsMap, ruleKey)
152150
}
153151
}
154152
egEndpointList := make([]policyinfo.EndpointInfo, 0, len(existingPolicyEndpoints[i].Spec.Egress))
155153
for _, egRule := range existingPolicyEndpoints[i].Spec.Egress {
156154
ruleKey := m.getEndpointInfoKey(egRule)
157155
if _, exists := egressEndpointsMap[ruleKey]; exists {
158156
egEndpointList = append(egEndpointList, egRule)
159-
usedEgressRulesKeys.Insert(ruleKey)
157+
delete(egressEndpointsMap, ruleKey)
160158
}
161159
}
162160
podSelectorEndpointList := make([]policyinfo.PodEndpoint, 0, len(existingPolicyEndpoints[i].Spec.PodSelectorEndpoints))
163161
for _, ps := range existingPolicyEndpoints[i].Spec.PodSelectorEndpoints {
164162
if podSelectorEndpointSet.Has(ps) {
165163
podSelectorEndpointList = append(podSelectorEndpointList, ps)
166-
usedPodEndpoints.Insert(ps)
164+
podSelectorEndpointSet.Delete(ps)
167165
}
168166
}
169167
policyEndpointChanged := false
@@ -188,22 +186,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
188186
}
189187
}
190188

191-
remainingIngressRuleKeys := sets.Set[string]{}
192-
remainingEgressRulesKeys := sets.Set[string]{}
193-
remainingPodEndpoints := podSelectorEndpointSet.Difference(usedPodEndpoints)
194-
195-
for key := range ingressEndpointsMap {
196-
if !usedIngressRuleKeys.Has(key) {
197-
remainingIngressRuleKeys.Insert(key)
198-
}
199-
}
200-
for key := range egressEndpointsMap {
201-
if !usedEgressRulesKeys.Has(key) {
202-
remainingEgressRulesKeys.Insert(key)
203-
}
204-
}
205-
206-
ingressRuleChunks := lo.Chunk(remainingIngressRuleKeys.UnsortedList(), m.endpointChunkSize)
189+
ingressRuleChunks := lo.Chunk(maps.Keys(ingressEndpointsMap), m.endpointChunkSize)
207190
doNotDelete := sets.Set[types.NamespacedName]{}
208191
for _, chunk := range ingressRuleChunks {
209192
// check in the existing lists if chunk fits, otherwise allocate a new ep
@@ -228,7 +211,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
228211
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
229212
}
230213

231-
egressRuleChunks := lo.Chunk(remainingEgressRulesKeys.UnsortedList(), m.endpointChunkSize)
214+
egressRuleChunks := lo.Chunk(maps.Keys(egressEndpointsMap), m.endpointChunkSize)
232215
for _, chunk := range egressRuleChunks {
233216
// check in the existing to-update/to-delete list if chunk fits, otherwise allocate a new ep
234217
var assigned bool
@@ -251,7 +234,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
251234
newEP := m.newPolicyEndpoint(policy, nil, m.getListOfEndpointInfoFromHash(chunk, egressEndpointsMap), nil)
252235
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
253236
}
254-
podEndpointChunks := lo.Chunk(remainingPodEndpoints.UnsortedList(), m.endpointChunkSize)
237+
podEndpointChunks := lo.Chunk(podSelectorEndpointSet.UnsortedList(), m.endpointChunkSize)
255238
for _, chunk := range podEndpointChunks {
256239
var assigned bool
257240
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{createPolicyEndpoints, modifiedEndpoints, potentialDeletes} {

‎pkg/policyendpoints/manager_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,76 @@ func Test_policyEndpointsManager_computePolicyEndpoints(t *testing.T) {
372372
updateCount: 2,
373373
},
374374
},
375+
{
376+
name: "cleanup endpoints with same entries",
377+
fields: fields{
378+
endpointChunkSize: 3,
379+
},
380+
args: args{
381+
policy: &networking.NetworkPolicy{
382+
ObjectMeta: metav1.ObjectMeta{
383+
Namespace: "policy-namespace",
384+
Name: "policy-name",
385+
},
386+
Spec: networking.NetworkPolicySpec{},
387+
},
388+
policyEndpoints: []policyinfo.PolicyEndpoint{
389+
{
390+
ObjectMeta: metav1.ObjectMeta{
391+
Namespace: "policy-namespace",
392+
Name: "policy-name-1",
393+
},
394+
Spec: policyinfo.PolicyEndpointSpec{
395+
Ingress: getEPInfoHelper([]policyinfo.NetworkAddress{"1.2.3.4", "1.2.3.5"}, nil, 3),
396+
Egress: getEPInfoHelper([]policyinfo.NetworkAddress{"2.2.0.0/16", "2.3.0.0/16"}, []policyinfo.NetworkAddress{"2.2.3.4"}, 1),
397+
PodSelectorEndpoints: []policyinfo.PodEndpoint{
398+
{
399+
Name: "pod1",
400+
},
401+
{
402+
Name: "pod2",
403+
},
404+
},
405+
},
406+
},
407+
{
408+
ObjectMeta: metav1.ObjectMeta{
409+
Namespace: "policy-namespace",
410+
Name: "policy-name-dup",
411+
},
412+
Spec: policyinfo.PolicyEndpointSpec{
413+
Ingress: getEPInfoHelper([]policyinfo.NetworkAddress{"1.2.3.5", "1.2.3.4"}, nil, 3),
414+
Egress: getEPInfoHelper([]policyinfo.NetworkAddress{"2.3.0.0/16", "2.2.0.0/16"}, []policyinfo.NetworkAddress{"2.2.3.4"}, 1),
415+
PodSelectorEndpoints: []policyinfo.PodEndpoint{
416+
{
417+
Name: "pod1",
418+
},
419+
{
420+
Name: "pod2",
421+
},
422+
},
423+
},
424+
},
425+
},
426+
ingressRules: getEPInfoHelper([]policyinfo.NetworkAddress{"1.2.3.4", "1.2.3.5"}, nil, 3),
427+
egressRules: getEPInfoHelper([]policyinfo.NetworkAddress{"2.2.0.0/16", "2.3.0.0/16"}, []policyinfo.NetworkAddress{"2.2.3.4"}, 1),
428+
podselectorEndpoints: []policyinfo.PodEndpoint{
429+
{
430+
Name: "pod1",
431+
},
432+
{
433+
Name: "pod2",
434+
},
435+
},
436+
epValidator: func(_ *networking.NetworkPolicy, _ *policyinfo.PolicyEndpoint) bool {
437+
return true
438+
},
439+
},
440+
want: want{
441+
deleteCount: 1,
442+
updateCount: 1,
443+
},
444+
},
375445
}
376446
for _, tt := range tests {
377447
t.Run(tt.name, func(t *testing.T) {

‎pkg/resolvers/endpoints.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ func (r *defaultEndpointsResolver) getMatchingServiceClusterIPs(ctx context.Cont
339339
return nil
340340
}
341341
for _, svc := range svcList.Items {
342-
if !svcSelector.Matches(labels.Set(svc.Spec.Selector)) {
342+
if k8s.IsServiceHeadless(&svc) || !svcSelector.Matches(labels.Set(svc.Spec.Selector)) {
343343
continue
344344
}
345345
var portList []policyinfo.Port

‎pkg/resolvers/endpoints_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,36 @@ func TestEndpointsResolver_Resolve(t *testing.T) {
372372
{PodIP: "1.0.0.3", Name: "pod3", Namespace: "ns"},
373373
},
374374
},
375+
{
376+
name: "exclude headless service",
377+
args: args{
378+
netpol: egressPolicy,
379+
podListCalls: []podListCall{
380+
{
381+
pods: []corev1.Pod{pod2, podNoIP, pod3},
382+
},
383+
},
384+
serviceListCalls: []serviceListCall{
385+
{
386+
services: []corev1.Service{
387+
{
388+
Spec: corev1.ServiceSpec{
389+
ClusterIP: "None",
390+
},
391+
},
392+
},
393+
},
394+
},
395+
},
396+
wantEgressEndpoints: []policyinfo.EndpointInfo{
397+
{CIDR: "1.0.0.2"},
398+
{CIDR: "1.0.0.3"},
399+
},
400+
wantPodEndpoints: []policyinfo.PodEndpoint{
401+
{PodIP: "1.0.0.2", Name: "pod2", Namespace: "ns"},
402+
{PodIP: "1.0.0.3", Name: "pod3", Namespace: "ns"},
403+
},
404+
},
375405
{
376406
name: "resolve network peers, ingress/egress",
377407
args: args{

‎pkg/resolvers/policies_for_service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
// getReferredPoliciesForService returns the list of policies that refer to the service.
1818
func (r *defaultPolicyReferenceResolver) getReferredPoliciesForService(ctx context.Context, svc, svcOld *corev1.Service) ([]networking.NetworkPolicy, error) {
19-
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
19+
if k8s.IsServiceHeadless(svc) {
2020
r.logger.V(1).Info("Ignoring headless service", "svc", k8s.NamespacedName(svc))
2121
return nil, nil
2222
}

0 commit comments

Comments
 (0)
Failed to load comments.