Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove reasons from iptables syncProxyRules #45723

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 25 additions & 39 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ type endpointsChange struct {
}

type endpointsChangeMap struct {
sync.Mutex
lock sync.Mutex
items map[types.NamespacedName]*endpointsChange
}

Expand All @@ -209,7 +209,7 @@ type serviceChange struct {
}

type serviceChangeMap struct {
sync.Mutex
lock sync.Mutex
items map[types.NamespacedName]*serviceChange
}

Expand All @@ -223,8 +223,8 @@ func newEndpointsChangeMap() endpointsChangeMap {
}

func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Endpoints) {
ecm.Lock()
defer ecm.Unlock()
ecm.lock.Lock()
defer ecm.lock.Unlock()

change, exists := ecm.items[*namespacedName]
if !exists {
Expand All @@ -242,8 +242,8 @@ func newServiceChangeMap() serviceChangeMap {
}

func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *api.Service) {
scm.Lock()
defer scm.Unlock()
scm.lock.Lock()
defer scm.lock.Unlock()

change, exists := scm.items[*namespacedName]
if !exists {
Expand Down Expand Up @@ -509,7 +509,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {

// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
proxier.syncProxyRules(syncReasonForce)
proxier.syncProxyRules()
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
Expand All @@ -531,29 +531,29 @@ func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, nil, service)

proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}

func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, oldService, service)

proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}

func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.serviceChanges.update(&namespacedName, service, nil)

proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}

func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.mu.Unlock()

proxier.syncProxyRules(syncReasonServices)
proxier.syncProxyRules()
}

func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool {
Expand Down Expand Up @@ -587,7 +587,7 @@ func (sm *proxyServiceMap) mergeService(service *api.Service) (bool, sets.String
info := newServiceInfo(serviceName, servicePort, service)
oldInfo, exists := (*sm)[serviceName]
equal := reflect.DeepEqual(info, oldInfo)
if exists {
if !exists {
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
} else if !equal {
glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol)
Expand Down Expand Up @@ -662,29 +662,29 @@ func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, nil, endpoints)

proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}

func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints)

proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}

func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
proxier.endpointsChanges.update(&namespacedName, endpoints, nil)

proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}

func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.mu.Unlock()

proxier.syncProxyRules(syncReasonEndpoints)
proxier.syncProxyRules()
}

// <endpointsMap> is updated by this function (based on the given changes).
Expand Down Expand Up @@ -873,16 +873,10 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}

type syncReason string

const syncReasonServices syncReason = "ServicesUpdate"
const syncReasonEndpoints syncReason = "EndpointsUpdate"
const syncReasonForce syncReason = "Force"

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules(reason syncReason) {
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()

Expand All @@ -891,7 +885,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
start := time.Now()
defer func() {
glog.V(4).Infof("syncProxyRules(%s) took %v", reason, time.Since(start))
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.endpointsSynced || !proxier.servicesSynced {
Expand All @@ -900,28 +894,20 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}

// Figure out the new services we need to activate.
proxier.serviceChanges.Lock()
proxier.serviceChanges.lock.Lock()
serviceSyncRequired, hcServices, staleServices := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
proxier.serviceChanges.Unlock()
proxier.serviceChanges.lock.Unlock()

// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && !serviceSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}

proxier.endpointsChanges.Lock()
proxier.endpointsChanges.lock.Lock()
endpointsSyncRequired, hcEndpoints, staleEndpoints := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
proxier.endpointsChanges.Unlock()
proxier.endpointsChanges.lock.Unlock()

// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && !endpointsSyncRequired {
if !serviceSyncRequired && !endpointsSyncRequired {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
return
}

glog.V(3).Infof("Syncing iptables rules")

// Create and link the kube services chain.
Expand Down Expand Up @@ -1495,8 +1481,8 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
proxier.portsMap = replacementPortsMap

// Update healthz timestamp if it is periodic sync.
if proxier.healthzServer != nil && reason == syncReasonForce {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We check sync reason here to make sure we only update healthz timestamp in periodic sync. Deleting it means we now update timestamp on every sync, which may increase lock contention with the healthz server. Would it increase the risk that DDOSing the healthz server will halt syncProxyRules thread?

cc @nicksardo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think it should be fine, doesn't seem like Load() will block Store():

// Load returns the value set by the most recent Store.

// Update healthz timestamp.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/proxy/iptables/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ func TestClusterIPReject(t *testing.T) {
}),
)
makeEndpointsMap(fp)
fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
svcRules := ipt.GetRules(svcChain)
Expand Down Expand Up @@ -628,7 +628,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
}),
)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

epStr := fmt.Sprintf("%s:%d", epIP, svcPort)
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestLoadBalancer(t *testing.T) {
}),
)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
Expand Down Expand Up @@ -749,7 +749,7 @@ func TestNodePort(t *testing.T) {
}),
)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

proto := strings.ToLower(string(api.ProtocolTCP))
svcChain := string(servicePortChainName(svcPortName.String(), proto))
Expand Down Expand Up @@ -786,7 +786,7 @@ func TestExternalIPsReject(t *testing.T) {
)
makeEndpointsMap(fp)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, iptablestest.Reject, svcExternalIPs, svcPort) {
Expand Down Expand Up @@ -819,7 +819,7 @@ func TestNodePortReject(t *testing.T) {
)
makeEndpointsMap(fp)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP, svcNodePort) {
Expand Down Expand Up @@ -882,7 +882,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
}),
)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

proto := strings.ToLower(string(api.ProtocolTCP))
fwChain := string(serviceFirewallChainName(svcPortName.String(), proto))
Expand Down Expand Up @@ -973,7 +973,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
}),
)

fp.syncProxyRules(syncReasonForce)
fp.syncProxyRules()

proto := strings.ToLower(string(api.ProtocolTCP))
lbChain := string(serviceLBChainName(svcPortName.String(), proto))
Expand Down