Skip to content

Commit

Permalink
Merge pull request #48809 from wojtek-t/automated-cherry-pick-of-#48402-
Browse files Browse the repository at this point in the history
#48524-upstream-release-1.7

Automatic merge from submit-queue

Automated cherry pick of #48402 #48524 upstream release 1.7

Cherry pick of #48524 and #48402 on release-1.7.

#48524 : fix udp service blackhole problem when number of backends changes from 0 to non-0
#48402 : Local storage teardown fix
  • Loading branch information
Kubernetes Submit Queue committed Jul 18, 2017
2 parents e803c55 + 1c75818 commit e76b450
Show file tree
Hide file tree
Showing 13 changed files with 324 additions and 118 deletions.
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
return mi.mountPoints, nil
}

func (mi *fakeMountInterface) IsMountPointMatch(mp mount.MountPoint, dir string) bool {
return (mp.Path == dir)
}

func (mi *fakeMountInterface) IsNotMountPoint(dir string) (bool, error) {
return false, fmt.Errorf("unsupported")
}

func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/container_manager_unsupported_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ func (mi *fakeMountInterface) List() ([]mount.MountPoint, error) {
return mi.mountPoints, nil
}

func (f *fakeMountInterface) IsMountPointMatch(mp mount.MountPoint, dir string) bool {
return (mp.Path == dir)
}

func (f *fakeMountInterface) IsNotMountPoint(dir string) (bool, error) {
return false, fmt.Errorf("unsupported")
}

func (mi *fakeMountInterface) IsLikelyNotMountPoint(file string) (bool, error) {
return false, fmt.Errorf("unsupported")
}
Expand Down
67 changes: 47 additions & 20 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,17 @@ type serviceChangeMap struct {
items map[types.NamespacedName]*serviceChange
}

type updateEndpointMapResult struct {
hcEndpoints map[types.NamespacedName]int
staleEndpoints map[endpointServicePair]bool
staleServiceNames map[proxy.ServicePortName]bool
}

type updateServiceMapResult struct {
hcServices map[types.NamespacedName]uint16
staleServices sets.String
}

type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo

Expand Down Expand Up @@ -694,29 +705,29 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
// <changes> map is cleared after applying them.
func updateServiceMap(
serviceMap proxyServiceMap,
changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
staleServices = sets.NewString()
changes *serviceChangeMap) (result updateServiceMapResult) {
result.staleServices = sets.NewString()

func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
existingPorts := serviceMap.merge(change.current)
serviceMap.unmerge(change.previous, existingPorts, staleServices)
serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
}
changes.items = make(map[types.NamespacedName]*serviceChange)
}()

// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to serviceMap.
hcServices = make(map[types.NamespacedName]uint16)
result.hcServices = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.healthCheckNodePort != 0 {
hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
}
}

return hcServices, staleServices
return result
}

func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
Expand Down Expand Up @@ -755,16 +766,17 @@ func (proxier *Proxier) OnEndpointsSynced() {
func updateEndpointsMap(
endpointsMap proxyEndpointsMap,
changes *endpointsChangeMap,
hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
staleSet = make(map[endpointServicePair]bool)
hostname string) (result updateEndpointMapResult) {
result.staleEndpoints = make(map[endpointServicePair]bool)
result.staleServiceNames = make(map[proxy.ServicePortName]bool)

func() {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
endpointsMap.unmerge(change.previous)
endpointsMap.merge(change.current)
detectStaleConnections(change.previous, change.current, staleSet)
detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
}()
Expand All @@ -775,18 +787,17 @@ func updateEndpointsMap(

// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
hcEndpoints = make(map[types.NamespacedName]int)
result.hcEndpoints = make(map[types.NamespacedName]int)
localIPs := getLocalIPs(endpointsMap)
for nsn, ips := range localIPs {
hcEndpoints[nsn] = len(ips)
result.hcEndpoints[nsn] = len(ips)
}

return hcEndpoints, staleSet
return result
}

// <staleEndpoints> are modified by this function with detected stale
// connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
for svcPortName, epList := range oldEndpointsMap {
for _, ep := range epList {
stale := true
Expand All @@ -802,6 +813,13 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
}
}
}

for svcPortName, epList := range newEndpointsMap {
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
staleServiceNames[svcPortName] = true
}
}
}

func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
Expand Down Expand Up @@ -983,11 +1001,20 @@ func (proxier *Proxier) syncProxyRules() {
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
hcServices, staleServices := updateServiceMap(
serviceUpdateResult := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
hcEndpoints, staleEndpoints := updateEndpointsMap(
endpointUpdateResult := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)

staleServices := serviceUpdateResult.staleServices
// merge stale services gathered from updateEndpointsMap
for svcPortName := range endpointUpdateResult.staleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
staleServices.Insert(svcInfo.clusterIP.String())
}
}

glog.V(3).Infof("Syncing iptables rules")

// Create and link the kube services chain.
Expand Down Expand Up @@ -1594,17 +1621,17 @@ func (proxier *Proxier) syncProxyRules() {
// Update healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the healthChecker
// will just drop those endpoints.
if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
glog.Errorf("Error syncing healtcheck services: %v", err)
}
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}

// Finish housekeeping.
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
proxier.deleteEndpointConnections(staleEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
}

// Clear UDP conntrack for port or all conntrack entries when port equal zero.
Expand Down

0 comments on commit e76b450

Please sign in to comment.