Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #86092: Ensuring kube-proxy does not mutate shared EndpointSlices #86016: Ensuring EndpointSlices are not used for Windows kube-proxy #89117

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kube-proxy/app/server.go
Expand Up @@ -678,7 +678,7 @@ func (s *ProxyServer) Run() error {
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)

if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
if s.UseEndpointSlices {
endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), s.ConfigSyncPeriod)
endpointSliceConfig.RegisterEventHandler(s.Proxier)
go endpointSliceConfig.Run(wait.NeverStop)
Expand Down
1 change: 1 addition & 0 deletions cmd/kube-proxy/app/server_others.go
Expand Up @@ -268,6 +268,7 @@ func newProxyServer(
OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
UseEndpointSlices: utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/kube-proxy/app/server_windows.go
Expand Up @@ -145,6 +145,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
UseEndpointSlices: false,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/BUILD
Expand Up @@ -70,6 +70,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion pkg/proxy/endpointslicecache.go
Expand Up @@ -108,11 +108,13 @@ func newEndpointSliceTracker() *endpointSliceTracker {
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
esInfo := &endpointSliceInfo{
Ports: endpointSlice.Ports,
Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
Endpoints: []*endpointInfo{},
Remove: remove,
}

// copy here to avoid mutating shared EndpointSlice object.
copy(esInfo.Ports, endpointSlice.Ports)
sort.Sort(byPort(esInfo.Ports))

if !remove {
Expand Down
49 changes: 49 additions & 0 deletions pkg/proxy/endpointslicecache_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilpointer "k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -152,11 +153,13 @@ func TestEndpointsMapFromESC(t *testing.T) {
t.Run(name, func(t *testing.T) {
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)

cmc := newCacheMutationCheck(tc.endpointSlices)
for _, endpointSlice := range tc.endpointSlices {
esCache.updatePending(endpointSlice, false)
}

compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
cmc.Check(t)
})
}
}
Expand Down Expand Up @@ -315,6 +318,8 @@ func TestEsInfoChanged(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
cmc := newCacheMutationCheck([]*discovery.EndpointSlice{tc.initialSlice})

if tc.initialSlice != nil {
tc.cache.updatePending(tc.initialSlice, false)
tc.cache.checkoutChanges()
Expand All @@ -331,6 +336,8 @@ func TestEsInfoChanged(t *testing.T) {
if tc.expectChanged != changed {
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
}

cmc.Check(t)
})
}
}
Expand Down Expand Up @@ -378,3 +385,45 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of
func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
}

// cacheMutationCheck helps ensure that cached objects have not been changed
// in any way throughout a test run.
type cacheMutationCheck struct {
objects []cacheObject
}

// cacheObject stores a reference to an original object as well as a deep copy
// of that object to track any mutations in the original object.
type cacheObject struct {
original runtime.Object
deepCopy runtime.Object
}

// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
cmc := cacheMutationCheck{}
for _, endpointSlice := range endpointSlices {
cmc.Add(endpointSlice)
}
return cmc
}

// Add appends a runtime.Object and a deep copy of that object into the
// cacheMutationCheck.
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
cmc.objects = append(cmc.objects, cacheObject{
original: o,
deepCopy: o.DeepCopyObject(),
})
}

// Check verifies that no objects in the cacheMutationCheck have been mutated.
func (cmc *cacheMutationCheck) Check(t *testing.T) {
for _, o := range cmc.objects {
if !reflect.DeepEqual(o.original, o.deepCopy) {
// Cached objects can't be safely mutated and instead should be deep
// copied before changed in any way.
t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
}
}
}