Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual cherry pick of part of #44053 to release-1.5 #44365

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 45 additions & 24 deletions pkg/proxy/iptables/proxier.go
Expand Up @@ -436,6 +436,9 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
proxier.haveReceivedServiceUpdate = true

activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
// Warning: activeLocalServices is used as a surgical fix in 1.5 for the health check bug (#44053).
// This fix does not exist on 1.6+ as we overhauled healthcheck package completely.
activeLocalServices := make(map[types.NamespacedName]bool) // use a map as a set

for i := range allServices {
service := &allServices[i]
Expand Down Expand Up @@ -482,6 +485,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
info.loadBalancerSourceRanges = service.Spec.LoadBalancerSourceRanges
info.onlyNodeLocalEndpoints = apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
if info.onlyNodeLocalEndpoints {
activeLocalServices[serviceName.NamespacedName] = true
p := apiservice.GetServiceHealthCheckNodePort(service)
if p == 0 {
glog.Errorf("Service does not contain necessary annotation %v",
Expand Down Expand Up @@ -512,11 +516,16 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
staleUDPServices.Insert(info.clusterIP.String())
}
delete(proxier.serviceMap, name)

if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 {
// Remove ServiceListener health check nodePorts from the health checker
// TODO - Stats
glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort)
healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort)
// Remove health check when service disappeared, but not when service+port disappeared.
// Changes on service's ports should not trigger health check deletion.
if !activeLocalServices[name.NamespacedName] {
// Remove ServiceListener health check nodePorts from the health checker
// TODO - Stats
glog.V(4).Infof("Deleting health check for %+v, port %v", name.NamespacedName, info.healthCheckNodePort)
healthcheck.DeleteServiceListener(name.NamespacedName, info.healthCheckNodePort)
}
}
}
}
Expand Down Expand Up @@ -634,16 +643,41 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
}
}

// Update service health check
allSvcPorts := make(map[proxy.ServicePortName]bool)
// Update service health check. We include entries from the current map,
// with zero-length value, to trigger the healthchecker to stop reporting
// health for that service.
//
// This whole mechanism may be over-designed. It builds a list of endpoints
// per service, filters for local endpoints, builds a string that is the
// same as the name, and then passes each (name, list) pair over a channel.
//
// I am pretty sure that there's no way there can be more than one entry in
// the final list, and passing an empty list as a delete signal is weird.
// It could probably be simplified to a synchronous function call of a set
// of NamespacedNames. I am not making that simplification at this time.
//
// ServicePortName includes the port name, which doesn't matter for
// healthchecks. It's possible that a single update both added and removed
// ports on the same IP, so we need to make sure that removals are counted,
// with additions overriding them. Track all endpoints so we can find local
// ones.
epsBySvcName := map[types.NamespacedName][]*endpointsInfo{}
for svcPort := range proxier.endpointsMap {
allSvcPorts[svcPort] = true
epsBySvcName[svcPort.NamespacedName] = nil
}
for svcPort := range newEndpointsMap {
allSvcPorts[svcPort] = true
}
for svcPort := range allSvcPorts {
proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort])
epsBySvcName[svcPort.NamespacedName] = append(epsBySvcName[svcPort.NamespacedName], newEndpointsMap[svcPort]...)
}
for nsn, eps := range epsBySvcName {
// Use a set instead of a slice to provide deduplication
epSet := sets.NewString()
for _, ep := range eps {
if ep.localEndpoint {
// kube-proxy health check only needs local endpoints
epSet.Insert(fmt.Sprintf("%s/%s", nsn.Namespace, nsn.Name))
}
}
healthcheck.UpdateEndpoints(nsn, epSet)
}

if len(newEndpointsMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, proxier.endpointsMap) {
Expand All @@ -656,19 +690,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.deleteEndpointConnections(staleConnections)
}

// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) {
// Use a set instead of a slice to provide deduplication
endpoints := sets.NewString()
for _, portInfo := range hostPorts {
if portInfo.localEndpoint {
// kube-proxy health check only needs local endpoints
endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name))
}
}
healthcheck.UpdateEndpoints(name, endpoints)
}

// used in OnEndpointsUpdate
type hostPortInfo struct {
host string
Expand Down