diff --git a/pkg/controller/endpointslice/metrics/cache.go b/pkg/controller/endpointslice/metrics/cache.go
index 86a67f8a54be..2bd22316167f 100644
--- a/pkg/controller/endpointslice/metrics/cache.go
+++ b/pkg/controller/endpointslice/metrics/cache.go
@@ -91,6 +91,10 @@ func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) {
 		actualSlices += eInfo.Slices
 		desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice)
 	}
+	// there is always a placeholder slice
+	if desiredSlices == 0 {
+		desiredSlices = 1
+	}
 	return actualSlices, desiredSlices, endpoints
 }
 
@@ -148,6 +152,9 @@ func (c *Cache) updateMetrics() {
 // numDesiredSlices calculates the number of EndpointSlices that would exist
 // with ideal endpoint distribution.
 func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
+	if numEndpoints == 0 {
+		return 0
+	}
 	if numEndpoints <= maxEndpointsPerSlice {
 		return 1
 	}
diff --git a/pkg/controller/endpointslice/metrics/cache_test.go b/pkg/controller/endpointslice/metrics/cache_test.go
index df88ec183253..5cfd58bad9e3 100644
--- a/pkg/controller/endpointslice/metrics/cache_test.go
+++ b/pkg/controller/endpointslice/metrics/cache_test.go
@@ -60,6 +60,23 @@ func TestNumEndpointsAndSlices(t *testing.T) {
 	expectNumEndpointsAndSlices(t, c, 4, 4, 160)
 }
 
+func TestPlaceHolderSlice(t *testing.T) {
+	c := NewCache(int32(100))
+
+	p80 := int32(80)
+	p443 := int32(443)
+
+	pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}})
+	pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}})
+
+	sp := NewServicePortCache()
+	sp.Set(pmKey80, EfficiencyInfo{Endpoints: 0, Slices: 1})
+	sp.Set(pmKey80443, EfficiencyInfo{Endpoints: 0, Slices: 1})
+
+	c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, sp)
+	expectNumEndpointsAndSlices(t, c, 1, 2, 0)
+}
+
 func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
 	t.Helper()
 	if c.numSlicesDesired != desired {
diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go
index d3ff4228a4b5..3b40cdef395b 100644
--- a/pkg/controller/endpointslice/reconciler.go
+++ b/pkg/controller/endpointslice/reconciler.go
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
 // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
 // to ensure the desired set of pods are represented by endpoint slices.
 func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
+	errs := []error{}
 	slicesToCreate := []*discovery.EndpointSlice{}
 	slicesToUpdate := []*discovery.EndpointSlice{}
 	slicesToDelete := []*discovery.EndpointSlice{}
@@ -174,7 +175,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 
 		node, err := r.nodeLister.Get(pod.Spec.NodeName)
 		if err != nil {
-			return err
+			// we are getting the information from the local informer,
+			// an error other than IsNotFound should not happen
+			if !errors.IsNotFound(err) {
+				return err
+			}
+			// If the Node specified by the Pod doesn't exist, we want to requeue the Service so we
+			// retry later, but also update the EndpointSlice without the problematic Pod.
+			// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
+			// situations where a reference from a Pod to a missing Node can leave the EndpointSlice
+			// stuck forever.
+			// On the other hand, if service.Spec.PublishNotReadyAddresses is set, we just add the
+			// Pod, since the user is explicitly indicating that the Pod address should be published.
+			if !service.Spec.PublishNotReadyAddresses {
+				klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
+				errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
+				continue
+			}
 		}
 		endpoint := podToEndpoint(pod, node, service, addressType)
 		if len(endpoint.Addresses) > 0 {
@@ -225,11 +242,11 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 			slicesToDelete = slicesToDelete[:0]
 		} else {
 			slicesToCreate = append(slicesToCreate, placeholderSlice)
-			spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
-				Endpoints: 0,
-				Slices:    1,
-			})
 		}
+		spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
+			Endpoints: 0,
+			Slices:    1,
+		})
 	}
 
 	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
@@ -257,7 +274,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
 	}
 
-	return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	if err != nil {
+		errs = append(errs, err)
+	}
+	return utilerrors.NewAggregate(errs)
+
 }
 
 // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go
index 6c31a2145d1b..ff5f99abe2e6 100644
--- a/pkg/controller/endpointslice/reconciler_test.go
+++ b/pkg/controller/endpointslice/reconciler_test.go
@@ -481,7 +481,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
 		{
 			desc:        "Existing placeholder that's the same",
 			existing:    newEndpointSlice(&svc, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: discovery.AddressTypeIPv4}),
-			wantMetrics: expectedMetrics{desiredSlices: 0, actualSlices: 0, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
+			wantMetrics: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
 		},
 		{
 			desc: "Existing placeholder that's different",
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
 	}
 }
 
+// When a Pod references a Node that is not present in the informer cache, the
+// reconciler still allows EndpointSlice creates, updates, and deletes to go through.
+// The Pod with the missing Node reference is either published or retried, depending on
+// service.Spec.PublishNotReadyAddresses.
+// The test uses two Pods on different Nodes; one of the Nodes is not present.
+func TestReconcilerPodMissingNode(t *testing.T) {
+	namespace := "test"
+
+	nodes := make([]*corev1.Node, 2)
+	nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
+	nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
+
+	pods := make([]*corev1.Pod, 2)
+	pods[0] = newPod(0, namespace, true, 1, false)
+	pods[0].Spec.NodeName = nodes[0].Name
+	pods[1] = newPod(1, namespace, true, 1, false)
+	pods[1].Spec.NodeName = nodes[1].Name
+
+	service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
+
+	testCases := []struct {
+		name            string
+		publishNotReady bool
+		existingNodes   []*corev1.Node
+		existingSlice   func() *discovery.EndpointSlice
+		expectedMetrics expectedMetrics
+		expectError     bool
+	}{{
+		name: "Create and publishNotReady false",
+		publishNotReady: false,
+		existingNodes: []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices: 1,
+			actualSlices: 1,
+			desiredEndpoints: 1,
+			addedPerSync: 1,
+			removedPerSync: 0,
+			numCreated: 1,
+			numUpdated: 0,
+			numDeleted: 0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name: "Create and publishNotReady true",
+		publishNotReady: true,
+		existingNodes: []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices: 1,
+			actualSlices: 1,
+			desiredEndpoints: 2,
+			addedPerSync: 2,
+			removedPerSync: 0,
+			numCreated: 1,
+			numUpdated: 0,
+			numDeleted: 0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: false,
+	}, {
+		name: "Update and publishNotReady false",
+		publishNotReady: false,
+		existingNodes: []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices: 1,
+			actualSlices: 1,
+			desiredEndpoints: 1,
+			addedPerSync: 0,
+			removedPerSync: 1,
+			numCreated: 0,
+			numUpdated: 1,
+			numDeleted: 0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name: "Update and publishNotReady true and all nodes missing",
+		publishNotReady: true,
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices: 1,
+			actualSlices: 1,
+			desiredEndpoints: 2,
+			addedPerSync: 0,
+			removedPerSync: 0,
+			numCreated: 0,
+			numUpdated: 0,
+			numDeleted: 0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name: "Update and publishNotReady true",
+		publishNotReady: true,
+		existingNodes: []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices: 1,
+			actualSlices: 1,
+			desiredEndpoints: 2,
+			addedPerSync: 0,
+			removedPerSync: 0,
+			numCreated: 0,
+			numUpdated: 0,
+			numDeleted: 0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name: "Update if publishNotReady false and no nodes are present",
+		publishNotReady: false,
+		existingNodes: []*corev1.Node{},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices: 1,
+			actualSlices: 1,
+			desiredEndpoints: 0,
+			addedPerSync: 0,
+			removedPerSync: 2,
+			numCreated: 0,
+			// the slice is updated not deleted
+			numUpdated: 1,
+			numDeleted: 0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			client := newClientset()
+			setupMetrics()
+			r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)
+
+			svc := service.DeepCopy()
+			svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
+			existingSlices := []*discovery.EndpointSlice{}
+			if tc.existingSlice != nil {
+				slice := tc.existingSlice()
+				existingSlices = append(existingSlices, slice)
+				_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+				if createErr != nil {
+					t.Errorf("Expected no error creating endpoint slice")
+				}
+			}
+			err := r.reconcile(svc, pods, existingSlices, time.Now())
+			if err == nil && tc.expectError {
+				t.Errorf("Expected error but no error received")
+			}
+			if err != nil && !tc.expectError {
+				t.Errorf("Unexpected error: %v", err)
+			}
+
+			fetchedSlices := fetchEndpointSlices(t, client, namespace)
+			if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+				t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+			}
+			expectMetrics(t, tc.expectedMetrics)
+
+		})
+	}
+}
+
 func TestReconcileTopology(t *testing.T) {
 	ns := "testing"
 	svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
@@ -1594,7 +1779,7 @@ func TestReconcileTopology(t *testing.T) {
 			expectedCrossZoneHints: 1,
 			expectedMetrics: expectedMetrics{
 				desiredSlices: 1,
-				actualSlices: 1,
+				actualSlices: 2,
 				desiredEndpoints: 9,
 				addedPerSync: 0,
 				removedPerSync: 0,
@@ -1617,7 +1802,7 @@ func TestReconcileTopology(t *testing.T) {
 			expectedCrossZoneHints: 0,
 			expectedMetrics: expectedMetrics{
 				desiredSlices: 1,
-				actualSlices: 1,
+				actualSlices: 2,
 				desiredEndpoints: 9,
 				addedPerSync: 0,
 				removedPerSync: 0,
@@ -1650,6 +1835,9 @@ func TestReconcileTopology(t *testing.T) {
 			cmc.Check(t)
 			expectMetrics(t, tc.expectedMetrics)
 			fetchedSlices := fetchEndpointSlices(t, client, ns)
+			if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+				t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+			}
 
 			if tc.expectedHints == nil {
 				for _, slice := range fetchedSlices {
@@ -1864,7 +2052,7 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
 	actualNumSlices, err := testutil.GetGaugeMetricValue(metrics.NumEndpointSlices.WithLabelValues())
 	handleErr(t, err, "numEndpointSlices")
-	if actualDesiredSlices != float64(em.desiredSlices) {
+	if actualNumSlices != float64(em.actualSlices) {
 		t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
 	}
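
Note: the slice accounting that the cache.go hunks above change can be illustrated with a small standalone Go sketch. This is not the controller code: desiredSlicesForService and the plain-int inputs are illustrative stand-ins for ServicePortCache.totals, and the ceiling-division fallback in numDesiredSlices is an assumption about the unchanged branch of the helper.

package main

import (
	"fmt"
	"math"
)

// numDesiredSlices mirrors the patched helper: zero endpoints now contribute
// zero "natural" slices; the placeholder is accounted for separately.
func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
	if numEndpoints == 0 {
		return 0
	}
	if numEndpoints <= maxEndpointsPerSlice {
		return 1
	}
	// assumed fallback: ideal distribution packs maxEndpointsPerSlice per slice
	return int(math.Ceil(float64(numEndpoints) / float64(maxEndpointsPerSlice)))
}

// desiredSlicesForService applies the placeholder rule added to totals():
// a service with no endpoints still desires exactly one (placeholder) slice,
// no matter how many port mappings it has.
func desiredSlicesForService(endpointsPerPortMap []int, maxEndpointsPerSlice int) int {
	desired := 0
	for _, n := range endpointsPerPortMap {
		desired += numDesiredSlices(n, maxEndpointsPerSlice)
	}
	if desired == 0 {
		desired = 1 // there is always a placeholder slice
	}
	return desired
}

func main() {
	// Two empty port mappings, as in TestPlaceHolderSlice: one desired slice, not two.
	fmt.Println(desiredSlicesForService([]int{0, 0}, 100)) // 1
	// 160 endpoints with a 100-endpoint limit: two desired slices.
	fmt.Println(desiredSlicesForService([]int{160}, 100)) // 2
}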