endpointslices: node missing on Pod scenario
When a Pod references a Node that doesn't exist in the local
informer cache, the previous behavior was to return an error so
the Service would be retried later, and to stop processing.
However, a missing Node could leave a Slice stuck: unable to
reflect other changes, or to be created at all.
This also didn't respect the publishNotReadyAddresses option on
Services, which considers it acceptable to publish Pod addresses
that are known not to be ready.

The new behavior keeps retrying the problematic Service, but it
keeps processing the updates, reflecting the current state in the
EndpointSlice. If publishNotReadyAddresses is set, a missing Node
on a Pod is not treated as an error.
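
Below is a minimal, self-contained Go sketch of the pattern this commit
adopts: collect per-Pod errors, keep reconciling, and aggregate at the
end. processPods and its parameters are hypothetical stand-ins for the
reconciler's per-Pod loop; only utilerrors.NewAggregate is the real
apimachinery helper used in the diff below.

package sketch

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// processPods mimics the new control flow: a missing Node no longer
// aborts the whole sync; the error is recorded and the loop continues.
// nodeExists maps each Pod name to whether its Node is in the cache.
func processPods(nodeExists map[string]bool, publishNotReady bool) error {
	errs := []error{}
	for pod, exists := range nodeExists {
		if !exists && !publishNotReady {
			// Record the problem so the Service is requeued, but keep
			// processing the remaining Pods instead of returning early.
			errs = append(errs, fmt.Errorf("skipping Pod %s: Node not found", pod))
			continue
		}
		// ... append the Pod's endpoint to the slice being built ...
	}
	// NewAggregate returns nil for an empty list; otherwise it returns a
	// single error that triggers the retry without blocking the sync.
	return utilerrors.NewAggregate(errs)
}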
aojea authored and jluhrsen committed Jul 1, 2022
1 parent 698d989 commit 62eeccb
Showing 2 changed files with 209 additions and 2 deletions.
26 changes: 24 additions & 2 deletions pkg/controller/endpointslice/reconciler.go
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
// slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
errs := []error{}

slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
@@ -174,7 +175,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor

node, err := r.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return err
// We are getting the information from the local informer;
// an error other than IsNotFound should not happen here.
if !errors.IsNotFound(err) {
return err
}
// If the Node specified by the Pod doesn't exist, we want to requeue the Service so we
// retry later, but also update the EndpointSlice without the problematic Pod.
// Theoretically, the Pod garbage collector will remove the Pod, but we want to avoid
// situations where a reference from a Pod to a missing Node can leave the EndpointSlice
// stuck forever.
// On the other hand, if service.Spec.PublishNotReadyAddresses is set, we just add the
// Pod, since the user is explicitly indicating that the Pod address should be published.
if !service.Spec.PublishNotReadyAddresses {
klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
continue
}
}
endpoint := podToEndpoint(pod, node, service, addressType)
if len(endpoint.Addresses) > 0 {
@@ -257,7 +274,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}

return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
if err != nil {
errs = append(errs, err)
}
return utilerrors.NewAggregate(errs)

}

// placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
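For context on the IsNotFound check in the hunk above, here is a small
sketch of how a lister lookup separates a missing object from an
unexpected error. lookupNode and its return convention are illustrative
assumptions; apierrors.IsNotFound and the client-go NodeLister are the
real APIs the reconciler relies on.

package sketch

import (
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	listers "k8s.io/client-go/listers/core/v1"
)

// lookupNode returns the Node, whether the Pod should still be
// published, and any unexpected error.
func lookupNode(nodeLister listers.NodeLister, name string, publishNotReady bool) (*v1.Node, bool, error) {
	node, err := nodeLister.Get(name)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			// Errors other than NotFound should not come from a local
			// informer cache; surface them so the sync is retried.
			return nil, false, err
		}
		// NotFound: publish anyway if the user opted into not-ready
		// addresses; otherwise the caller skips this Pod.
		return nil, publishNotReady, nil
	}
	return node, true, nil
}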
185 changes: 185 additions & 0 deletions pkg/controller/endpointslice/reconciler_test.go
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
}
}

// When a Pod references a Node that is not present in the informer cache, the
// EndpointSlices will continue to allow creates, updates, and deletes through.
// The Pod with the missing reference will be published or retried depending on
// service.Spec.PublishNotReadyAddresses.
// The test considers two Pods on different Nodes, one of which is not present.
func TestReconcilerPodMissingNode(t *testing.T) {
namespace := "test"

nodes := make([]*corev1.Node, 2)
nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}

pods := make([]*corev1.Pod, 2)
pods[0] = newPod(0, namespace, true, 1, false)
pods[0].Spec.NodeName = nodes[0].Name
pods[1] = newPod(1, namespace, true, 1, false)
pods[1].Spec.NodeName = nodes[1].Name

service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)

testCases := []struct {
name string
publishNotReady bool
existingNodes []*corev1.Node
existingSlice func() *discovery.EndpointSlice
expectedMetrics expectedMetrics
expectError bool
}{{
name: "Create and publishNotReady false",
publishNotReady: false,
existingNodes: []*corev1.Node{nodes[0]},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: 1,
removedPerSync: 0,
numCreated: 1,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: true,
}, {
name: "Create and publishNotReady true",
publishNotReady: true,
existingNodes: []*corev1.Node{nodes[0]},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 2,
addedPerSync: 2,
removedPerSync: 0,
numCreated: 1,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: false,
}, {
name: "Update and publishNotReady false",
publishNotReady: false,
existingNodes: []*corev1.Node{nodes[0]},
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: 0,
removedPerSync: 1,
numCreated: 0,
numUpdated: 1,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: true,
}, {
name: "Update and publishNotReady true and all nodes missing",
publishNotReady: true,
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 2,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 0,
},
expectError: false,
}, {
name: "Update and publishNotReady true",
publishNotReady: true,
existingNodes: []*corev1.Node{nodes[0]},
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 2,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 0,
},
expectError: false,
}, {
name: "Update if publishNotReady false and no nodes are present",
publishNotReady: false,
existingNodes: []*corev1.Node{},
existingSlice: func() *discovery.EndpointSlice {
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
return slice
},
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 0,
addedPerSync: 0,
removedPerSync: 2,
numCreated: 0,
// the slice is updated, not deleted
numUpdated: 1,
numDeleted: 0,
slicesChangedPerSync: 1,
},
expectError: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := newClientset()
setupMetrics()
r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)

svc := service.DeepCopy()
svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
existingSlices := []*discovery.EndpointSlice{}
if tc.existingSlice != nil {
slice := tc.existingSlice()
existingSlices = append(existingSlices, slice)
_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
if createErr != nil {
t.Errorf("Expected no error creating endpoint slice")
}
}
err := r.reconcile(svc, pods, existingSlices, time.Now())
if err == nil && tc.expectError {
t.Errorf("Expected error but no error received")
}
if err != nil && !tc.expectError {
t.Errorf("Unexpected error: %v", err)
}

fetchedSlices := fetchEndpointSlices(t, client, namespace)
if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
}
expectMetrics(t, tc.expectedMetrics)

})
}
}

func TestReconcileTopology(t *testing.T) {
ns := "testing"
svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
