Skip to content

Commit

Permalink
Merge pull request #122756 from hakman/automated-cherry-pick-of-#1222…
Browse files Browse the repository at this point in the history
…04-upstream-release-1.29

Automated cherry pick of #122204: Fix race condition in iptables partial sync handling
  • Loading branch information
k8s-ci-robot committed Feb 9, 2024
2 parents b39bf52 + 221c1ad commit 97d1f81
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 204 deletions.
54 changes: 24 additions & 30 deletions pkg/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,9 @@ func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.
return changeNeeded
}

// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] {
return ect.endpointSliceCache.pendingChanges()
}

// checkoutChanges returns a list of pending endpointsChanges and marks them as
// checkoutChanges returns a map of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange {
func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
metrics.EndpointChangesPending.Set(0)

return ect.endpointSliceCache.checkoutChanges()
Expand Down Expand Up @@ -269,6 +262,10 @@ type endpointsChange struct {

// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
type UpdateEndpointsMapResult struct {
// UpdatedServices lists the names of all services with added/updated/deleted
// endpoints since the last Update.
UpdatedServices sets.Set[types.NamespacedName]

// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
// ensure that no further traffic for the Service gets delivered to them.
Expand All @@ -286,40 +283,37 @@ type UpdateEndpointsMapResult struct {
LastChangeTriggerTimes map[types.NamespacedName][]time.Time
}

// Update updates endpointsMap base on the given changes.
func (em EndpointsMap) Update(changes *EndpointsChangeTracker) (result UpdateEndpointsMapResult) {
result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
result.NewlyActiveUDPServices = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)

em.apply(changes, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices, &result.LastChangeTriggerTimes)

return result
}

// EndpointsMap maps a service name to a list of all its Endpoints.
type EndpointsMap map[ServicePortName][]Endpoint

// apply the changes to EndpointsMap, update the passed-in stale-conntrack-entry arrays,
// and clear the changes map. In addition it returns (via argument) and resets the
// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
// the proxy rules. apply triggers processEndpointsMapChange on every change.
func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
// Update updates em based on the changes in ect, returns information about the diff since
// the last Update, triggers processEndpointsMapChange on every change, and clears the
// changes map.
func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
result := UpdateEndpointsMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
NewlyActiveUDPServices: make([]ServicePortName, 0),
LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
}
if ect == nil {
return
return result
}

changes := ect.checkoutChanges()
for _, change := range changes {
for nn, change := range changes {
if ect.processEndpointsMapChange != nil {
ect.processEndpointsMapChange(change.previous, change.current)
}
result.UpdatedServices.Insert(nn)

em.unmerge(change.previous)
em.merge(change.current)
detectStaleConntrackEntries(change.previous, change.current, deletedUDPEndpoints, newlyActiveUDPServices)
detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
}
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes)

return result
}

// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
Expand Down

0 comments on commit 97d1f81

Please sign in to comment.