Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address remaining TODOs in kube-proxy. #46201

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 27 additions & 61 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -237,14 +238,8 @@ func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, prev
change.current = endpointsToEndpointsMap(current, ecm.hostname)
if reflect.DeepEqual(change.previous, change.current) {
delete(ecm.items, *namespacedName)
return false
}
// TODO: Instead of returning true/false, we should consider returning whether
// the map contains some element or not. Currently, if the change is
// "reverting" some previous endpoints update, but there are still some other
// modified endpoints, we will return false, even though there are some change
// to apply.
return true
}
return len(ecm.items) > 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this works. I was thinking of the return as "did this update() cause a change", but this should be behaviorally equivalent at the end.

}

func newServiceChangeMap() serviceChangeMap {
Expand All @@ -266,14 +261,8 @@ func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previo
change.current = serviceToServiceMap(current)
if reflect.DeepEqual(change.previous, change.current) {
delete(scm.items, *namespacedName)
return false
}
// TODO: Instead of returning true/false, we should consider returning whether
// the map contains some element or not. Currently, if the change is
// "reverting" some previous endpoints update, but there are still some other
// modified endpoints, we will return false, even though there are some change
// to apply.
return true
}
return len(scm.items) > 0
}

func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
Expand Down Expand Up @@ -340,6 +329,7 @@ type Proxier struct {
// with some partial data after kube-proxy restart.
endpointsSynced bool
servicesSynced bool
initialized int32

throttle flowcontrol.RateLimiter

Expand Down Expand Up @@ -595,48 +585,43 @@ func (proxier *Proxier) SyncLoop() {
}
}

func (proxier *Proxier) setInitialized(value bool) {
var initialized int32
if value {
initialized = 1
}
atomic.StoreInt32(&proxier.initialized, initialized)
}

func (proxier *Proxier) isInitialized() bool {
return atomic.LoadInt32(&proxier.initialized) > 0
}

func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncProxyRules()
}
}

func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, oldService, service) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
if proxier.serviceChanges.update(&namespacedName, oldService, service) && proxier.isInitialized() {
proxier.syncProxyRules()
}
}

func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, service, nil) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
if proxier.serviceChanges.update(&namespacedName, service, nil) && proxier.isInitialized() {
proxier.syncProxyRules()
}
}

func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
proxier.servicesSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
Expand Down Expand Up @@ -687,46 +672,29 @@ func updateServiceMap(

func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
if proxier.endpointsChanges.update(&namespacedName, nil, endpoints) && proxier.isInitialized() {
proxier.syncProxyRules()
}
}

func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
if proxier.endpointsChanges.update(&namespacedName, oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.syncProxyRules()
}
}

func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) {
// TODO(wojtek-t): If the initial sync of informer either for endpoints or
// services is not finished, it doesn't make sense to call syncProxyRules
// because it will early-return (to avoid resyncing iptables with partial
// state right after kube-proxy restart). This can eat a token for calling
// syncProxyRules, but is not that critical since it can happen only
// after kube-proxy was (re)started.
if proxier.endpointsChanges.update(&namespacedName, endpoints, nil) && proxier.isInitialized() {
proxier.syncProxyRules()
}
}

func (proxier *Proxier) OnEndpointsSynced() {
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock()
// Call it unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
Expand Down Expand Up @@ -1530,9 +1498,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())

if glog.V(5) {
glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
}
glog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
Expand Down