Skip to content

Commit

Permalink
Updating EndpointSliceMirroring controller to wait for cache to be up…
Browse files Browse the repository at this point in the history
…dated

This matches the recent updates to the EndpointSliceTracker for the
EndpointSlice controller in kubernetes#99345 that accomplished the same thing.
  • Loading branch information
robscott committed Mar 11, 2021
1 parent f2fe40c commit 2cda973
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 172 deletions.
1 change: 1 addition & 0 deletions pkg/controller/endpointslicemirroring/BUILD
Expand Up @@ -6,6 +6,7 @@ go_library(
"endpointset.go",
"endpointslice_tracker.go",
"endpointslicemirroring_controller.go",
"errors.go",
"events.go",
"reconciler.go",
"reconciler_helpers.go",
Expand Down
130 changes: 91 additions & 39 deletions pkg/controller/endpointslicemirroring/endpointslice_tracker.go
Expand Up @@ -19,102 +19,154 @@ package endpointslicemirroring
import (
"sync"

"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
)

// endpointSliceResourceVersions tracks expected EndpointSlice resource versions
// by EndpointSlice name.
type endpointSliceResourceVersions map[string]string
const (
deletionExpected = -1
)

// generationsBySlice tracks expected EndpointSlice generations by EndpointSlice
// uid. A value of deletionExpected (-1) may be used here to indicate that we
// expect this EndpointSlice to be deleted.
type generationsBySlice map[types.UID]int64

// endpointSliceTracker tracks EndpointSlices and their associated resource
// versions to help determine if a change to an EndpointSlice has been processed
// by the EndpointSlice controller.
// endpointSliceTracker tracks EndpointSlices and their associated generation to
// help determine if a change to an EndpointSlice has been processed by the
// EndpointSlice controller.
type endpointSliceTracker struct {
// lock protects resourceVersionsByService.
// lock protects generationsByService.
lock sync.Mutex
// resourceVersionsByService tracks the list of EndpointSlices and
// associated resource versions expected for a given Service.
resourceVersionsByService map[types.NamespacedName]endpointSliceResourceVersions
// generationsByService tracks the generations of EndpointSlices for each
// Service.
generationsByService map[types.NamespacedName]generationsBySlice
}

// newEndpointSliceTracker creates and initializes a new endpointSliceTracker.
func newEndpointSliceTracker() *endpointSliceTracker {
return &endpointSliceTracker{
resourceVersionsByService: map[types.NamespacedName]endpointSliceResourceVersions{},
generationsByService: map[types.NamespacedName]generationsBySlice{},
}
}

// Has returns true if the endpointSliceTracker has a resource version for the
// Has returns true if the endpointSliceTracker has a generation for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Has(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
return false
}
_, ok = rrv[endpointSlice.Name]
_, ok = gfs[endpointSlice.UID]
return ok
}

// Stale returns true if this endpointSliceTracker does not have a resource
// version for the provided EndpointSlice or it does not match the resource
// version of the provided EndpointSlice.
func (est *endpointSliceTracker) Stale(endpointSlice *discovery.EndpointSlice) bool {
// ShouldSync returns true if this endpointSliceTracker does not have a
// generation for the provided EndpointSlice or it is greater than the
// generation of the tracked EndpointSlice.
func (est *endpointSliceTracker) ShouldSync(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)
if !ok {
return true
}
return rrv[endpointSlice.Name] != endpointSlice.ResourceVersion
g, ok := gfs[endpointSlice.UID]
return !ok || endpointSlice.Generation > g
}

// StaleSlices returns true if one or more of the provided EndpointSlices
// have older generations than the corresponding tracked ones or if the tracker
// is expecting one or more of the provided EndpointSlices to be deleted.
func (est *endpointSliceTracker) StaleSlices(service *v1.Service, endpointSlices []*discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

nn := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
gfs, ok := est.generationsByService[nn]
if !ok {
return false
}
for _, endpointSlice := range endpointSlices {
g, ok := gfs[endpointSlice.UID]
if ok && (g == deletionExpected || g > endpointSlice.Generation) {
return true
}
}
return false
}

// Update adds or updates the resource version in this endpointSliceTracker for
// the provided EndpointSlice.
// Update adds or updates the generation in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Update(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)

if !ok {
rrv = endpointSliceResourceVersions{}
est.resourceVersionsByService[getServiceNN(endpointSlice)] = rrv
gfs = generationsBySlice{}
est.generationsByService[getServiceNN(endpointSlice)] = gfs
}
rrv[endpointSlice.Name] = endpointSlice.ResourceVersion
gfs[endpointSlice.UID] = endpointSlice.Generation
}

// DeleteService removes the set of resource versions tracked for the Service.
// DeleteService removes the set of generations tracked for the Service.
func (est *endpointSliceTracker) DeleteService(namespace, name string) {
est.lock.Lock()
defer est.lock.Unlock()

serviceNN := types.NamespacedName{Name: name, Namespace: namespace}
delete(est.resourceVersionsByService, serviceNN)
delete(est.generationsByService, serviceNN)
}

// Delete removes the resource version in this endpointSliceTracker for the
// provided EndpointSlice.
func (est *endpointSliceTracker) Delete(endpointSlice *discovery.EndpointSlice) {
// ExpectDeletion sets the generation to deletionExpected in this
// endpointSliceTracker for the provided EndpointSlice.
func (est *endpointSliceTracker) ExpectDeletion(endpointSlice *discovery.EndpointSlice) {
est.lock.Lock()
defer est.lock.Unlock()

gfs, ok := est.generationsForSliceUnsafe(endpointSlice)

if !ok {
gfs = generationsBySlice{}
est.generationsByService[getServiceNN(endpointSlice)] = gfs
}
gfs[endpointSlice.UID] = deletionExpected
}

// HandleDeletion removes the generation in this endpointSliceTracker for the
// provided EndpointSlice. This returns true if the tracker expected this
// EndpointSlice to be deleted and false if not.
func (est *endpointSliceTracker) HandleDeletion(endpointSlice *discovery.EndpointSlice) bool {
est.lock.Lock()
defer est.lock.Unlock()

rrv, ok := est.relatedResourceVersions(endpointSlice)
gfs, ok := est.generationsForSliceUnsafe(endpointSlice)

if ok {
delete(rrv, endpointSlice.Name)
g, ok := gfs[endpointSlice.UID]
delete(gfs, endpointSlice.UID)
if ok && g != deletionExpected {
return false
}
}

return true
}

// relatedResourceVersions returns the set of resource versions tracked for the
// Service corresponding to the provided EndpointSlice, and a bool to indicate
// if it exists.
func (est *endpointSliceTracker) relatedResourceVersions(endpointSlice *discovery.EndpointSlice) (endpointSliceResourceVersions, bool) {
// generationsForSliceUnsafe returns the generations for the Service
// corresponding to the provided EndpointSlice, and a bool to indicate if it
// exists. A lock must be applied before calling this function.
func (est *endpointSliceTracker) generationsForSliceUnsafe(endpointSlice *discovery.EndpointSlice) (generationsBySlice, bool) {
serviceNN := getServiceNN(endpointSlice)
vers, ok := est.resourceVersionsByService[serviceNN]
return vers, ok
generations, ok := est.generationsByService[serviceNN]
return generations, ok
}

// getServiceNN returns a namespaced name for the Service corresponding to the
Expand Down

0 comments on commit 2cda973

Please sign in to comment.