Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing memory leak in EndpointSliceMirroring EndpointSlice tracker #93441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/endpointslicemirroring/BUILD
Expand Up @@ -64,6 +64,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
Expand Down
63 changes: 39 additions & 24 deletions pkg/controller/endpointslicemirroring/endpointslice_tracker.go
Expand Up @@ -45,61 +45,76 @@ func newEndpointSliceTracker() *endpointSliceTracker {
}
}

// has returns true if the endpointSliceTracker has a resource version for the
// Has returns true if the endpointSliceTracker has a resource version for the
// provided EndpointSlice.
func (est *endpointSliceTracker) has(endpointSlice *discovery.EndpointSlice) bool {
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: do these need to be exported prior to be moved into a shared package in #93443?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you're completely right. I was updating this to match the EndpointSlice controller implementation precisely which already included exported functions. Since the end goal was to extract this to a shared package (#93443) it seemed like this would result in less long term conflicts. Especially if this commit ends up applying to earlier versions of Kubernetes than the shared package does. I'd thought this might be easier to backport than extracting the tracker into a shared package.

est.lock.Lock()
defer est.lock.Unlock()

rrv := est.relatedResourceVersions(endpointSlice)
_, ok := rrv[endpointSlice.Name]
rrv, ok := est.relatedResourceVersions(endpointSlice)
if !ok {
return false
}
_, ok = rrv[endpointSlice.Name]
return ok
}

// stale returns true if this endpointSliceTracker does not have a resource
// Stale returns true if this endpointSliceTracker does not have a resource
// version for the provided EndpointSlice or it does not match the resource
// version of the provided EndpointSlice.
func (est *endpointSliceTracker) stale(endpointSlice *discovery.EndpointSlice) bool {
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv := est.relatedResourceVersions(endpointSlice)
rrv, ok := est.relatedResourceVersions(endpointSlice)
if !ok {
return true
}
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
}

// update adds or updates the resource version in this endpointSliceTracker for
// Update adds or updates the resource version in this endpointSliceTracker for
// the provided EndpointSlice.
func (est *endpointSliceTracker) update(endpointSlice *discovery.EndpointSlice) {
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()

rrv := est.relatedResourceVersions(endpointSlice)
rrv, ok := est.relatedResourceVersions(endpointSlice)
if !ok {
rrv = endpointSliceResourceVersions{}
est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv
}
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
}

// delete removes the resource version in this endpointSliceTracker for the
// DeleteService removes the set of resource versions tracked for the Service.
func (est *endpointSliceTracker) DeleteService(namespace, name string) {
est.lock.Lock()
defer est.lock.Unlock()

serviceNN := types.NamespacedName{Name: name, Namespace: namespace}
delete(est.resourceVersionsByService, serviceNN)
}

// Delete removes the resource version in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) delete(endpointSlice *discovery.EndpointSlice) {
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()

rrv := est.relatedResourceVersions(endpointSlice)
delete(rrv, endpointSlice.Name)
rrv, ok := est.relatedResourceVersions(endpointSlice)
if ok {
delete(rrv, endpointSlice.Name)
}
}

// relatedResourceVersions returns the set of resource versions tracked for the
// Service corresponding to the provided EndpointSlice. If no resource versions
// are currently tracked for this service, an empty set is initialized.
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) endpointSliceResourceVersions {
// Service corresponding to the provided EndpointSlice, and a bool to indicate
// if it exists.
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) {
serviceNN := getServiceNN(endpointSlice)
vers, ok := est.resourceVersionsByService[serviceNN]

if !ok {
vers = endpointSliceResourceVersions{}
est.resourceVersionsByService[serviceNN] = vers
}

return vers
return vers, ok
}

// getServiceNN returns a namespaced name for the Service corresponding to the
Expand Down
121 changes: 106 additions & 15 deletions pkg/controller/endpointslicemirroring/endpointslice_tracker_test.go
Expand Up @@ -19,8 +19,11 @@ package endpointslicemirroring
import (
"testing"

"github.com/stretchr/testify/assert"

discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

func TestEndpointSliceTrackerUpdate(t *testing.T) {
Expand All @@ -43,47 +46,69 @@ func TestEndpointSliceTrackerUpdate(t *testing.T) {
epSlice1DifferentRV.ResourceVersion = "rv2"

testCases := map[string]struct {
updateParam *discovery.EndpointSlice
checksParam *discovery.EndpointSlice
expectHas bool
expectStale bool
updateParam *discovery.EndpointSlice
checksParam *discovery.EndpointSlice
expectHas bool
expectStale bool
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
}{
"same slice": {
updateParam: epSlice1,
checksParam: epSlice1,
expectHas: true,
expectStale: false,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
},
"different namespace": {
updateParam: epSlice1,
checksParam: epSlice1DifferentNS,
expectHas: false,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
},
"different service": {
updateParam: epSlice1,
checksParam: epSlice1DifferentService,
expectHas: false,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
},
"different resource version": {
updateParam: epSlice1,
checksParam: epSlice1DifferentRV,
expectHas: true,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esTracker := newEndpointSliceTracker()
esTracker.update(tc.updateParam)
if esTracker.has(tc.checksParam) != tc.expectHas {
t.Errorf("tc.tracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas)
esTracker.Update(tc.updateParam)
if esTracker.Has(tc.checksParam) != tc.expectHas {
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
}
if esTracker.stale(tc.checksParam) != tc.expectStale {
t.Errorf("tc.tracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale)
if esTracker.Stale(tc.checksParam) != tc.expectStale {
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
}
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
})
}
}
Expand Down Expand Up @@ -160,15 +185,81 @@ func TestEndpointSliceTrackerDelete(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esTracker := newEndpointSliceTracker()
esTracker.update(epSlice1)
esTracker.Update(epSlice1)

esTracker.delete(tc.deleteParam)
if esTracker.has(tc.checksParam) != tc.expectHas {
t.Errorf("esTracker.has(%+v) == %t, expected %t", tc.checksParam, esTracker.has(tc.checksParam), tc.expectHas)
esTracker.Delete(tc.deleteParam)
if esTracker.Has(tc.checksParam) != tc.expectHas {
t.Errorf("esTracker.Has(%+v) == %t, expected %t", tc.checksParam, esTracker.Has(tc.checksParam), tc.expectHas)
}
if esTracker.Stale(tc.checksParam) != tc.expectStale {
t.Errorf("esTracker.Stale(%+v) == %t, expected %t", tc.checksParam, esTracker.Stale(tc.checksParam), tc.expectStale)
}
})
}
}

func TestEndpointSliceTrackerDeleteService(t *testing.T) {
svcName1, svcNS1 := "svc1", "ns1"
svcName2, svcNS2 := "svc2", "ns2"
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "example-1",
Namespace: svcNS1,
ResourceVersion: "rv1",
Labels: map[string]string{discovery.LabelServiceName: svcName1},
},
}

testCases := map[string]struct {
updateParam *discovery.EndpointSlice
deleteServiceParam *types.NamespacedName
expectHas bool
expectStale bool
expectResourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
}{
"same service": {
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName1},
expectHas: false,
expectStale: true,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
},
"different namespace": {
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS2, Name: svcName1},
expectHas: true,
expectStale: false,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
},
"different service": {
updateParam: epSlice1,
deleteServiceParam: &types.NamespacedName{Namespace: svcNS1, Name: svcName2},
expectHas: true,
expectStale: false,
expectResourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{
{Namespace: epSlice1.Namespace, Name: "svc1"}: {
epSlice1.Name: epSlice1.ResourceVersion,
},
},
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esTracker := newEndpointSliceTracker()
esTracker.Update(tc.updateParam)
esTracker.DeleteService(tc.deleteServiceParam.Namespace, tc.deleteServiceParam.Name)
if esTracker.Has(tc.updateParam) != tc.expectHas {
t.Errorf("tc.tracker.Has(%+v) == %t, expected %t", tc.updateParam, esTracker.Has(tc.updateParam), tc.expectHas)
}
if esTracker.stale(tc.checksParam) != tc.expectStale {
t.Errorf("esTracker.stale(%+v) == %t, expected %t", tc.checksParam, esTracker.stale(tc.checksParam), tc.expectStale)
if esTracker.Stale(tc.updateParam) != tc.expectStale {
t.Errorf("tc.tracker.Stale(%+v) == %t, expected %t", tc.updateParam, esTracker.Stale(tc.updateParam), tc.expectStale)
}
assert.Equal(t, tc.expectResourceVersionsByService, esTracker.resourceVersionsByService)
})
}
}
Expand Up @@ -285,6 +285,7 @@ func (c *Controller) syncEndpoints(key string) error {
endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
if err != nil || !c.shouldMirror(endpoints) {
if apierrors.IsNotFound(err) || !c.shouldMirror(endpoints) {
c.endpointSliceTracker.DeleteService(namespace, name)
return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
}
return err
Expand Down Expand Up @@ -389,7 +390,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj))
return
}
if managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice) {
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
c.queueEndpointsForEndpointSlice(endpointSlice)
}
}
Expand All @@ -405,7 +406,7 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj))
return
}
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.stale(endpointSlice)) {
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
c.queueEndpointsForEndpointSlice(endpointSlice)
}
}
Expand All @@ -419,7 +420,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) {
utilruntime.HandleError(fmt.Errorf("onEndpointSliceDelete() expected type discovery.EndpointSlice, got %T", obj))
return
}
if managedByController(endpointSlice) && c.endpointSliceTracker.has(endpointSlice) {
if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
c.queueEndpointsForEndpointSlice(endpointSlice)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/endpointslicemirroring/reconciler.go
Expand Up @@ -246,7 +246,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
}
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Endpoints %s/%s: %v", endpoints.Namespace, endpoints.Name, err))
} else {
r.endpointSliceTracker.update(createdSlice)
r.endpointSliceTracker.Update(createdSlice)
metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
}
}
Expand All @@ -257,7 +257,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
if err != nil {
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err))
} else {
r.endpointSliceTracker.update(updatedSlice)
r.endpointSliceTracker.Update(updatedSlice)
metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
}
}
Expand All @@ -267,7 +267,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
if err != nil {
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Endpoints %s/%s: %v", endpointSlice.Name, endpoints.Namespace, endpoints.Name, err))
} else {
r.endpointSliceTracker.delete(endpointSlice)
r.endpointSliceTracker.Delete(endpointSlice)
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}
}
Expand Down