Merge pull request #110920 from jluhrsen/automated-cherry-pick-of-#110639-upstream-release-1.24

Automated cherry pick of #110639: fix a bug on endpointslices tests comparing the wrong
k8s-ci-robot committed Jul 7, 2022
2 parents a7d7a12 + 62eeccb commit 7ca3297
Showing 4 changed files with 244 additions and 10 deletions.
7 changes: 7 additions & 0 deletions pkg/controller/endpointslice/metrics/cache.go
@@ -91,6 +91,10 @@ func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) {
actualSlices += eInfo.Slices
desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice)
}
// there is always a placeholder slice
if desiredSlices == 0 {
desiredSlices = 1
}
return actualSlices, desiredSlices, endpoints
}

@@ -148,6 +152,9 @@ func (c *Cache) updateMetrics() {
// numDesiredSlices calculates the number of EndpointSlices that would exist
// with ideal endpoint distribution.
func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
if numEndpoints == 0 {
return 0
}
if numEndpoints <= maxEndpointsPerSlice {
return 1
}
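
For illustration only (not part of this diff), here is a minimal standalone sketch of the slice-count arithmetic these cache.go hunks change, assuming maxEndpointsPerSlice = 100: with the fix, zero endpoints yield zero desired slices from numDesiredSlices, and totals falls back to counting the single placeholder slice. The ceiling division in the helper below is assumed, since the tail of the function is not shown in this hunk.

package main

import (
	"fmt"
	"math"
)

// numDesiredSlices mirrors the patched helper: zero endpoints means zero
// desired slices; otherwise the ideal distribution is a ceiling division.
func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
	if numEndpoints == 0 {
		return 0
	}
	if numEndpoints <= maxEndpointsPerSlice {
		return 1
	}
	return int(math.Ceil(float64(numEndpoints) / float64(maxEndpointsPerSlice)))
}

func main() {
	// A Service with no endpoints still owns one placeholder slice, so the
	// desired count is clamped to 1 and matches the actual count.
	desired := numDesiredSlices(0, 100)
	if desired == 0 {
		desired = 1
	}
	fmt.Println(desired)                    // 1
	fmt.Println(numDesiredSlices(250, 100)) // 3
}
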
17 changes: 17 additions & 0 deletions pkg/controller/endpointslice/metrics/cache_test.go
@@ -60,6 +60,23 @@ func TestNumEndpointsAndSlices(t *testing.T) {
expectNumEndpointsAndSlices(t, c, 4, 4, 160)
}

func TestPlaceHolderSlice(t *testing.T) {
c := NewCache(int32(100))

p80 := int32(80)
p443 := int32(443)

pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}})
pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}})

sp := NewServicePortCache()
sp.Set(pmKey80, EfficiencyInfo{Endpoints: 0, Slices: 1})
sp.Set(pmKey80443, EfficiencyInfo{Endpoints: 0, Slices: 1})

c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, sp)
expectNumEndpointsAndSlices(t, c, 1, 2, 0)
}

func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
t.Helper()
if c.numSlicesDesired != desired {
34 changes: 28 additions & 6 deletions pkg/controller/endpointslice/reconciler.go
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
// slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
errs := []error{}

slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
@@ -174,7 +175,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor

node, err := r.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return err
// we are getting the information from the local informer,
// an error other than IsNotFound should not happen
if !errors.IsNotFound(err) {
return err
}
// If the Node specified by the Pod doesn't exist we want to requeue the Service so we
// retry later, but also update the EndpointSlice without the problematic Pod.
// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
// situations where a reference from a Pod to a missing node can leave the EndpointSlice
// stuck forever.
// On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
// Pod, since the user is explicitly indicating that the Pod address should be published.
if !service.Spec.PublishNotReadyAddresses {
klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
continue
}
}
endpoint := podToEndpoint(pod, node, service, addressType)
if len(endpoint.Addresses) > 0 {
@@ -225,11 +242,11 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
slicesToDelete = slicesToDelete[:0]
} else {
slicesToCreate = append(slicesToCreate, placeholderSlice)
spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
Endpoints: 0,
Slices: 1,
})
}
spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
Endpoints: 0,
Slices: 1,
})
}

metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
@@ -257,7 +274,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}

return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
if err != nil {
errs = append(errs, err)
}
return utilerrors.NewAggregate(errs)

}

// placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
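
For context only (not part of this diff), the skip-and-aggregate pattern introduced in reconcileByAddressType can be sketched as below, assuming the k8s.io/apimachinery errors and util/errors packages; reconcilePods, getNode, and the pod/node names are hypothetical placeholders. Per-pod NotFound problems are recorded and skipped so the slices are still written, and the aggregate is returned so the Service is requeued and retried.

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// reconcilePods collects non-fatal per-pod errors, keeps reconciling, and
// returns them as a single aggregate at the end.
func reconcilePods(getNode func(name string) error, podNodes map[string]string) error {
	errs := []error{}
	for pod, node := range podNodes {
		if err := getNode(node); err != nil {
			if !apierrors.IsNotFound(err) {
				return err // unexpected lister error: fail the whole sync
			}
			// Missing Node: skip this Pod but remember the problem so the
			// caller requeues the Service and retries later.
			errs = append(errs, fmt.Errorf("skipping Pod %s: Node %s not found", pod, node))
			continue
		}
		// ... build and append the endpoint for this Pod ...
	}
	// NewAggregate returns nil when errs is empty.
	return utilerrors.NewAggregate(errs)
}

func main() {
	missingNode := func(name string) error {
		return apierrors.NewNotFound(schema.GroupResource{Resource: "nodes"}, name)
	}
	err := reconcilePods(missingNode, map[string]string{"pod-0": "node-0"})
	fmt.Println(err) // skipping Pod pod-0: Node node-0 not found
}
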
196 changes: 192 additions & 4 deletions pkg/controller/endpointslice/reconciler_test.go
@@ -481,7 +481,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
{
desc: "Existing placeholder that's the same",
existing: newEndpointSlice(&svc, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: discovery.AddressTypeIPv4}),
wantMetrics: expectedMetrics{desiredSlices: 0, actualSlices: 0, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
wantMetrics: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
},
{
desc: "Existing placeholder that's different",
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
}
}

// When a Pod references a Node that is not present in the informer cache, the
// EndpointSlices will continue to allow creates, updates, and deletes through.
// The Pod with the missing reference will be published or retried depending on
// the service.spec.PublishNotReadyAddresses.
// The test considers two Pods on different nodes; one of the Nodes is not present.
func TestReconcilerPodMissingNode(t *testing.T) {
namespace := "test"

nodes := make([]*corev1.Node, 2)
nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

pods := make([]*corev1.Pod, 2)
pods[0] = newPod(0, namespace, true, 1, false)
pods[0].Spec.NodeName = nodes[0].Name
pods[1] = newPod(1, namespace, true, 1, false)
pods[1].Spec.NodeName = nodes[1].Name

service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)

testCases := []struct {
name string
publishNotReady bool
existingNodes []*corev1.Node
existingSlice func() *discovery.EndpointSlice
expectedMetrics expectedMetrics
expectError bool
}{{
name: "Create and publishNotReady false",
publishNotReady: false,
existingNodes: []*corev1.Node{nodes[0]},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: 1,
removedPerSync: 0,
numCreated: 1,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: true,
}, {
name: "Create and publishNotReady true",
publishNotReady: true,
existingNodes: []*corev1.Node{nodes[0]},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 2,
addedPerSync: 2,
removedPerSync: 0,
numCreated: 1,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: false,
}, {
name: "Update and publishNotReady false",
publishNotReady: false,
existingNodes: []*corev1.Node{nodes[0]},
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: 0,
removedPerSync: 1,
numCreated: 0,
numUpdated: 1,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: true,
}, {
name: "Update and publishNotReady true and all nodes missing",
publishNotReady: true,
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 2,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 0,
},
expectError: false,
}, {
name: "Update and publishNotReady true",
publishNotReady: true,
existingNodes: []*corev1.Node{nodes[0]},
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 2,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 0,
},
expectError: false,
}, {
name: "Update if publishNotReady false and no nodes are present",
publishNotReady: false,
existingNodes: []*corev1.Node{},
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 0,
addedPerSync: 0,
removedPerSync: 2,
numCreated: 0,
// the slice is updated not deleted
numUpdated: 1,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := newClientset()
setupMetrics()
r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)

svc := service.DeepCopy()
svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
existingSlices := []*discovery.EndpointSlice{}
if tc.existingSlice != nil {
slice := tc.existingSlice()
existingSlices = append(existingSlices, slice)
_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
if createErr != nil {
t.Errorf("Expected no error creating endpoint slice")
}
}
err := r.reconcile(svc, pods, existingSlices, time.Now())
if err == nil && tc.expectError {
t.Errorf("Expected error but no error received")
}
if err != nil && !tc.expectError {
t.Errorf("Unexpected error: %v", err)
}

fetchedSlices := fetchEndpointSlices(t, client, namespace)
if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
}
expectMetrics(t, tc.expectedMetrics)

})
}
}
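
As a side note (not part of this diff), the "requeue the Service so we retry later" behavior exercised above typically surfaces in a workqueue-driven controller loop like the hedged sketch below, assuming client-go's workqueue package; processNextWorkItem, syncService, and the key are placeholders, not the controller's actual code.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem sketches how an aggregated reconcile error leads to a
// retry: the key is re-added with rate limiting instead of being forgotten.
func processNextWorkItem(queue workqueue.RateLimitingInterface, syncService func(key string) error) bool {
	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	if err := syncService(key.(string)); err != nil {
		// The aggregate returned by the reconciler propagates up to here, so
		// the Service is retried even though its slices were already written
		// without the Pod whose Node was missing.
		fmt.Printf("requeuing %v: %v\n", key, err)
		queue.AddRateLimited(key)
		return true
	}
	queue.Forget(key)
	return true
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("ns1/svc1")
	processNextWorkItem(queue, func(key string) error {
		return fmt.Errorf("skipping Pod pod1 for Service %s: Node node-1 Not Found", key)
	})
}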

func TestReconcileTopology(t *testing.T) {
ns := "testing"
svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
@@ -1594,7 +1779,7 @@ func TestReconcileTopology(t *testing.T) {
expectedCrossZoneHints: 1,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
actualSlices: 2,
desiredEndpoints: 9,
addedPerSync: 0,
removedPerSync: 0,
Expand All @@ -1617,7 +1802,7 @@ func TestReconcileTopology(t *testing.T) {
expectedCrossZoneHints: 0,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
actualSlices: 2,
desiredEndpoints: 9,
addedPerSync: 0,
removedPerSync: 0,
@@ -1650,6 +1835,9 @@ func TestReconcileTopology(t *testing.T) {
cmc.Check(t)
expectMetrics(t, tc.expectedMetrics)
fetchedSlices := fetchEndpointSlices(t, client, ns)
if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
}

if tc.expectedHints == nil {
for _, slice := range fetchedSlices {
@@ -1864,7 +2052,7 @@ func expectMetrics(t *testing.T, em expectedMetrics) {

actualNumSlices, err := testutil.GetGaugeMetricValue(metrics.NumEndpointSlices.WithLabelValues())
handleErr(t, err, "numEndpointSlices")
if actualDesiredSlices != float64(em.desiredSlices) {
if actualNumSlices != float64(em.actualSlices) {
t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
}

