Skip to content

Commit

Permalink
feat: update provider network via node annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed Feb 14, 2022
1 parent 57f1657 commit 3f818b7
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 24 deletions.
53 changes: 53 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ func (c *Controller) initDefaultProviderNetwork() error {
return err
}

nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get nodes: %v", err)
return err
}

pn := kubeovnv1.ProviderNetwork{
ObjectMeta: metav1.ObjectMeta{
Name: c.config.DefaultProviderName,
Expand All @@ -396,6 +402,53 @@ func (c *Controller) initDefaultProviderNetwork() error {
},
}

excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, c.config.DefaultProviderName)
interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, c.config.DefaultProviderName)
newNodes := make([]*v1.Node, 0, len(nodes))
for _, node := range nodes {
if len(node.Annotations) == 0 {
continue
}

var newNode *v1.Node
if node.Annotations[excludeAnno] == "true" {
pn.Spec.ExcludeNodes = append(pn.Spec.ExcludeNodes, node.Name)
newNode = node.DeepCopy()
} else if s := node.Annotations[interfaceAnno]; s != "" {
var index int
for index = range pn.Spec.CustomInterfaces {
if pn.Spec.CustomInterfaces[index].Interface == s {
break
}
}
if index != len(pn.Spec.CustomInterfaces) {
pn.Spec.CustomInterfaces[index].Nodes = append(pn.Spec.CustomInterfaces[index].Nodes, node.Name)
} else {
ci := kubeovnv1.CustomInterface{Interface: s, Nodes: []string{node.Name}}
pn.Spec.CustomInterfaces = append(pn.Spec.CustomInterfaces, ci)
}
newNode = node.DeepCopy()
}
if newNode != nil {
delete(newNode.Annotations, excludeAnno)
delete(newNode.Annotations, interfaceAnno)
newNodes = append(newNodes, newNode)
}
}

defer func() {
if err == nil {
return
}

// update nodes only when provider network has been created successfully
for _, node := range newNodes {
if _, err := c.config.KubeClient.CoreV1().Nodes().Update(context.Background(), node, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update node %s: %v", node.Name, err)
}
}
}()

_, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Create(context.Background(), &pn, metav1.CreateOptions{})
return err
}
Expand Down
153 changes: 129 additions & 24 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -55,7 +56,7 @@ func (c *Controller) enqueueUpdateNode(oldObj, newObj interface{}) {
newNode := newObj.(*v1.Node)

if nodeReady(oldNode) != nodeReady(newNode) ||
oldNode.Annotations[util.ChassisAnnotation] != newNode.Annotations[util.ChassisAnnotation] {
!reflect.DeepEqual(oldNode.Annotations, newNode.Annotations) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
Expand Down Expand Up @@ -249,29 +250,10 @@ func (c *Controller) handleAddNode(key string) error {
klog.Errorf("failed to create address set for node %s: %v", node.Name, err)
return err
}

providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to list provider networks: %v", err)
if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
return err
}
for _, pn := range providerNetworks {
if !util.ContainsString(pn.Spec.ExcludeNodes, node.Name) {
status := pn.Status.DeepCopy()
if status.EnsureNodeStandardConditions(key) {
bytes, err := status.Bytes()
if err != nil {
klog.Error(err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Patch(context.Background(), pn.Name, types.MergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to patch provider network %s: %v", pn.Name, err)
return err
}
}
}
}

subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
if err != nil {
Expand Down Expand Up @@ -370,6 +352,91 @@ func (c *Controller) handleAddNode(key string) error {
return nil
}

func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) error {
providerNetworks, err := c.providerNetworksLister.List(labels.Everything())
if err != nil && !k8serrors.IsNotFound(err) {
klog.Errorf("failed to list provider networks: %v", err)
return err
}

for _, pn := range providerNetworks {
excludeAnno := fmt.Sprintf(util.ProviderNetworkExcludeTemplate, pn.Name)
interfaceAnno := fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, pn.Name)

var newPn *kubeovnv1.ProviderNetwork
excluded := util.ContainsString(pn.Spec.ExcludeNodes, node.Name)
if !excluded && len(node.Annotations) != 0 && node.Annotations[excludeAnno] == "true" {
newPn = pn.DeepCopy()
newPn.Spec.ExcludeNodes = append(newPn.Spec.ExcludeNodes, node.Name)
excluded = true
}

var customInterface string
for _, v := range pn.Spec.CustomInterfaces {
if util.ContainsString(v.Nodes, node.Name) {
customInterface = v.Interface
break
}
}
if customInterface == "" && len(node.Annotations) != 0 {
if customInterface = node.Annotations[interfaceAnno]; customInterface != "" {
if newPn == nil {
newPn = pn.DeepCopy()
}
var index int
for index = range newPn.Spec.CustomInterfaces {
if newPn.Spec.CustomInterfaces[index].Interface == customInterface {
break
}
}
if index != len(newPn.Spec.CustomInterfaces) {
newPn.Spec.CustomInterfaces[index].Nodes = append(newPn.Spec.CustomInterfaces[index].Nodes, node.Name)
} else {
ci := kubeovnv1.CustomInterface{Interface: customInterface, Nodes: []string{node.Name}}
newPn.Spec.CustomInterfaces = append(newPn.Spec.CustomInterfaces, ci)
}
}
}

if newPn != nil {
if _, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
return err
}
}

if len(node.Annotations) != 0 {
newNode := node.DeepCopy()
delete(newNode.Annotations, excludeAnno)
delete(newNode.Annotations, interfaceAnno)
if len(newNode.Annotations) != len(node.Annotations) {
if _, err = c.config.KubeClient.CoreV1().Nodes().Update(context.Background(), newNode, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update node %s: %v", node.Name, err)
return err
}
}
}

if excluded {
status := pn.Status.DeepCopy()
if status.EnsureNodeStandardConditions(node.Name) {
bytes, err := status.Bytes()
if err != nil {
klog.Error(err)
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Patch(context.Background(), pn.Name, types.MergePatchType, bytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to patch provider network %s: %v", pn.Name, err)
return err
}
}
}
}

return nil
}

func (c *Controller) handleDeleteNode(key string) error {
portName := fmt.Sprintf("node-%s", key)
if err := c.ovnClient.DeleteLogicalSwitchPort(portName); err != nil {
Expand Down Expand Up @@ -420,15 +487,16 @@ func (c *Controller) handleDeleteNode(key string) error {
}

for _, pn := range providerNetworks {
if err = c.updateProviderNetworkStatusForNodeDeletion(pn, key); err != nil {
if err = c.updateProviderNetworkForNodeDeletion(pn, key); err != nil {
return err
}
}

return nil
}

func (c *Controller) updateProviderNetworkStatusForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.ProviderNetwork, node string) error {
// update provider network status
status := pn.Status.DeepCopy()
if util.ContainsString(status.ReadyNodes, node) {
status.ReadyNodes = util.RemoveString(status.ReadyNodes, node)
Expand Down Expand Up @@ -474,6 +542,37 @@ func (c *Controller) updateProviderNetworkStatusForNodeDeletion(pn *kubeovnv1.Pr
}
}

// update provider network spec
var newPn *kubeovnv1.ProviderNetwork
if excludeNodes := util.RemoveString(pn.Spec.ExcludeNodes, node); len(excludeNodes) != len(pn.Spec.ExcludeNodes) {
newPn := pn.DeepCopy()
newPn.Spec.ExcludeNodes = excludeNodes
}

var changed bool
customInterfaces := make([]kubeovnv1.CustomInterface, 0, len(pn.Spec.CustomInterfaces))
for _, ci := range pn.Spec.CustomInterfaces {
nodes := util.RemoveString(ci.Nodes, node)
if !changed {
changed = len(nodes) == 0 || len(nodes) != len(ci.Nodes)
}
if len(nodes) != 0 {
customInterfaces = append(customInterfaces, kubeovnv1.CustomInterface{Interface: ci.Interface, Nodes: nodes})
}
}
if changed {
if newPn == nil {
newPn = pn.DeepCopy()
}
newPn.Spec.CustomInterfaces = customInterfaces
}
if newPn != nil {
if _, err := c.config.KubeOvnClient.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update provider network %s: %v", pn.Name, err)
return err
}
}

return nil
}

Expand All @@ -485,6 +584,12 @@ func (c *Controller) handleUpdateNode(key string) error {
}
return err
}

if err = c.handleNodeAnnotationsForProviderNetworks(node); err != nil {
klog.Errorf("failed to handle annotations of node %s for provider networks: %v", node.Name, err)
return err
}

subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get subnets %v", err)
Expand Down
40 changes: 40 additions & 0 deletions test/e2e/underlay/underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -171,6 +172,45 @@ var _ = Describe("[Underlay]", func() {
}
}
})

It("node annotation", func() {
By("add exclude annotation")
nodes, err := f.KubeClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

for _, node := range nodes.Items {
newNode := node.DeepCopy()
newNode.Annotations[fmt.Sprintf(util.ProviderNetworkExcludeTemplate, ProviderNetwork)] = "true"
_, err = f.KubeClientSet.CoreV1().Nodes().Update(context.Background(), newNode, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
}

time.Sleep(3 * time.Second)

By("validate provider network")
pn, err := f.OvnClientSet.KubeovnV1().ProviderNetworks().Get(context.Background(), ProviderNetwork, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
for _, node := range nodes.Items {
Expect(util.ContainsString(pn.Spec.ExcludeNodes, node.Name)).To(BeTrue())
}

By("validate node annotation")
nodes, err = f.KubeClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

for _, node := range nodes.Items {
Expect(node.Annotations).NotTo(HaveKey(fmt.Sprintf(util.ProviderNetworkExcludeTemplate, ProviderNetwork)))
}

By("restore provider network")
pn, err = f.OvnClientSet.KubeovnV1().ProviderNetworks().Get(context.Background(), ProviderNetwork, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

newPn := pn.DeepCopy()
newPn.Spec.ExcludeNodes = nil
_, err = f.OvnClientSet.KubeovnV1().ProviderNetworks().Update(context.Background(), newPn, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
})
})

Context("[Subnet]", func() {
Expand Down

0 comments on commit 3f818b7

Please sign in to comment.