Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1988512: [release-4.6] Fix LGW externalIP #676

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
130 changes: 15 additions & 115 deletions go-controller/pkg/node/gateway_localnet.go
Expand Up @@ -73,31 +73,24 @@ func getLocalAddrs() (map[string]net.IPNet, error) {
return localAddrSet, nil
}

func (npw *localPortWatcherData) networkHasAddress(ip net.IP) bool {
for _, net := range npw.localAddrSet {
if net.Contains(ip) {
return true
}
}
return false
}

func (npw *localPortWatcherData) addService(svc *kapi.Service) error {
// don't process headless service or services that do not have NodePorts or ExternalIPs
if !util.ServiceTypeHasClusterIP(svc) || !util.IsClusterIPSet(svc) {
return nil
}

iptRules := []iptRule{}
isIPv6Service := utilnet.IsIPv6String(svc.Spec.ClusterIP)
gatewayIP := npw.gatewayIPv4
if isIPv6Service {
gatewayIP = npw.gatewayIPv6
}
// holds map of external ips and if they are currently using routes
routeUsage := make(map[string]bool)
for _, port := range svc.Spec.Ports {
if util.ServiceTypeHasNodePort(svc) {
if err := util.ValidatePort(port.Protocol, port.NodePort); err != nil {
klog.Warningf("Invalid service node port %s, err: %v", port.Name, err)
continue
}

if gatewayIP != "" {
// Fix Azure/GCP LoadBalancers. They will forward traffic directly to the node with the
// dest address as the load-balancer ingress IP and port
Expand All @@ -120,36 +113,8 @@ func (npw *localPortWatcherData) addService(svc *kapi.Service) error {
externalIP, svc.Namespace, svc.Name, svc.Spec.ClusterIP)
continue
}
if _, exists := npw.localAddrSet[externalIP]; exists {
if !util.IsClusterIPSet(svc) {
serviceRef := kapi.ObjectReference{
Kind: "Service",
Namespace: svc.Namespace,
Name: svc.Name,
}
npw.recorder.Eventf(&serviceRef, kapi.EventTypeWarning, "UnsupportedServiceDefinition", "Unsupported service definition, headless service: %s with a local ExternalIP is not supported by ovn-kubernetes in local gateway mode", svc.Name)
klog.Warningf("UnsupportedServiceDefinition event for service %s in namespace %s", svc.Name, svc.Namespace)
continue
}
iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...)
klog.V(5).Infof("Will add iptables rule for ExternalIP: %s", externalIP)
} else if npw.networkHasAddress(net.ParseIP(externalIP)) {
klog.V(5).Infof("ExternalIP: %s is reachable through one of the interfaces on this node, will skip setup", externalIP)
} else {
if gatewayIP != "" {
routeUsage[externalIP] = true
} else {
klog.Warningf("No gateway of appropriate IP family for ExternalIP %s for Service %s/%s",
externalIP, svc.Namespace, svc.Name)
}
}
}
}
for externalIP := range routeUsage {
if stdout, stderr, err := util.RunIP("route", "replace", externalIP, "via", gatewayIP, "dev", util.K8sMgmtIntfName, "table", localnetGatewayExternalIDTable); err != nil {
klog.Errorf("Error adding routing table entry for ExternalIP %s, via gw: %s: stdout: %s, stderr: %s, err: %v", externalIP, gatewayIP, stdout, stderr, err)
} else {
klog.V(5).Infof("Successfully added route for ExternalIP: %s", externalIP)
iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...)
klog.V(5).Infof("Will add iptables rule for ExternalIP: %s", externalIP)
}
}
klog.V(5).Infof("Adding iptables rules: %v for service: %v", iptRules, svc.Name)
Expand All @@ -163,8 +128,6 @@ func (npw *localPortWatcherData) deleteService(svc *kapi.Service) error {
if isIPv6Service {
gatewayIP = npw.gatewayIPv6
}
// holds map of external ips and if they are currently using routes
routeUsage := make(map[string]bool)
// Note that unlike with addService we just silently ignore IPv4/IPv6 mismatches here
for _, port := range svc.Spec.Ports {
if util.ServiceTypeHasNodePort(svc) {
Expand All @@ -181,24 +144,10 @@ func (npw *localPortWatcherData) deleteService(svc *kapi.Service) error {
if utilnet.IsIPv6String(externalIP) != isIPv6Service {
continue
}
if _, exists := npw.localAddrSet[externalIP]; exists {
iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...)
klog.V(5).Infof("Will delete iptables rule for ExternalIP: %s", externalIP)
} else if npw.networkHasAddress(net.ParseIP(externalIP)) {
klog.V(5).Infof("ExternalIP: %s is reachable through one of the interfaces on this node, will skip cleanup", externalIP)
} else {
if gatewayIP != "" {
routeUsage[externalIP] = true
}
}
}
}

iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...)
klog.V(5).Infof("Will delete iptables rule for ExternalIP: %s", externalIP)

for externalIP := range routeUsage {
if stdout, stderr, err := util.RunIP("route", "del", externalIP, "via", gatewayIP, "dev", util.K8sMgmtIntfName, "table", localnetGatewayExternalIDTable); err != nil {
klog.Errorf("Error delete routing table entry for ExternalIP %s: stdout: %s, stderr: %s, err: %v", externalIP, stdout, stderr, err)
} else {
klog.V(5).Infof("Successfully deleted route for ExternalIP: %s", externalIP)
}
}

Expand All @@ -207,73 +156,24 @@ func (npw *localPortWatcherData) deleteService(svc *kapi.Service) error {
}

func (npw *localPortWatcherData) syncServices(serviceInterface []interface{}) {
removeStaleRoutes := func(keepRoutes []string) {
stdout, stderr, err := util.RunIP("route", "list", "table", localnetGatewayExternalIDTable)
if err != nil || stdout == "" {
klog.Infof("No routing table entries for ExternalIP table %s: stdout: %s, stderr: %s, err: %v",
localnetGatewayExternalIDTable, stdout, strings.Replace(stderr, "\n", "", -1), err)
return
}
for _, existingRoute := range strings.Split(stdout, "\n") {
isFound := false
for _, keepRoute := range keepRoutes {
if strings.Contains(existingRoute, keepRoute) {
isFound = true
break
}
}
if !isFound {
klog.V(5).Infof("Deleting stale routing rule: %s", existingRoute)
if _, stderr, err := util.RunIP("route", "del", existingRoute, "table", localnetGatewayExternalIDTable); err != nil {
klog.Errorf("Error deleting stale routing rule: stderr: %s, err: %v", stderr, err)
}
}
}
}
keepIPTRules := []iptRule{}
keepRoutes := []string{}
for _, service := range serviceInterface {
svc, ok := service.(*kapi.Service)
if !ok {
klog.Errorf("Spurious object in syncServices: %v", serviceInterface)
continue
}
gatewayIP := npw.gatewayIPv4
if utilnet.IsIPv6String(svc.Spec.ClusterIP) {
gatewayIP = npw.gatewayIPv6
}
if gatewayIP != "" {
keepIPTRules = append(keepIPTRules, getGatewayIPTRules(svc, gatewayIP, nil)...)
}
keepRoutes = append(keepRoutes, svc.Spec.ExternalIPs...)
}
for _, chain := range []string{iptableNodePortChain, iptableExternalIPChain} {
recreateIPTRules("nat", chain, keepIPTRules)
recreateIPTRules("filter", chain, keepIPTRules)
}
removeStaleRoutes(keepRoutes)
}

func initRoutingRules() error {
stdout, stderr, err := util.RunIP("rule")
if err != nil {
return fmt.Errorf("error listing routing rules, stdout: %s, stderr: %s, err: %v", stdout, stderr, err)
}
if !strings.Contains(stdout, fmt.Sprintf("from all lookup %s", localnetGatewayExternalIDTable)) {
if stdout, stderr, err := util.RunIP("rule", "add", "from", "all", "table", localnetGatewayExternalIDTable); err != nil {
return fmt.Errorf("error adding routing rule for ExternalIP table (%s): stdout: %s, stderr: %s, err: %v", localnetGatewayExternalIDTable, stdout, stderr, err)
}
// Previously LGW used routes in the localnetGatewayExternalIDTable, to handle
// upgrades correctly make sure we flush this table of all routes
klog.Infof("Flushing host's routing table: %s", localnetGatewayExternalIDTable)
if _, stderr, err := util.RunIP("route", "flush", "table", localnetGatewayExternalIDTable); err != nil {
klog.Errorf("Error flushing host's routing table: %s stderr: %s err: %v", localnetGatewayExternalIDTable, stderr, err)
}
return nil
}

func (n *OvnNode) watchLocalPorts(npw *localPortWatcherData) error {
if err := initLocalGatewayIPTables(); err != nil {
return err
}
if err := initRoutingRules(); err != nil {
return err
}
n.watchFactory.AddServiceHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svc := obj.(*kapi.Service)
Expand Down