Automated cherry pick of #85368: Deep copying EndpointSlices in reconciler before modifying #85583

Merged
66 changes: 54 additions & 12 deletions pkg/controller/endpointslice/endpointslice_controller_test.go
@@ -18,13 +18,15 @@ package endpointslice

 import (
 	"fmt"
+	"reflect"
 	"testing"
 	"time"

 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/core/v1"
 	discovery "k8s.io/api/discovery/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
@@ -239,6 +241,8 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
 		AddressType: discovery.AddressTypeIPv4,
 	}}

+	cmc := newCacheMutationCheck(endpointSlices)
+
 	// need to add them to both store and fake clientset
 	for _, endpointSlice := range endpointSlices {
 		err := esController.endpointSliceStore.Add(endpointSlice)
@@ -262,6 +266,9 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
 	// only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder
 	expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices")
 	expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")
+
+	// ensure cache mutation has not occurred
+	cmc.Check(t)
 }

 // Ensure SyncService handles a variety of protocols and IPs appropriately.
@@ -316,17 +323,17 @@ func TestSyncServiceFull(t *testing.T) {
 	assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoints in first slice")
 	assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], serviceCreateTime.Format(time.RFC3339Nano))
 	assert.EqualValues(t, []discovery.EndpointPort{{
-		Name:     strPtr("sctp-example"),
+		Name:     utilpointer.StringPtr("sctp-example"),
 		Protocol: protoPtr(v1.ProtocolSCTP),
-		Port:     int32Ptr(int32(3456)),
+		Port:     utilpointer.Int32Ptr(int32(3456)),
 	}, {
-		Name:     strPtr("udp-example"),
+		Name:     utilpointer.StringPtr("udp-example"),
 		Protocol: protoPtr(v1.ProtocolUDP),
-		Port:     int32Ptr(int32(161)),
+		Port:     utilpointer.Int32Ptr(int32(161)),
 	}, {
-		Name:     strPtr("tcp-example"),
+		Name:     utilpointer.StringPtr("tcp-example"),
 		Protocol: protoPtr(v1.ProtocolTCP),
-		Port:     int32Ptr(int32(80)),
+		Port:     utilpointer.Int32Ptr(int32(80)),
 	}}, slice.Ports)

 	assert.ElementsMatch(t, []discovery.Endpoint{{
@@ -382,14 +389,49 @@ func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, re
 	}
 }

-func strPtr(str string) *string {
-	return &str
-}
-
 // protoPtr takes a Protocol and returns a pointer to it.
 func protoPtr(proto v1.Protocol) *v1.Protocol {
 	return &proto
 }

-func int32Ptr(num int32) *int32 {
-	return &num
+// cacheMutationCheck helps ensure that cached objects have not been changed
+// in any way throughout a test run.
+type cacheMutationCheck struct {
+	objects []cacheObject
 }

+// cacheObject stores a reference to an original object as well as a deep copy
+// of that object to track any mutations in the original object.
+type cacheObject struct {
+	original runtime.Object
+	deepCopy runtime.Object
+}
+
+// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
+func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
+	cmc := cacheMutationCheck{}
+	for _, endpointSlice := range endpointSlices {
+		cmc.Add(endpointSlice)
+	}
+	return cmc
+}
+
+// Add appends a runtime.Object and a deep copy of that object into the
+// cacheMutationCheck.
+func (cmc *cacheMutationCheck) Add(o runtime.Object) {
+	cmc.objects = append(cmc.objects, cacheObject{
+		original: o,
+		deepCopy: o.DeepCopyObject(),
+	})
+}
+
+// Check verifies that no objects in the cacheMutationCheck have been mutated.
+func (cmc *cacheMutationCheck) Check(t *testing.T) {
+	for _, o := range cmc.objects {
+		if !reflect.DeepEqual(o.original, o.deepCopy) {
+			// Cached objects can't be safely mutated and instead should be deep
+			// copied before changed in any way.
+			t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
+		}
+	}
+}
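For readers skimming the diff: the essence of `cacheMutationCheck` is a deep-copy snapshot plus a `reflect.DeepEqual` comparison. Below is a minimal, standalone sketch (not part of the PR; the object name and addresses are made up) of exactly what the check catches when code writes through a cache-owned pointer:

```go
package main

import (
	"fmt"
	"reflect"

	discovery "k8s.io/api/discovery/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Pretend this pointer is shared with an informer cache.
	cached := &discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "example-svc-abc12"},
	}
	snapshot := cached.DeepCopyObject()

	// Unsafe write through the shared pointer -- the kind of mutation
	// the reconciler performed before this fix.
	cached.Endpoints = append(cached.Endpoints, discovery.Endpoint{
		Addresses: []string{"10.0.0.1"},
	})

	// cacheMutationCheck.Check reduces to this comparison.
	fmt.Println("mutated:", !reflect.DeepEqual(cached, snapshot)) // mutated: true
}
```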
12 changes: 9 additions & 3 deletions pkg/controller/endpointslice/reconciler.go
@@ -290,9 +290,11 @@ func (r *reconciler) reconcileByPortMapping(
 			// if no endpoints desired in this slice, mark for deletion
 			sliceNamesToDelete.Insert(existingSlice.Name)
 		} else {
-			// otherwise, mark for update
-			existingSlice.Endpoints = newEndpoints
-			sliceNamesToUpdate.Insert(existingSlice.Name)
+			// otherwise, copy and mark for update
+			epSlice := existingSlice.DeepCopy()
+			epSlice.Endpoints = newEndpoints
+			slicesByName[existingSlice.Name] = epSlice
+			sliceNamesToUpdate.Insert(epSlice.Name)
 		}
 	} else {
 		// slices with no changes will be useful if there are leftover endpoints
@@ -344,6 +346,10 @@ func (r *reconciler) reconcileByPortMapping(
 	// If we didn't find a sliceToFill, generate a new empty one.
 	if sliceToFill == nil {
 		sliceToFill = newEndpointSlice(service, endpointMeta)
+	} else {
+		// deep copy required to modify this slice.
+		sliceToFill = sliceToFill.DeepCopy()
+		slicesByName[sliceToFill.Name] = sliceToFill
 	}

 	// Fill the slice up with remaining endpoints.
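The two hunks above are the actual fix: an EndpointSlice that came from the shared informer cache is deep copied before any field is written, and the local `slicesByName` map is repointed at the copy so later lookups within the same reconcile see the updated version. A rough sketch of that copy-on-write step in isolation (hypothetical helper for illustration; field and map names mirror the diff):

```go
package endpointslice

import (
	discovery "k8s.io/api/discovery/v1beta1"
)

// updateSliceCopy illustrates the pattern adopted in reconcileByPortMapping:
// never write through a cache-owned pointer.
func updateSliceCopy(
	slicesByName map[string]*discovery.EndpointSlice,
	existingSlice *discovery.EndpointSlice,
	newEndpoints []discovery.Endpoint,
) *discovery.EndpointSlice {
	epSlice := existingSlice.DeepCopy()          // private copy; the cached object stays pristine
	epSlice.Endpoints = newEndpoints             // mutate only the copy
	slicesByName[existingSlice.Name] = epSlice   // subsequent reads get the copy
	return epSlice
}
```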
20 changes: 20 additions & 0 deletions pkg/controller/endpointslice/reconciler_test.go
@@ -318,6 +318,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
 	}

 	existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
+	cmc := newCacheMutationCheck(existingSlices)
 	createEndpointSlices(t, client, namespace, existingSlices)

 	numActionsBefore := len(client.Actions())
@@ -332,6 +333,9 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
 	// 1 new slice (0->100) + 1 updated slice (62->89)
 	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
 	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})
+
+	// ensure cache mutation has not occurred
+	cmc.Check(t)
 }

 // now with preexisting slices, we have 300 pods matching a service
@@ -370,6 +374,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
 	}

 	existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
+	cmc := newCacheMutationCheck(existingSlices)
 	createEndpointSlices(t, client, namespace, existingSlices)

 	numActionsBefore := len(client.Actions())
@@ -383,6 +388,9 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
 	// 2 new slices (100, 52) in addition to existing slices (74, 74)
 	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
 	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})
+
+	// ensure cache mutation has not occurred
+	cmc.Check(t)
 }

 // In some cases, such as a service port change, all slices for that service will require a change
@@ -445,6 +453,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
 		existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
 	}

+	cmc := newCacheMutationCheck(existingSlices)
 	createEndpointSlices(t, client, namespace, existingSlices)

 	numActionsBefore := len(client.Actions())
@@ -463,6 +472,9 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
 	// thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
 	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
 	expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})
+
+	// ensure cache mutation has not occurred
+	cmc.Check(t)
 }

 // In this test, we want to verify that endpoints are added to a slice that will
@@ -493,6 +505,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
 	}
 	existingSlices = append(existingSlices, slice2)

+	cmc := newCacheMutationCheck(existingSlices)
 	createEndpointSlices(t, client, namespace, existingSlices)

 	// ensure that endpoints in each slice will be marked for update.
@@ -519,6 +532,9 @@

 	// additional pods should get added to fuller slice
 	expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
+
+	// ensure cache mutation has not occurred
+	cmc.Check(t)
 }

 // In this test, we want to verify that old EndpointSlices with a deprecated IP
@@ -552,6 +568,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {

 	createEndpointSlices(t, client, namespace, existingSlices)

+	cmc := newCacheMutationCheck(existingSlices)
 	r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
 	reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())

@@ -569,6 +586,9 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
 			t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType)
 		}
 	}
+
+	// ensure cache mutation has not occurred
+	cmc.Check(t)
 }

 // Named ports can map to different port numbers on different pods.