Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating EndpointSlices to use PublishNotReadyAddresses from Services. #84573

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/endpointslice/reconciler.go
Expand Up @@ -90,7 +90,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
if err != nil {
return err
}
endpoint := podToEndpoint(pod, node)
endpoint := podToEndpoint(pod, node, service.Spec.PublishNotReadyAddresses)
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
numDesiredEndpoints++
}
Expand Down
48 changes: 41 additions & 7 deletions pkg/controller/endpointslice/reconciler_test.go
Expand Up @@ -139,6 +139,40 @@ func TestReconcile1EndpointSlice(t *testing.T) {
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
}

// when a Service has PublishNotReadyAddresses set to true, corresponding
// Endpoints should be considered ready, even if the backing Pod is not.
func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) {
client := newClientset()
namespace := "test"
svc, _ := newServiceAndEndpointMeta("foo", namespace)
svc.Spec.PublishNotReadyAddresses = true

// start with 50 pods, 1/3 not ready
pods := []*corev1.Pod{}
for i := 0; i < 50; i++ {
ready := !(i%3 == 0)
pods = append(pods, newPod(i, namespace, ready, 1))
}

r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())

// Only 1 action, an EndpointSlice create
assert.Len(t, client.Actions(), 1, "Expected 1 additional clientset action")
expectActions(t, client.Actions(), 1, "create", "endpointslices")

// Two endpoint slices should be completely full, the remainder should be in another one
endpointSlices := fetchEndpointSlices(t, client, namespace)
for _, endpointSlice := range endpointSlices {
for i, endpoint := range endpointSlice.Endpoints {
if !*endpoint.Conditions.Ready {
t.Errorf("Expected endpoints[%d] to be ready", i)
}
}
}
expectUnorderedSlicesWithLengths(t, endpointSlices, []int{50})
}

// a simple use case with 250 pods matching a service and no existing slices
// reconcile should create 3 slices, completely filling 2 of them
func TestReconcileManyPods(t *testing.T) {
Expand Down Expand Up @@ -190,13 +224,13 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}

// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}

existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
Expand Down Expand Up @@ -242,13 +276,13 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}

// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}

existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
Expand Down Expand Up @@ -324,7 +358,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
if i%30 == 0 {
existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
}
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}))
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
}

createEndpointSlices(t, client, namespace, existingSlices)
Expand Down Expand Up @@ -362,15 +396,15 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 0; i < 80; i++ {
pod := newPod(i, namespace, true, 1)
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}))
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice1)

slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 100; i < 120; i++ {
pod := newPod(i, namespace, true, 1)
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}))
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice2)
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/endpointslice/utils.go
Expand Up @@ -35,8 +35,8 @@ import (
// podEndpointChanged returns true if the results of podToEndpoint are different
// for the pods passed to this function.
func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
endpoint1 := podToEndpoint(pod1, &corev1.Node{})
endpoint2 := podToEndpoint(pod2, &corev1.Node{})
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, false)
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, false)

endpoint1.TargetRef.ResourceVersion = ""
endpoint2.TargetRef.ResourceVersion = ""
Expand All @@ -45,7 +45,7 @@ func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
}

// podToEndpoint returns an Endpoint object generated from a Pod and Node.
func podToEndpoint(pod *corev1.Pod, node *corev1.Node) discovery.Endpoint {
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, publishNotReadyAddresses bool) discovery.Endpoint {
// Build out topology information. This is currently limited to hostname,
// zone, and region, but this will be expanded in the future.
topology := map[string]string{}
Expand All @@ -67,7 +67,7 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node) discovery.Endpoint {
}
}

ready := podutil.IsPodReady(pod)
ready := publishNotReadyAddresses || podutil.IsPodReady(pod)
return discovery.Endpoint{
Addresses: getEndpointAddresses(pod.Status),
Conditions: discovery.EndpointConditions{
Expand Down
50 changes: 44 additions & 6 deletions pkg/controller/endpointslice/utils_test.go
Expand Up @@ -18,6 +18,7 @@ package endpointslice

import (
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -91,10 +92,11 @@ func TestPodToEndpoint(t *testing.T) {
}

testCases := []struct {
name string
pod *v1.Pod
node *v1.Node
expectedEndpoint discovery.Endpoint
name string
pod *v1.Pod
node *v1.Node
expectedEndpoint discovery.Endpoint
publishNotReadyAddresses bool
}{
{
name: "Ready pod",
Expand All @@ -112,6 +114,23 @@ func TestPodToEndpoint(t *testing.T) {
},
},
},
{
name: "Ready pod + publishNotReadyAddresses",
pod: readyPod,
publishNotReadyAddresses: true,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: readyPod.Name,
UID: readyPod.UID,
ResourceVersion: readyPod.ResourceVersion,
},
},
},
{
name: "Unready pod",
pod: unreadyPod,
Expand All @@ -128,6 +147,23 @@ func TestPodToEndpoint(t *testing.T) {
},
},
},
{
name: "Unready pod + publishNotReadyAddresses",
pod: unreadyPod,
publishNotReadyAddresses: true,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: readyPod.Name,
UID: readyPod.UID,
ResourceVersion: readyPod.ResourceVersion,
},
},
},
{
name: "Ready pod + node labels",
pod: readyPod,
Expand Down Expand Up @@ -174,8 +210,10 @@ func TestPodToEndpoint(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
endpoint := podToEndpoint(testCase.pod, testCase.node)
assert.EqualValues(t, testCase.expectedEndpoint, endpoint, "Test case failed: %s", testCase.name)
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.publishNotReadyAddresses)
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {
t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint)
}
})
}
}
Expand Down