Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for periodic runner #46080

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 33 additions & 53 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ type endpointsChange struct {
}

type endpointsChangeMap struct {
sync.Mutex
lock sync.Mutex
hostname string
items map[types.NamespacedName]*endpointsChange
}
Expand All @@ -210,7 +210,7 @@ type serviceChange struct {
}

type serviceChangeMap struct {
sync.Mutex
lock sync.Mutex
items map[types.NamespacedName]*serviceChange
}

Expand All @@ -225,8 +225,8 @@ func newEndpointsChangeMap(hostname string) endpointsChangeMap {
}

func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) bool {
ecm.Lock()
defer ecm.Unlock()
ecm.lock.Lock()
defer ecm.lock.Unlock()

change, exists := ecm.items[*namespacedName]
if !exists {
Expand Down Expand Up @@ -254,8 +254,8 @@ func newServiceChangeMap() serviceChangeMap {
}

func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) bool {
scm.Lock()
defer scm.Unlock()
scm.lock.Lock()
defer scm.lock.Unlock()

change, exists := scm.items[*namespacedName]
if !exists {
Expand Down Expand Up @@ -577,7 +577,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {

// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
proxier.syncProxyRules(syncReasonForce)
proxier.syncProxyRules()
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
Expand All @@ -604,7 +604,7 @@ func (proxier *Proxier) OnServiceAdd(service *api.Service) {
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}
}

Expand All @@ -617,7 +617,7 @@ func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}
}

Expand All @@ -630,7 +630,7 @@ func (proxier *Proxier) OnServiceDelete(service *api.Service) {
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}
}

Expand All @@ -639,7 +639,7 @@ func (proxier *Proxier) OnServiceSynced() {
proxier.servicesSynced = true
proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}

func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
Expand All @@ -660,17 +660,15 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
// <changes> map is cleared after applying them.
func updateServiceMap(
serviceMap proxyServiceMap,
changes *serviceChangeMap) (syncRequired bool, hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
syncRequired = false
changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
staleServices = sets.NewString()

func() {
changes.Lock()
defer changes.Unlock()
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
existingPorts := serviceMap.merge(change.current)
serviceMap.unmerge(change.previous, existingPorts, staleServices)
syncRequired = true
}
changes.items = make(map[types.NamespacedName]*serviceChange)
}()
Expand All @@ -684,7 +682,7 @@ func updateServiceMap(
}
}

return syncRequired, hcServices, staleServices
return hcServices, staleServices
}

func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
Expand All @@ -696,7 +694,7 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}
}

Expand All @@ -709,7 +707,7 @@ func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}
}

Expand All @@ -722,7 +720,7 @@ func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}
}

Expand All @@ -731,26 +729,24 @@ func (proxier *Proxier) OnEndpointsSynced() {
proxier.endpointsSynced = true
proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}

// <endpointsMap> is updated by this function (based on the given changes).
// <changes> map is cleared after applying them.
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (syncRequired bool, hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
syncRequired = false
hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
staleSet = make(map[endpointServicePair]bool)

func() {
changes.Lock()
defer changes.Unlock()
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
endpointsMap.unmerge(change.previous)
endpointsMap.merge(change.current)
detectStaleConnections(change.previous, change.current, staleSet)
syncRequired = true
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
}()
Expand All @@ -767,7 +763,7 @@ func updateEndpointsMap(
hcEndpoints[nsn] = len(ips)
}

return syncRequired, hcEndpoints, staleSet
return hcEndpoints, staleSet
}

// <staleEndpoints> are modified by this function with detected stale
Expand Down Expand Up @@ -942,16 +938,10 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}

type syncReason string

const syncReasonServices syncReason = "ServicesUpdate"
const syncReasonEndpoints syncReason = "EndpointsUpdate"
const syncReasonForce syncReason = "Force"

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules(reason syncReason) {
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()

Expand All @@ -961,33 +951,23 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
start := time.Now()
defer func() {
SyncProxyRulesLatency.Observe(sinceInMicroseconds(start))
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}

// Figure out the new services we need to activate.
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
// We assume that if syncProxyRules was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, caller are
// responsible for detecting no-op changes and not calling syncProxyRules in
// such cases.
hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)

// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && !serviceSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}

endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)

// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && !endpointsSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}

glog.V(3).Infof("Syncing iptables rules")

// Create and link the kube services chain.
Expand Down Expand Up @@ -1569,8 +1549,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
proxier.portsMap = replacementPortsMap

// Update healthz timestamp if it is periodic sync.
if proxier.healthzServer != nil && reason == syncReasonForce {
// Update healthz timestamp.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}

Expand Down