Skip to content

Commit

Permalink
optimize IPAM initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed May 7, 2022
1 parent 76fe9ce commit 2e681af
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 123 deletions.
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

1. Kube-OVN is developed by [Go](https://golang.org/) 1.16 and uses [Go Modules](https://github.com/golang/go/wiki/Modules) to manage dependency. Make sure `GO111MODULE="on"`.

2. We also use [gosec](https://github.com/securego/gosec) to inspects source code for security problems.
2. We also use [gosec](https://github.com/securego/gosec) to inspect source code for security problems.

```shell
go get github.com/securego/gosec/v2/cmd/gosec
Expand Down
139 changes: 63 additions & 76 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -287,6 +288,7 @@ func (c *Controller) initLoadBalancer() error {
}

func (c *Controller) InitIPAM() error {
start := time.Now()
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnet: %v", err)
Expand All @@ -298,69 +300,88 @@ func (c *Controller) InitIPAM() error {
}
}

result, err := c.ovnClient.CustomFindEntity("logical_switch_port", []string{"name"}, `external-ids:vendor{<}""`)
if err != nil {
klog.Errorf("failed to find logical switch port without external-ids:vendor: %v", err)
}
lspWithoutVendor := make(map[string]struct{}, len(result))
for _, lsp := range result {
lspWithoutVendor[lsp["name"][0]] = struct{}{}
}

pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list pods: %v", err)
return err
}

ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list IPs: %v", err)
return err
}

ipsMap := make(map[string]*kubeovnv1.IP, len(ips))
for _, ip := range ips {
ipsMap[ip.Name] = ip
if ip.Spec.PodType != "StatefulSet" {
continue
}

var ipamKey string
if ip.Spec.Namespace != "" {
ipamKey = fmt.Sprintf("%s/%s", ip.Spec.Namespace, ip.Spec.PodName)
} else {
ipamKey = fmt.Sprintf("node-%s", ip.Spec.PodName)
}
if _, _, _, err = c.ipam.GetStaticAddress(ipamKey, ip.Name, ip.Spec.IPAddress, ip.Spec.MacAddress, ip.Spec.Subnet, true); err != nil {
klog.Errorf("failed to init IPAM from IP CR %s: %v", ip.Name, err)
}
}

for _, pod := range pods {
if pod.Spec.HostNetwork || !isPodAlive(pod) {
continue
}
podName := c.getNameByPod(pod)

podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod kubeovn nets %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
continue
}

podType := getPodType(pod)
podName := c.getNameByPod(pod)
key := fmt.Sprintf("%s/%s", pod.Namespace, podName)
for _, podNet := range podNets {
if !isOvnSubnet(podNet.Subnet) {
continue
}
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
key := fmt.Sprintf("%s/%s", pod.Namespace, podName)
portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName)
ip := pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)]
mac := pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)]
subnet := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)]
podType := getPodType(pod)
_, _, _, err := c.ipam.GetStaticAddress(key, portName, ip, mac, subnet, false)
_, _, _, err := c.ipam.GetStaticAddress(key, portName, ip, mac, subnet, true)
if err != nil {
klog.Errorf("failed to init pod %s.%s address %s: %v", podName, pod.Namespace, pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)], err)
} else {
err = c.createOrUpdateCrdIPs(podName, ip, mac, subnet, pod.Namespace, pod.Spec.NodeName, podNet.ProviderName, podType)
ipCR := ipsMap[portName]
err = c.createOrUpdateCrdIPs(podName, ip, mac, subnet, pod.Namespace, pod.Spec.NodeName, podNet.ProviderName, podType, &ipCR)
if err != nil {
klog.Errorf("failed to create/update ips CR %s.%s with ip address %s: %v", podName, pod.Namespace, ip, err)
}
}

if err = c.initAppendPodExternalIds(pod); err != nil {
klog.Errorf("failed to init append pod %s.%s externalIds: %v", podName, pod.Namespace, err)
if _, ok := lspWithoutVendor[portName]; ok {
if err = c.initAppendLspExternalIds(portName, pod); err != nil {
klog.Errorf("failed to append external-ids for logical switch port %s: %v", portName, err)
}
}
}
}
}

ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list IPs: %v", err)
return err
}
for _, ip := range ips {
if ip.Spec.PodType != "StatefulSet" {
continue
}

var ipamKey string
if ip.Spec.Namespace != "" {
ipamKey = fmt.Sprintf("%s/%s", ip.Spec.Namespace, ip.Spec.PodName)
} else {
ipamKey = fmt.Sprintf("node-%s", ip.Spec.PodName)
}
if _, _, _, err = c.ipam.GetStaticAddress(ipamKey, ip.Name, ip.Spec.IPAddress, ip.Spec.MacAddress, ip.Spec.Subnet, false); err != nil {
klog.Errorf("failed to init IPAM from IP CR %s: %v", ip.Name, err)
}
}

nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list nodes: %v", err)
Expand All @@ -369,7 +390,8 @@ func (c *Controller) InitIPAM() error {
for _, node := range nodes {
if node.Annotations[util.AllocatedAnnotation] == "true" {
portName := fmt.Sprintf("node-%s", node.Name)
v4IP, v6IP, _, err := c.ipam.GetStaticAddress(portName, portName, node.Annotations[util.IpAddressAnnotation],
v4IP, v6IP, _, err := c.ipam.GetStaticAddress(portName, portName,
node.Annotations[util.IpAddressAnnotation],
node.Annotations[util.MacAddressAnnotation],
node.Annotations[util.LogicalSwitchAnnotation], true)
if err != nil {
Expand All @@ -379,12 +401,15 @@ func (c *Controller) InitIPAM() error {
node.Annotations[util.IpAddressAnnotation] = util.GetStringIP(v4IP, v6IP)
}

if err = c.initAppendNodeExternalIds(portName, node.Name); err != nil {
klog.Errorf("failed to init append node %s externalIds: %v", node.Name, err)
if _, ok := lspWithoutVendor[portName]; ok {
if err = c.initAppendLspExternalIds(portName, nil); err != nil {
klog.Errorf("failed to append external-ids for logical switch port %s: %v", portName, err)
}
}
}
}

klog.Infof("take %.2f seconds to initialize IPAM", time.Since(start).Seconds())
return nil
}

Expand Down Expand Up @@ -652,56 +677,18 @@ func (c *Controller) initNodeRoutes() error {
return nil
}

func (c *Controller) initAppendPodExternalIds(pod *v1.Pod) error {
if !isPodAlive(pod) {
return nil
}

podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod nets %v", err)
return err
}

podName := c.getNameByPod(pod)
for _, podNet := range podNets {
if !strings.HasSuffix(podNet.ProviderName, util.OvnProvider) {
continue
}
portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName)
externalIds, err := c.ovnClient.OvnGet("logical_switch_port", portName, "external_ids", "")
if err != nil {
klog.Errorf("failed to get lsp external_ids for pod %s/%s, %v", pod.Namespace, podName, err)
return err
}
if strings.Contains(externalIds, "pod") || strings.Contains(externalIds, "vendor") {
continue
}

ovnCommand := []string{"set", "logical_switch_port", portName, fmt.Sprintf("external_ids:pod=%s/%s", pod.Namespace, podName), fmt.Sprintf("external_ids:vendor=%s", util.CniTypeName)}
if err = c.ovnClient.SetLspExternalIds(ovnCommand); err != nil {
klog.Errorf("failed to set lsp external_ids for pod %s/%s, %v", pod.Namespace, podName, err)
return err
}
func (c *Controller) initAppendLspExternalIds(portName string, pod *v1.Pod) error {
externalIDs := make(map[string]string, 2)
externalIDs["vendor"] = util.CniTypeName
if pod != nil {
externalIDs["pod"] = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}
return nil
}

func (c *Controller) initAppendNodeExternalIds(portName, nodeName string) error {
externalIds, err := c.ovnClient.OvnGet("logical_switch_port", portName, "external_ids", "")
if err != nil {
klog.Errorf("failed to get lsp external_ids for node %s, %v", nodeName, err)
if err := c.ovnClient.SetLspExternalIds(portName, externalIDs); err != nil {
klog.Errorf("failed to set lsp external_ids for port %s: %v", portName, err)
return err
}
if strings.Contains(externalIds, "vendor") {
return nil
}

ovnCommand := []string{"set", "logical_switch_port", portName, fmt.Sprintf("external_ids:vendor=%s", util.CniTypeName)}
if err = c.ovnClient.SetLspExternalIds(ovnCommand); err != nil {
klog.Errorf("failed to set lsp external_ids for node %s, %v", nodeName, err)
return err
}
return nil
}

Expand Down
83 changes: 48 additions & 35 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (c *Controller) handleAddNode(key string) error {
return err
}

if err := c.createOrUpdateCrdIPs("", ipStr, mac, c.config.NodeSwitch, "", node.Name, "", ""); err != nil {
if err := c.createOrUpdateCrdIPs("", ipStr, mac, c.config.NodeSwitch, "", node.Name, "", "", nil); err != nil {
klog.Errorf("failed to create or update IPs node-%s: %v", key, err)
return err
}
Expand Down Expand Up @@ -586,7 +586,7 @@ func (c *Controller) handleUpdateNode(key string) error {
return nil
}

func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, nodeName, providerName, podType string) error {
func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, nodeName, providerName, podType string, existingCR **kubeovnv1.IP) error {
var key, ipName string
if subnetName == c.config.NodeSwitch {
key = nodeName
Expand All @@ -596,40 +596,50 @@ func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, node
ipName = ovs.PodNameToPortName(podName, ns, providerName)
}

v4IP, v6IP := util.SplitStringIP(ip)
ipCr, err := c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
_, err := c.config.KubeOvnClient.KubeovnV1().IPs().Create(context.Background(), &kubeovnv1.IP{
ObjectMeta: metav1.ObjectMeta{
Name: ipName,
Labels: map[string]string{
util.SubnetNameLabel: subnetName,
subnetName: "",
},
},
Spec: kubeovnv1.IPSpec{
PodName: key,
Subnet: subnetName,
NodeName: nodeName,
Namespace: ns,
IPAddress: ip,
V4IPAddress: v4IP,
V6IPAddress: v6IP,
MacAddress: mac,
AttachIPs: []string{},
AttachMacs: []string{},
AttachSubnets: []string{},
PodType: podType,
},
}, metav1.CreateOptions{})
if err != nil {
errMsg := fmt.Errorf("failed to create ip crd for %s, %v", ip, err)
var err error
var ipCr *kubeovnv1.IP
if existingCR != nil {
ipCr = *existingCR
} else {
ipCr, err = c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{})
if err != nil {
if !k8serrors.IsNotFound(err) {
errMsg := fmt.Errorf("failed to get ip CR %s: %v", ipName, err)
klog.Error(errMsg)
return errMsg
}
} else {
errMsg := fmt.Errorf("failed to get ip crd for %s, %v", ip, err)
// the returned pointer is not nil if the CR does not exist
ipCr = nil
}
}

v4IP, v6IP := util.SplitStringIP(ip)
if ipCr == nil {
_, err = c.config.KubeOvnClient.KubeovnV1().IPs().Create(context.Background(), &kubeovnv1.IP{
ObjectMeta: metav1.ObjectMeta{
Name: ipName,
Labels: map[string]string{
util.SubnetNameLabel: subnetName,
subnetName: "",
},
},
Spec: kubeovnv1.IPSpec{
PodName: key,
Subnet: subnetName,
NodeName: nodeName,
Namespace: ns,
IPAddress: ip,
V4IPAddress: v4IP,
V6IPAddress: v6IP,
MacAddress: mac,
AttachIPs: []string{},
AttachMacs: []string{},
AttachSubnets: []string{},
PodType: podType,
},
}, metav1.CreateOptions{})
if err != nil {
errMsg := fmt.Errorf("failed to create ip CR %s: %v", ipName, err)
klog.Error(errMsg)
return errMsg
}
Expand All @@ -650,14 +660,17 @@ func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, node
newIpCr.Spec.V4IPAddress = v4IP
newIpCr.Spec.V6IPAddress = v6IP
newIpCr.Spec.MacAddress = mac
newIpCr.Spec.ContainerID = ""
newIpCr.Spec.AttachIPs = []string{}
newIpCr.Spec.AttachMacs = []string{}
newIpCr.Spec.AttachSubnets = []string{}
newIpCr.Spec.PodType = podType
if reflect.DeepEqual(newIpCr.Labels, ipCr.Labels) && reflect.DeepEqual(newIpCr.Spec, ipCr.Spec) {
return nil
}

_, err := c.config.KubeOvnClient.KubeovnV1().IPs().Update(context.Background(), newIpCr, metav1.UpdateOptions{})
if err != nil {
errMsg := fmt.Errorf("failed to update ips cr %s: %v", newIpCr.Name, err)
errMsg := fmt.Errorf("failed to update ip CR %s: %v", newIpCr.Name, err)
klog.Error(errMsg)
return errMsg
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func (c *Controller) handleAddPod(key string) error {

podType := getPodType(pod)
podName := c.getNameByPod(pod)
if err := c.createOrUpdateCrdIPs(podName, ipStr, mac, subnet.Name, pod.Namespace, pod.Spec.NodeName, podNet.ProviderName, podType); err != nil {
if err := c.createOrUpdateCrdIPs(podName, ipStr, mac, subnet.Name, pod.Namespace, pod.Spec.NodeName, podNet.ProviderName, podType, nil); err != nil {
klog.Errorf("failed to create IP %s.%s: %v", podName, pod.Namespace, err)
}

Expand Down Expand Up @@ -1434,12 +1434,12 @@ func (c *Controller) isVmPodToDel(pod *v1.Pod, vmName string) bool {
}

func (c *Controller) getNameByPod(pod *v1.Pod) string {
podName := pod.Name
isVmPod, vmName := isVmPod(pod)
if isVmPod && c.config.EnableKeepVmIP {
podName = vmName
if c.config.EnableKeepVmIP {
if isVmPod, vmName := isVmPod(pod); isVmPod {
return vmName
}
}
return podName
return pod.Name
}

func (c *Controller) getNsAvailableSubnets(pod *v1.Pod) ([]*kubeovnNet, error) {
Expand Down Expand Up @@ -1489,9 +1489,8 @@ func (c *Controller) getNsAvailableSubnets(pod *v1.Pod) ([]*kubeovnNet, error) {
}

func getPodType(pod *v1.Pod) string {
var podType string
if ok, _ := isStatefulSetPod(pod); ok {
podType = "StatefulSet"
return "StatefulSet"
}
return podType
return ""
}

0 comments on commit 2e681af

Please sign in to comment.