OPNET-197: Extend logic for detecting Node IP
When generating keepalived.conf we rely on logic that gathers the IPs of
all cluster nodes for the IP stack used by the specific VIP. This logic
currently relies only on the addresses reported in Node.Status.Addresses.

In some scenarios a node may not report all of its IPs via kubelet even
though they are available. If we detect such a scenario (e.g. kubelet
reporting only IPv4 while the VIP is IPv6), we check the Node annotations
created by OVN, as those use a different source of truth and are therefore
not affected by kubelet omitting IPs.

The newly introduced behaviour is only a fallback for when
Node.Status.Addresses does not contain an IP of the requested stack, so
the behaviour of currently working scenarios does not change.

Contributes-to: OPNET-197
mkowalski committed Feb 1, 2023
1 parent 2905c04 commit 9d8f8ce
Showing 48 changed files with 6,500 additions and 37 deletions.
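
For orientation, the fallback described in the commit message can be reduced to the minimal sketch below. It is only an illustration: the name pickNodeIP, the wantV6/machineNetwork parameters and the first-match behaviour are simplifications introduced here, not the real signature; the actual implementation is getNodeIpForRequestedIpStack in pkg/config/node.go further down, which also logs each decision.

package sketch

import (
    "encoding/json"
    "net"

    v1 "k8s.io/api/core/v1"
)

// pickNodeIP returns a node IP of the requested stack: first from the addresses
// reported by kubelet, then (as a fallback) from the OVN "k8s.ovn.org/host-addresses"
// annotation, skipping the VIPs and keeping only addresses inside the machine network.
func pickNodeIP(node v1.Node, wantV6 bool, apiVip, ingressVip string, machineNetwork *net.IPNet) string {
    // 1) Addresses reported as part of Node.Status.Addresses.
    for _, a := range node.Status.Addresses {
        ip := net.ParseIP(a.Address)
        if a.Type == v1.NodeInternalIP && ip != nil && (ip.To4() == nil) == wantV6 {
            return a.Address
        }
    }
    // 2) Fallback: addresses reported by OVN via the node annotation.
    var hostAddrs []string
    if err := json.Unmarshal([]byte(node.Annotations["k8s.ovn.org/host-addresses"]), &hostAddrs); err != nil {
        return ""
    }
    for _, addr := range hostAddrs {
        if addr == apiVip || addr == ingressVip {
            continue // never select a VIP as the node address
        }
        ip := net.ParseIP(addr)
        if ip == nil || (ip.To4() == nil) != wantV6 {
            continue // wrong IP stack
        }
        if machineNetwork != nil && machineNetwork.Contains(ip) {
            return addr
        }
    }
    return ""
}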
1 change: 1 addition & 0 deletions go.mod
@@ -47,6 +47,7 @@ require (
github.com/nxadm/tail v1.4.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/thoas/go-funk v0.9.3 // indirect
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -275,6 +275,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2 h1:b6uOv7YOFK0TYG7HtkIgExQo+2RdLuwRft63jn2HWj8=
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/vishvananda/netlink v1.1.0 h1:1iyaYNBLmP6L0220aDnYQpo1QEV4t4hJ+xEEhhJH8j0=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
2 changes: 1 addition & 1 deletion pkg/config/net.go
@@ -41,7 +41,7 @@ func getInterfaceAndNonVIPAddrFromFile(vip net.IP) (*net.Interface, *net.IPNet,
if err != nil {
return nil, nil, err
}
return utils.GetInterfaceWithCidrByIP(ip)
return utils.GetInterfaceWithCidrByIP(ip, true)
}

// NOTE(bnemec): All addresses in the vips array must be the same ip version
117 changes: 93 additions & 24 deletions pkg/config/node.go
@@ -3,6 +3,7 @@ package config
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
@@ -291,7 +292,7 @@ func IsUpgradeStillRunning(kubeconfigPath string) (bool, error) {
return true, nil
}

func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig IngressConfig, err error) {
func GetIngressConfig(kubeconfigPath, apiVip, ingressVip string) (ingressConfig IngressConfig, err error) {
config, err := utils.GetClientConfig("", kubeconfigPath)
if err != nil {
return ingressConfig, err
@@ -306,23 +307,89 @@ func GetIngressConfig(kubeconfigPath string, filterIpType string) (ingressConfig
return ingressConfig, err
}

// As it is not possible to get the cluster's Machine Network directly, we use a workaround
// by detecting which of the local interfaces belongs to the same subnet as the requested VIP.
// This interface can be used to detect what the original machine network was, as it contains
// the subnet mask that we need.
machineNetwork, err := utils.GetLocalCIDRByIP(apiVip)
if err != nil {
return ingressConfig, err
}

for _, node := range nodes.Items {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
if filterIpType != "" {
if (net.ParseIP(filterIpType).To4() != nil && net.ParseIP(address.Address).To4() == nil) ||
(net.ParseIP(filterIpType).To4() == nil && net.ParseIP(address.Address).To4() != nil) {
continue
}
}
ingressConfig.Peers = append(ingressConfig.Peers, address.Address)
}
addr := getNodeIpForRequestedIpStack(node, apiVip, ingressVip, machineNetwork)
if addr != "" {
ingressConfig.Peers = append(ingressConfig.Peers, addr)
}
}

return ingressConfig, nil
}

func getNodeIpForRequestedIpStack(node v1.Node, apiVip, ingressVip, machineNetwork string) string {
log.SetLevel(logrus.DebugLevel)
log.Debugf("Searching for Node IP of %s. Using '%s' as machine network. Filtering out VIPs '%s' and '%s'.", node.Name, machineNetwork, apiVip, ingressVip)

isFilterV4 := utils.IsIPv4Addr(apiVip)
isFilterV6 := utils.IsIPv6Addr(apiVip)

if !isFilterV4 && !isFilterV6 {
return ""
}

// We need to collect IP address of a matching IP stack for every node that is part of the
// cluster. We need to account for a scenario where the Node.Status.Addresses list is incomplete
// and use a different source for the address.
//
// We will use here the following sources:
// 1) Node.Status.Addresses list
// 2) Node annotation "k8s.ovn.org/host-addresses" in combination with Machine Networks
//
// If none of those returns a conclusive result, we don't return an IP for this node. This is
// not a desired outcome, but the logic can be extended in the future if needed.

var addr string
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP {
if (utils.IsIPv4Addr(address.Address) && isFilterV4) || (utils.IsIPv6Addr(address.Address) && isFilterV6) {
addr = address.Address
log.Debugf("For node %s selected peer address %s using NodeInternalIP", node.Name, addr)
}
}
}
if addr == "" {
log.Debugf("For node %s can't find address using NodeInternalIP. Fallback to OVN annotation.", node.Name)

var ovnHostAddresses []string
if err := json.Unmarshal([]byte(node.Annotations["k8s.ovn.org/host-addresses"]), &ovnHostAddresses); err != nil {
log.Debugf("Couldn't unmarshall OVN annotations: '%s'. Skipping.", node.Annotations["k8s.ovn.org/host-addresses"])
return ""
}

for _, hostAddr := range ovnHostAddresses {
if hostAddr == apiVip || hostAddr == ingressVip {
log.Debugf("Address %s is VIP. Skipping.", hostAddr)
continue
}
if (utils.IsIPv4Addr(hostAddr) && !isFilterV4) || (utils.IsIPv6Addr(hostAddr) && !isFilterV6) {
log.Debugf("Address %s doesn't match requested IP stack. Skipping.", hostAddr)
continue
}

match, err := utils.IpInCidr(hostAddr, machineNetwork)
if err != nil {
log.Debugf("Address '%s' and subnet '%s' couldn't be parsed. Skipping.", hostAddr, machineNetwork)
continue
}
if match {
addr = hostAddr
log.Debugf("For node %s selected peer address %s using using OVN annotations.", node.Name, addr)
}
}
}
return addr
}

// Returns a Node object populated with the configuration specified by the parameters
// to the function.
// kubeconfigPath: The path to a kubeconfig that can be used to read cluster status
@@ -457,8 +524,7 @@ func getNodeConfig(kubeconfigPath, clusterConfigPath, resolvConfPath string, api

// getSortedBackends builds config to communicate with kube-api based on kubeconfigPath parameter value, if kubeconfigPath is not empty it will build the
// config based on that content else config will point to localhost.
func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.IP) (backends []Backend, err error) {

func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip, ingressVip net.IP) (backends []Backend, err error) {
kubeApiServerUrl := ""
if readFromLocalAPI {
kubeApiServerUrl = localhostKubeApiServerUrl
@@ -483,15 +549,18 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.
}).Info("Failed to get master Nodes list")
return []Backend{}, err
}
apiVipv6 := utils.IsIPv6(apiVip)

// As it is not possible to get the cluster's Machine Network directly, we use a workaround
// by detecting which of the local interfaces belongs to the same subnet as the requested VIP.
// This interface can be used to detect what the original machine network was, as it contains
// the subnet mask that we need.
machineNetwork, err := utils.GetLocalCIDRByIP(apiVip.String())
if err != nil {
log.Warnf("Could not retrieve subnet for IP %s", apiVip.String())
}

for _, node := range nodes.Items {
masterIp := ""
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP && utils.IsIPv6(net.ParseIP(address.Address)) == apiVipv6 {
masterIp = address.Address
break
}
}
masterIp := getNodeIpForRequestedIpStack(node, apiVip.String(), ingressVip.String(), machineNetwork)
if masterIp != "" {
backends = append(backends, Backend{Host: node.ObjectMeta.Name, Address: masterIp})
} else {
@@ -505,7 +574,7 @@ func getSortedBackends(kubeconfigPath string, readFromLocalAPI bool, apiVip net.
return backends, err
}

func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, apiVip net.IP) (ApiLBConfig, error) {
func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, apiVip, ingressVip net.IP) (ApiLBConfig, error) {
config := ApiLBConfig{
ApiPort: apiPort,
LbPort: lbPort,
@@ -517,11 +586,11 @@ func GetLBConfig(kubeconfigPath string, apiPort, lbPort, statPort uint16, apiVip
config.FrontendAddr = "::"
}
// Try reading master nodes details first from api-vip:kube-apiserver and failover to localhost:kube-apiserver
backends, err := getSortedBackends(kubeconfigPath, false, apiVip)
backends, err := getSortedBackends(kubeconfigPath, false, apiVip, ingressVip)
if err != nil {
log.Infof("An error occurred while trying to read master nodes details from api-vip:kube-apiserver: %v", err)
log.Infof("Trying to read master nodes details from localhost:kube-apiserver")
backends, err = getSortedBackends(kubeconfigPath, true, apiVip)
backends, err = getSortedBackends(kubeconfigPath, true, apiVip, ingressVip)
if err != nil {
log.WithFields(logrus.Fields{
"kubeconfigPath": kubeconfigPath,
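The "Machine Network" workaround referenced in the comments above boils down to scanning the local interface addresses for a subnet that contains the VIP. Below is a minimal sketch using only the standard library; it only assumes the rough shape of utils.GetLocalCIDRByIP, whose real implementation lives in pkg/utils and may differ in details.

package sketch

import (
    "fmt"
    "net"
)

// getLocalCIDRByIP finds the local interface address whose subnet contains the
// given IP and returns that subnet in CIDR notation (e.g. "192.168.1.0/24").
func getLocalCIDRByIP(ip string) (string, error) {
    parsed := net.ParseIP(ip)
    if parsed == nil {
        return "", fmt.Errorf("not an IP address: %q", ip)
    }
    addrs, err := net.InterfaceAddrs()
    if err != nil {
        return "", err
    }
    for _, addr := range addrs {
        if ipNet, ok := addr.(*net.IPNet); ok && ipNet.Contains(parsed) {
            // Normalize to the network address so the result is a proper CIDR.
            network := &net.IPNet{IP: parsed.Mask(ipNet.Mask), Mask: ipNet.Mask}
            return network.String(), nil
        }
    }
    return "", fmt.Errorf("no local subnet contains %s", ip)
}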
125 changes: 125 additions & 0 deletions pkg/config/node_test.go
@@ -0,0 +1,125 @@
package config

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
testOvnHostAddressesAnnotation = map[string]string{
"k8s.ovn.org/host-addresses": "[\"192.168.1.102\",\"192.168.1.99\",\"192.168.1.101\",\"fd00::101\",\"2001:db8::49a\",\"fd00::102\",\"fd00::5\",\"fd69::2\"]",
}

testNodeDualStack1 = v1.Node{Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "192.168.1.99"},
{Type: "InternalIP", Address: "fd00::5"},
{Type: "ExternalIP", Address: "172.16.1.99"},
}}}
testNodeDualStack2 = v1.Node{
Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "192.168.1.99"},
{Type: "ExternalIP", Address: "172.16.1.99"},
}},
ObjectMeta: metav1.ObjectMeta{
Annotations: testOvnHostAddressesAnnotation,
},
}
testNodeDualStack3 = v1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: testOvnHostAddressesAnnotation,
},
}
testNodeSingleStackV4 = v1.Node{Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "192.168.1.99"},
{Type: "ExternalIP", Address: "172.16.1.99"},
}}}
testNodeSingleStackV6 = v1.Node{Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
{Type: "InternalIP", Address: "fd00::5"},
{Type: "ExternalIP", Address: "2001:db8::49a"},
}}}

testMachineNetworkV4 = "192.168.1.0/24"
testMachineNetworkV6 = "fd00::5/64"
testApiVipV4 = "192.168.1.101"
testApiVipV6 = "fd00::101"
testIngressVipV4 = "192.168.1.102"
testIngressVipV6 = "fd00::102"
)

var _ = Describe("getNodePeersForIpStack", func() {
Context("for dual-stack node", func() {
Context("with address only in status", func() {
It("matches an IPv4 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeDualStack1, testApiVipV4, testIngressVipV4, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
})
It("matches an IPv6 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeDualStack1, testApiVipV6, testIngressVipV6, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
})
})

Context("with address only in OVN annotation", func() {
It("matches an IPv4 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeDualStack3, testApiVipV4, testIngressVipV4, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
})
It("matches an IPv6 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeDualStack3, testApiVipV6, testIngressVipV6, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
})
})

Context("with address in status and OVN annotation", func() {
It("matches an IPv4 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeDualStack2, testApiVipV4, testIngressVipV4, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
})
It("matches an IPv6 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeDualStack2, testApiVipV6, testIngressVipV6, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
})
})
})

Context("for single-stack v4 node", func() {
It("matches an IPv4 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeSingleStackV4, testApiVipV4, testIngressVipV4, testMachineNetworkV4)
Expect(res).To(Equal("192.168.1.99"))
})
It("empty for IPv6 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeSingleStackV4, testApiVipV6, testIngressVipV6, testMachineNetworkV6)
Expect(res).To(Equal(""))
})
})

Context("for single-stack v6 node", func() {
It("empty for IPv4 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeSingleStackV6, testApiVipV4, testIngressVipV4, testMachineNetworkV4)
Expect(res).To(Equal(""))
})
It("matches an IPv6 VIP", func() {
res := getNodeIpForRequestedIpStack(testNodeSingleStackV6, testApiVipV6, testIngressVipV6, testMachineNetworkV6)
Expect(res).To(Equal("fd00::5"))
})
})

It("empty for empty node", func() {
res := getNodeIpForRequestedIpStack(v1.Node{}, testApiVipV4, testIngressVipV4, testMachineNetworkV4)
Expect(res).To(Equal(""))
})

It("empty for node with IPs and empty VIP requested", func() {
res := getNodeIpForRequestedIpStack(testNodeSingleStackV4, "", "", testMachineNetworkV4)
Expect(res).To(Equal(""))
})
})

func Test(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Config tests")
}
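The new specs are registered with Ginkgo through the Test function above (RegisterFailHandler plus RunSpecs), so they should run with the standard tooling, e.g. go test ./pkg/config/.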
12 changes: 6 additions & 6 deletions pkg/monitor/dynkeepalived.go
@@ -64,23 +64,23 @@ func updateUnicastConfig(kubeconfigPath string, newConfig *config.Node) {
if !newConfig.EnableUnicast {
return
}
newConfig.IngressConfig, err = config.GetIngressConfig(kubeconfigPath, newConfig.Cluster.APIVIP)
newConfig.IngressConfig, err = config.GetIngressConfig(kubeconfigPath, newConfig.Cluster.APIVIP, newConfig.Cluster.IngressVIP)
if err != nil {
log.Warnf("Could not retrieve ingress config: %v", err)
}

newConfig.LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(newConfig.Cluster.APIVIP))
newConfig.LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(newConfig.Cluster.APIVIP), net.ParseIP(newConfig.Cluster.IngressVIP))
if err != nil {
log.Warnf("Could not retrieve LB config: %v", err)
}

for i, c := range *newConfig.Configs {
// Must do this by index instead of using c because c is local to this loop
(*newConfig.Configs)[i].IngressConfig, err = config.GetIngressConfig(kubeconfigPath, c.Cluster.APIVIP)
(*newConfig.Configs)[i].IngressConfig, err = config.GetIngressConfig(kubeconfigPath, c.Cluster.APIVIP, c.Cluster.IngressVIP)
if err != nil {
log.Warnf("Could not retrieve ingress config: %v", err)
}
(*newConfig.Configs)[i].LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(c.Cluster.APIVIP))
(*newConfig.Configs)[i].LBConfig, err = config.GetLBConfig(kubeconfigPath, dummyPortNum, dummyPortNum, dummyPortNum, net.ParseIP(c.Cluster.APIVIP), net.ParseIP(c.Cluster.IngressVIP))
if err != nil {
log.Warnf("Could not retrieve LB config: %v", err)
}
@@ -147,7 +147,7 @@ func handleBootstrapStopKeepalived(kubeconfigPath string, bootstrapStopKeepalive
first that it's operational. */
log.Info("handleBootstrapStopKeepalived: verify first that local kube-apiserver is operational")
for start := time.Now(); time.Since(start) < time.Second*30; {
if _, err := config.GetIngressConfig(kubeconfigPath, ""); err == nil {
if _, err := config.GetIngressConfig(kubeconfigPath, "", ""); err == nil {
log.Info("handleBootstrapStopKeepalived: local kube-apiserver is operational")
break
}
@@ -156,7 +156,7 @@ func handleBootstrapStopKeepalived(kubeconfigPath string, bootstrapStopKeepalive
}

for {
if _, err := config.GetIngressConfig(kubeconfigPath, ""); err != nil {
if _, err := config.GetIngressConfig(kubeconfigPath, "", ""); err != nil {
// We have started to talk to Ironic through the API VIP as well,
// so if Ironic is still up then we need to keep the VIP, even if
// the apiserver has gone down.
8 changes: 7 additions & 1 deletion pkg/monitor/monitor.go
@@ -58,7 +58,13 @@ func Monitor(kubeconfigPath, clusterName, clusterDomain, templatePath, cfgPath s
}
return nil
default:
config, err := config.GetLBConfig(kubeconfigPath, apiPort, lbPort, statPort, net.ParseIP(apiVips[0]))
// NOTE(mko) This codepath handles only haproxy configuration that uses IPv4. For this
// reason we can pass an empty Ingress VIP to the GetLBConfig function, as
// the sole use of the Ingress VIP there is to filter out IPs when collecting
// the IPs of the nodes.
// As this filtering is relevant only for IPv6 addresses, passing an empty value
// does not change the behaviour.
config, err := config.GetLBConfig(kubeconfigPath, apiPort, lbPort, statPort, net.ParseIP(apiVips[0]), net.IP{})
if err != nil {
log.WithFields(logrus.Fields{
"kubeconfigPath": kubeconfigPath,
