Skip to content

Commit

Permalink
Updating EndpointSlice controller to wait for cache to be updated
Browse files Browse the repository at this point in the history
This updates the EndpointSlice controller to make use of the
EndpointSlice tracker to identify when expected changes are not present
in the cache yet. If this is detected, the controller will wait to sync
until all expected updates have been received. This should help avoid
race conditions that would result in duplicate EndpointSlices or failed
attempts to update stale EndpointSlices. To simplify this logic, this
also moves the EndpointSlice tracker from relying on resource versions
to generations.
  • Loading branch information
robscott committed Mar 11, 2021
1 parent 02b9dd6 commit 95ceaca
Show file tree
Hide file tree
Showing 9 changed files with 473 additions and 182 deletions.
1 change: 1 addition & 0 deletions pkg/controller/endpointslice/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"endpointset.go",
"endpointslice_controller.go",
"endpointslice_tracker.go",
"errors.go",
"reconciler.go",
"utils.go",
],
Expand Down
25 changes: 22 additions & 3 deletions pkg/controller/endpointslice/endpointslice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ func (c *Controller) syncService(key string) error {
return err
}

if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
return &StaleInformerCache{"EndpointSlice informer cache is out of date"}
}

// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the EndpointSlice objects.
Expand Down Expand Up @@ -395,7 +399,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) {
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
return
}
if managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice) {
if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
c.queueServiceForEndpointSlice(endpointSlice)
}
}
Expand All @@ -411,7 +415,18 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
return
}
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.Stale(endpointSlice)) {
// EndpointSlice generation does not change when labels change. Although the
// controller will never change LabelServiceName, users might. This check
// ensures that we handle changes to this label.
svcName := endpointSlice.Labels[discovery.LabelServiceName]
prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
if svcName != prevSvcName {
klog.Warningf("%s label changed from %s to %s for %s", discovery.LabelServiceName, prevSvcName, svcName, endpointSlice.Name)
c.queueServiceForEndpointSlice(endpointSlice)
c.queueServiceForEndpointSlice(prevEndpointSlice)
return
}
if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
c.queueServiceForEndpointSlice(endpointSlice)
}
}
Expand All @@ -422,7 +437,11 @@ func (c *Controller) onEndpointSliceUpdate(prevObj, obj interface{}) {
func (c *Controller) onEndpointSliceDelete(obj interface{}) {
endpointSlice := getEndpointSliceFromDeleteAction(obj)
if endpointSlice != nil && managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
c.queueServiceForEndpointSlice(endpointSlice)
// This returns false if we didn't expect the EndpointSlice to be
// deleted. If that is the case, we queue the Service for another sync.
if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
c.queueServiceForEndpointSlice(endpointSlice)
}
}
}

Expand Down
75 changes: 75 additions & 0 deletions pkg/controller/endpointslice/endpointslice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,81 @@ func TestPodDeleteBatching(t *testing.T) {
}
}

func TestSyncServiceStaleInformer(t *testing.T) {
testcases := []struct {
name string
informerGenerationNumber int64
trackerGenerationNumber int64
expectError bool
}{
{
name: "informer cache outdated",
informerGenerationNumber: 10,
trackerGenerationNumber: 12,
expectError: true,
},
{
name: "cache and tracker synced",
informerGenerationNumber: 10,
trackerGenerationNumber: 10,
expectError: false,
},
{
name: "tracker outdated",
informerGenerationNumber: 10,
trackerGenerationNumber: 1,
expectError: false,
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
_, esController := newController([]string{"node-1"}, time.Duration(0))
ns := metav1.NamespaceDefault
serviceName := "testing-1"

// Store Service in the cache
esController.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceName, Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
},
})

// Create EndpointSlice in the informer cache with informerGenerationNumber
epSlice1 := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "matching-1",
Namespace: ns,
Generation: testcase.informerGenerationNumber,
Labels: map[string]string{
discovery.LabelServiceName: serviceName,
discovery.LabelManagedBy: controllerName,
},
},
AddressType: discovery.AddressTypeIPv4,
}
err := esController.endpointSliceStore.Add(epSlice1)
if err != nil {
t.Fatalf("Expected no error adding EndpointSlice: %v", err)
}

// Create EndpointSlice in the tracker with trackerGenerationNumber
epSlice2 := epSlice1.DeepCopy()
epSlice2.Generation = testcase.trackerGenerationNumber
esController.endpointSliceTracker.Update(epSlice2)

err = esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
// Check if we got a StaleInformerCache error
if isStaleInformerCacheErr(err) != testcase.expectError {
t.Fatalf("Expected error because informer cache is outdated")
}

})
}
}

// Test helpers
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
t.Helper()
Expand Down
130 changes: 91 additions & 39 deletions pkg/controller/endpointslice/endpointslice_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,102 +19,154 @@ package endpointslice
import (
"sync"

"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
)

// endpointSliceResourceVersions tracks expected EndpointSlice resource versions
// by EndpointSlice name.
type endpointSliceResourceVersions map[string]string
const (
deletionExpected = -1
)

// generationsBySlice tracks expected EndpointSlice generations by EndpointSlice
// uid. A value of deletionExpected (-1) may be used here to indicate that we
// expect this EndpointSlice to be deleted.
type generationsBySlice map[types.UID]int64

// endpointSliceTracker tracks EndpointSlices and their associated resource
// versions to help determine if a change to an EndpointSlice has been processed
// by the EndpointSlice controller.
// endpointSliceTracker tracks EndpointSlices and their associated generation to
// help determine if a change to an EndpointSlice has been processed by the
// EndpointSlice controller.
type endpointSliceTracker struct {
// lock protects resourceVersionsByService.
// lock protects generationsByService.
lock sync.Mutex
// resourceVersionsByService tracks the list of EndpointSlices and
// associated resource versions expected for a given Service.
resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
// generationsByService tracks the generations of EndpointSlices for each
// Service.
generationsByService map[types.NamespacedName]generationsBySlice
}

// newEndpointSliceTracker creates and initializes a new endpointSliceTracker.
func newEndpointSliceTracker() *endpointSliceTracker {
return &endpointSliceTracker{
resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
generationsByService: map[types.NamespacedName]generationsBySlice{},
}
}

// Has returns true if the endpointSliceTracker has a resource version for the
// Has returns true if the endpointSliceTracker has a generation for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
return false
}
_, ok = rrv[endpointSlice.Name]
_, ok = gfs[endpointSlice.UID]
return ok
}

// Stale returns true if this endpointSliceTracker does not have a resource
// version for the provided EndpointSlice or it does not match the resource
// version of the provided EndpointSlice.
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
// ShouldSync returns true if this endpointSliceTracker does not have a
// generation for the provided EndpointSlice or it is greater than the
// generation of the tracked EndpointSlice.
func (est *endpointSliceTracker) ShouldSync(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
return true
}
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
g, ok := gfs[endpointSlice.UID]
return !ok || endpointSlice.Generation > g
}

// StaleSlices returns true if one or more of the provided EndpointSlices
// have older generations than the corresponding tracked ones or if the tracker
// is expecting one or more of the provided EndpointSlices to be deleted.
func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices []*discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

nn := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
gfs, ok := est.generationsByService[nn]
if !ok {
return false
}
for _, endpointSlice := range endpointSlices {
g, ok := gfs[endpointSlice.UID]
if ok && (g == deletionExpected || g > endpointSlice.Generation) {
return true
}
}
return false
}

// Update adds or updates the resource version in this endpointSliceTracker for
// the provided EndpointSlice.
// Update adds or updates the generation in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)

if !ok {
rrv = endpointSliceResourceVersions{}
est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv
gfs = generationsBySlice{}
est.generationsByService[getServiceNN(endpointSlice)] = gfs
}
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
gfs[endpointSlice.UID] = endpointSlice.Generation
}

// DeleteService removes the set of resource versions tracked for the Service.
// DeleteService removes the set of generations tracked for the Service.
func (est *endpointSliceTracker) DeleteService(namespace, name string) {
est.lock.Lock()
defer est.lock.Unlock()

serviceNN := types.NamespacedName{Name: name, Namespace: namespace}
delete(est.resourceVersionsByService, serviceNN)
delete(est.generationsByService, serviceNN)
}

// Delete removes the resource version in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
// ExpectDeletion sets the generation to deletionExpected in this
// endpointSliceTracker for the provided EndpointSlice.
func (est *endpointSliceTracker) ExpectDeletion(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()

gfs, ok := est.generationsForSliceUnsafe(endpointSlice)

if !ok {
gfs = generationsBySlice{}
est.generationsByService[getServiceNN(endpointSlice)] = gfs
}
gfs[endpointSlice.UID] = deletionExpected
}

// HandleDeletion removes the generation in this endpointSliceTracker for the
// provided EndpointSlice. This returns true if the tracker expected this
// EndpointSlice to be deleted and false if not.
func (est *endpointSliceTracker) HandleDeletion(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)

if ok {
delete(rrv, endpointSlice.Name)
g, ok := gfs[endpointSlice.UID]
delete(gfs, endpointSlice.UID)
if ok && g != deletionExpected {
return false
}
}

return true
}

// relatedResourceVersions returns the set of resource versions tracked for the
// Service corresponding to the provided EndpointSlice, and a bool to indicate
// if it exists.
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) {
// generationsForSliceUnsafe returns the generations for the Service
// corresponding to the provided EndpointSlice, and a bool to indicate if it
// exists. A lock must be applied before calling this function.
func (est *endpointSliceTracker) generationsForSliceUnsafe(endpointSlice *discovery.EndpointSlice) (generationsBySlice, bool) {
serviceNN := getServiceNN(endpointSlice)
vers, ok := est.resourceVersionsByService[serviceNN]
return vers, ok
generations, ok := est.generationsByService[serviceNN]
return generations, ok
}

// getServiceNN returns a namespaced name for the Service corresponding to the
Expand Down

0 comments on commit 95ceaca

Please sign in to comment.