Skip to content

Commit

Permalink
Drop PendingChanges methods from change trackers, move into UpdateRes…
Browse files Browse the repository at this point in the history
…ults

This fixes a race condition where the tracker could be updated in
between us calling .PendingChanges() and .Update().
  • Loading branch information
danwinship authored and hakman committed Jan 13, 2024
1 parent 2887762 commit 221c1ad
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 166 deletions.
20 changes: 10 additions & 10 deletions pkg/proxy/endpoints.go
Expand Up @@ -205,16 +205,9 @@ func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.
return changeNeeded
}

// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] {
return ect.endpointSliceCache.pendingChanges()
}

// checkoutChanges returns a list of pending endpointsChanges and marks them as
// checkoutChanges returns a map of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange {
func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
metrics.EndpointChangesPending.Set(0)

return ect.endpointSliceCache.checkoutChanges()
Expand Down Expand Up @@ -269,6 +262,10 @@ type endpointsChange struct {

// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
type UpdateEndpointsMapResult struct {
// UpdatedServices lists the names of all services with added/updated/deleted
// endpoints since the last Update.
UpdatedServices sets.Set[types.NamespacedName]

// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
// ensure that no further traffic for the Service gets delivered to them.
Expand All @@ -294,6 +291,7 @@ type EndpointsMap map[ServicePortName][]Endpoint
// changes map.
func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
result := UpdateEndpointsMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
NewlyActiveUDPServices: make([]ServicePortName, 0),
LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
Expand All @@ -303,10 +301,12 @@ func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapRes
}

changes := ect.checkoutChanges()
for _, change := range changes {
for nn, change := range changes {
if ect.processEndpointsMapChange != nil {
ect.processEndpointsMapChange(change.previous, change.current)
}
result.UpdatedServices.Insert(nn)

em.unmerge(change.previous)
em.merge(change.current)
detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
Expand Down

0 comments on commit 221c1ad

Please sign in to comment.