Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move On*Update handling into sync funcion #44019

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 57 additions & 45 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,18 @@ type endpointsInfo struct {
isLocal bool
}

func (e *endpointsInfo) String() string {
return fmt.Sprintf("%v", *e)
}

// returns a new serviceInfo struct
func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, service *api.Service) *serviceInfo {
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && (service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort)
onlyNodeLocalEndpoints := false
if utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) &&
(service.Spec.Type == api.ServiceTypeLoadBalancer || service.Spec.Type == api.ServiceTypeNodePort) &&
apiservice.NeedsHealthCheck(service) {
onlyNodeLocalEndpoints = true
}
info := &serviceInfo{
clusterIP: net.ParseIP(service.Spec.ClusterIP),
port: int(port.Port),
Expand Down Expand Up @@ -519,22 +528,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
glog.V(2).Info("Received first Services update")
}
proxier.allServices = allServices

newServiceMap, hcPorts, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap)

// update healthcheck ports
if err := proxier.healthChecker.SyncServices(hcPorts); err != nil {
glog.Errorf("Error syncing healtcheck ports: %v", err)
}

if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) {
proxier.serviceMap = newServiceMap
proxier.syncProxyRules(syncReasonServices)
} else {
glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed")
}

utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
proxier.syncProxyRules(syncReasonServices)
}

// OnEndpointsUpdate takes in a slice of updated endpoints.
Expand All @@ -545,23 +539,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) {
glog.V(2).Info("Received first Endpoints update")
}
proxier.allEndpoints = allEndpoints

// TODO: once service has made this same transform, move this into proxier.syncProxyRules()
newMap, hcEndpoints, staleConnections := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)

// update healthcheck endpoints
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}

if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
proxier.endpointsMap = newMap
proxier.syncProxyRules(syncReasonEndpoints)
} else {
glog.V(4).Infof("Skipping proxy iptables rule sync on endpoint update because nothing changed")
}

proxier.deleteEndpointConnections(staleConnections)
proxier.syncProxyRules(syncReasonEndpoints)
}

// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
Expand All @@ -574,7 +552,7 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap

// Update endpoints for services.
for i := range allEndpoints {
accumulateEndpointsMap(allEndpoints[i], hostname, curMap, &newMap)
accumulateEndpointsMap(allEndpoints[i], hostname, &newMap)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
Expand Down Expand Up @@ -632,10 +610,7 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap
// - hostPortInfo and endpointsInfo overlap too much
// - the test for this is overlapped by the test for buildNewEndpointsMap
// - naming is poor and responsibilities are muddled
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
curEndpoints proxyEndpointMap,
newEndpoints *proxyEndpointMap) {

func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, newEndpoints *proxyEndpointMap) {
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
for i := range endpoints.Subsets {
Expand Down Expand Up @@ -761,6 +736,25 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return
}

// Figure out the new services we need to activate.
newServices, hcServices, staleServices := buildNewServiceMap(proxier.allServices, proxier.serviceMap)

// If this was called because of a services update, but nothing actionable has changed, skip it.
if reason == syncReasonServices && reflect.DeepEqual(newServices, proxier.serviceMap) {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops! That is why we do code review, I guess.

return
}

// Figure out the new endpoints we need to activate.
newEndpoints, hcEndpoints, staleEndpoints := buildNewEndpointsMap(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname)

// If this was called because of an endpoints update, but nothing actionable has changed, skip it.
if reason == syncReasonEndpoints && reflect.DeepEqual(newEndpoints, proxier.endpointsMap) {
glog.V(3).Infof("Skipping iptables sync because nothing changed")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return ?

return
}

glog.V(3).Infof("Syncing iptables rules")

// Create and link the kube services chain.
Expand Down Expand Up @@ -891,7 +885,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
replacementPortsMap := map[localPort]closeable{}

// Build rules for each service.
for svcName, svcInfo := range proxier.serviceMap {
for svcName, svcInfo := range newServices {
protocol := strings.ToLower(string(svcInfo.protocol))

// Create the per-service chain, retaining counters if possible.
Expand Down Expand Up @@ -1082,7 +1076,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
continue
}
if lp.protocol == "udp" {
proxier.clearUdpConntrackForPort(lp.port)
proxier.clearUDPConntrackForPort(lp.port)
}
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install iptables rules.
Expand All @@ -1108,7 +1102,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// table doesn't currently have the same per-service structure that
// the nat table does, so we just stick this into the kube-services
// chain.
if len(proxier.endpointsMap[svcName]) == 0 {
if len(newEndpoints[svcName]) == 0 {
writeLine(filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
Expand All @@ -1121,7 +1115,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}

// If the service has no endpoints then reject packets.
if len(proxier.endpointsMap[svcName]) == 0 {
if len(newEndpoints[svcName]) == 0 {
writeLine(filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
Expand All @@ -1140,7 +1134,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
// These two slices parallel each other - keep in sync
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
for _, ep := range proxier.endpointsMap[svcName] {
for _, ep := range newEndpoints[svcName] {
endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcName, protocol, ep.endpoint)
endpointChains = append(endpointChains, endpointChain)
Expand Down Expand Up @@ -1317,14 +1311,32 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
}
}
proxier.portsMap = replacementPortsMap

// Update healthchecks. The endpoints list might include services that are
// not "OnlyLocal", but the services list will not, and the healthChecker
// will just drop those endpoints.
if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
glog.Errorf("Error syncing healtcheck services: %v", err)
}
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
glog.Errorf("Error syncing healthcheck endoints: %v", err)
}

// Finish housekeeping.
proxier.serviceMap = newServices
proxier.endpointsMap = newEndpoints

// TODO: these and clearUDPConntrackForPort() could be made more consistent.
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
proxier.deleteEndpointConnections(staleEndpoints)
}

// Clear UDP conntrack for port or all conntrack entries when port equal zero.
// When a packet arrives, it will not go through NAT table again, because it is not "the first" packet.
// The solution is clearing the conntrack. Known issus:
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func (proxier *Proxier) clearUdpConntrackForPort(port int) {
func (proxier *Proxier) clearUDPConntrackForPort(port int) {
glog.V(2).Infof("Deleting conntrack entries for udp connections")
if port > 0 {
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
Expand Down