Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #48402 #48524 upstream release 1.7 #48809

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_linux_test.go
Expand Up @@ -47,6 +47,14 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
return mi.mountPoints, nil
}

func (mi *fakeMountInterface) IsMountPointMatch(mp mount.MountPoint, dir string) bool {
return (mp.Path == dir)
}

func (mi *fakeMountInterface) IsNotMountPoint(dir string) (bool, error) {
return false, fmt.Errorf("unsupported")
}

func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_unsupported_test.go
Expand Up @@ -40,6 +40,14 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
return mi.mountPoints, nil
}

func (f *fakeMountInterface) IsMountPointMatch(mp mount.MountPoint, dir string) bool {
return (mp.Path == dir)
}

func (f *fakeMountInterface) IsNotMountPoint(dir string) (bool, error) {
return false, fmt.Errorf("unsupported")
}

func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
Expand Down
67 changes: 47 additions & 20 deletions pkg/proxy/iptables/proxier.go
Expand Up @@ -250,6 +250,17 @@ type serviceChangeMap struct {
items map[types.NamespacedName]*serviceChange
}

type updateEndpointMapResult struct {
hcEndpoints map[types.NamespacedName]int
staleEndpoints map[endpointServicePair]bool
staleServiceNames map[proxy.ServicePortName]bool
}

type updateServiceMapResult struct {
hcServices map[types.NamespacedName]uint16
staleServices sets.String
}

type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo

Expand Down Expand Up @@ -694,29 +705,29 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
// <changes> map is cleared after applying them.
func updateServiceMap(
serviceMap proxyServiceMap,
changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
staleServices = sets.NewString()
changes *serviceChangeMap) (result updateServiceMapResult) {
result.staleServices = sets.NewString()

func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
existingPorts := serviceMap.merge(change.current)
serviceMap.unmerge(change.previous, existingPorts, staleServices)
serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
}
changes.items = make(map[types.NamespacedName]*serviceChange)
}()

// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
hcServices = make(map[types.NamespacedName]uint16)
result.hcServices = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.healthCheckNodePort != 0 {
hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
}
}

return hcServices, staleServices
return result
}

func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
Expand Down Expand Up @@ -755,16 +766,17 @@ func (proxier *Proxier) OnEndpointsSynced() {
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
staleSet = make(map[endpointServicePair]bool)
hostname string) (result updateEndpointMapResult) {
result.staleEndpoints = make(map[endpointServicePair]bool)
result.staleServiceNames = make(map[proxy.ServicePortName]bool)

func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
endpointsMap.unmerge(change.previous)
endpointsMap.merge(change.current)
detectStaleConnections(change.previous, change.current, staleSet)
detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
}()
Expand All @@ -775,18 +787,17 @@ func updateEndpointsMap(

// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
hcEndpoints = make(map[types.NamespacedName]int)
result.hcEndpoints = make(map[types.NamespacedName]int)
localIPs := getLocalIPs(endpointsMap)
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
result.hcEndpoints[nsn] = len(ips)
}

return hcEndpoints, staleSet
return result
}

// <staleEndpoints> are modified by this function with detected stale
// connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
for svcPortName, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
Expand All @@ -802,6 +813,13 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
}
}
}

for svcPortName, epList := range newEndpointsMap {
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
staleServiceNames[svcPortName] = true
}
}
}

func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
Expand Down Expand Up @@ -983,11 +1001,20 @@ func (proxier *Proxier) syncProxyRules() {
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
hcServices, staleServices := updateServiceMap(
serviceUpdateResult := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
hcEndpoints, staleEndpoints := updateEndpointsMap(
endpointUpdateResult := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)

staleServices := serviceUpdateResult.staleServices
// merge stale services gathered from updateEndpointsMap
for svcPortName := range endpointUpdateResult.staleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
staleServices.Insert(svcInfo.clusterIP.String())
}
}

glog.V(3).Infof("Syncing iptables rules")

// Create and link the kube services chain.
Expand Down Expand Up @@ -1594,17 +1621,17 @@ func (proxier *Proxier) syncProxyRules() {
// Update healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the healthChecker
// will just drop those endpoints.
if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
glog.Errorf("Error syncing healtcheck services: %v", err)
}
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}

// Finish housekeeping.
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
proxier.deleteEndpointConnections(staleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
}

// Clear UDP conntrack for port or all conntrack entries when port equal zero.
Expand Down