Skip to content

Commit

Permalink
feat: add subnet annotation to ns and automatically unbind ns from su…
Browse files Browse the repository at this point in the history
…bnet.
  • Loading branch information
oilbeater committed Aug 13, 2019
1 parent 948e130 commit fde683e
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 6 deletions.
13 changes: 9 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ type Controller struct {
deleteSubnetQueue workqueue.RateLimitingInterface
updateSubnetQueue workqueue.RateLimitingInterface

namespacesLister v1.NamespaceLister
namespacesSynced cache.InformerSynced
namespacesLister v1.NamespaceLister
namespacesSynced cache.InformerSynced
addNamespaceQueue workqueue.RateLimitingInterface

nodesLister v1.NodeLister
nodesSynced cache.InformerSynced
Expand Down Expand Up @@ -121,8 +122,9 @@ func NewController(config *Configuration) *Controller {
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
addNamespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNamespace"),

nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
Expand Down Expand Up @@ -204,6 +206,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.deletePodQueue.ShutDown()
defer c.updatePodQueue.ShutDown()

defer c.addNamespaceQueue.ShutDown()

defer c.addSubnetQueue.ShutDown()
defer c.updateSubnetQueue.ShutDown()
defer c.deleteSubnetQueue.ShutDown()
Expand Down Expand Up @@ -254,6 +258,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
time.Sleep(3 * time.Second)

go wait.Until(c.runAddIpPoolPodWorker, time.Second, stopCh)
go wait.Until(c.runAddNamespaceWorker, time.Second, stopCh)
for i := 0; i < c.config.WorkerNum; i++ {
go wait.Until(c.runAddPodWorker, time.Second, stopCh)
go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
Expand Down
111 changes: 110 additions & 1 deletion pkg/controller/namespace.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package controller

import (
"encoding/json"
"fmt"
"reflect"

"github.com/alauda/kube-ovn/pkg/util"
v1 "k8s.io/api/core/v1"
"reflect"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

func (c *Controller) enqueueAddNamespace(obj interface{}) {
Expand All @@ -14,6 +23,13 @@ func (c *Controller) enqueueAddNamespace(obj interface{}) {
for _, np := range c.namespaceMatchNetworkPolicies(ns) {
c.updateNpQueue.AddRateLimited(np)
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.addNamespaceQueue.AddRateLimited(key)
}

func (c *Controller) enqueueDeleteNamespace(obj interface{}) {
Expand Down Expand Up @@ -45,3 +61,96 @@ func (c *Controller) enqueueUpdateNamespace(old, new interface{}) {
}
}
}

func (c *Controller) runAddNamespaceWorker() {
for c.processNextAddNamespaceWorkItem() {
}
}

func (c *Controller) processNextAddNamespaceWorkItem() bool {
obj, shutdown := c.addNamespaceQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.addNamespaceQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.addNamespaceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleAddNamespace(key); err != nil {
c.addNamespaceQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.addNamespaceQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

func (c *Controller) handleAddNamespace(key string) error {
namespace, err := c.namespacesLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch)
if err != nil {
klog.Errorf("failed to get default subnet %v", err)
return err
}
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}
for _, s := range subnets {
for _, ns := range s.Spec.Namespaces {
if ns == key {
subnet = s
break
}
}
}

op := "replace"
if namespace.Annotations == nil {
op = "add"
namespace.Annotations = map[string]string{}
} else {
if namespace.Annotations[util.LogicalSwitchAnnotation] == subnet.Name &&
namespace.Annotations[util.CidrAnnotation] == subnet.Spec.CIDRBlock {
return nil
}
}

namespace.Annotations[util.LogicalSwitchAnnotation] = subnet.Name
namespace.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock
patchPayloadTemplate :=
`[{
"op": "%s",
"path": "/metadata/annotations",
"value": %s
}]`

raw, _ := json.Marshal(namespace.Annotations)
patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw)
_, err = c.config.KubeClient.CoreV1().Namespaces().Patch(key, types.JSONPatchType, []byte(patchPayload))
if err != nil {
klog.Errorf("patch namespace %s failed %v", key, err)
}
return err
}
69 changes: 68 additions & 1 deletion pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ func (c *Controller) handleAddSubnet(key string) error {
}
}

if err := c.reconcileSubnet(subnet); err != nil {
klog.Errorf("failed to reconcile subnet %s, %v", subnet.Name, err)
return err
}

if subnet.Spec.Private {
return c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.AllowSubnets)
}
Expand All @@ -275,9 +280,9 @@ func (c *Controller) handleUpdateSubnet(key string) error {
return err
}
if !exist {
c.addSubnetQueue.AddRateLimited(key)
return nil
}

if err = util.ValidateSubnet(*subnet); err != nil {
klog.Error(err)
subnet.TypeMeta.Kind = "Subnet"
Expand All @@ -286,6 +291,11 @@ func (c *Controller) handleUpdateSubnet(key string) error {
return err
}

if err := c.reconcileSubnet(subnet); err != nil {
klog.Errorf("failed to reconcile subnet %s, %v", subnet.Name, err)
return err
}

if subnet.Spec.Private {
return c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.AllowSubnets)
}
Expand Down Expand Up @@ -315,3 +325,60 @@ func (c *Controller) handleDeleteSubnet(key string) error {
}
return nil
}

func (c *Controller) reconcileSubnet(subnet *kubeovnv1.Subnet) error {
// 1. unbind from previous subnet
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
return err
}

namespaceMap := map[string]bool{}
for _, ns := range subnet.Spec.Namespaces {
namespaceMap[ns] = true
}

for _, sub := range subnets {
if sub.Name == subnet.Name || len(sub.Spec.Namespaces) == 0 {
continue
}

changed := false
reservedNamespaces := []string{}
for _, ns := range sub.Spec.Namespaces {
if namespaceMap[ns] {
changed = true
} else {
reservedNamespaces = append(reservedNamespaces, ns)
}
}
if changed {
sub.Spec.Namespaces = reservedNamespaces
_, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Update(sub)
if err != nil {
klog.Errorf("failed to unbind namespace from subnet %s, %v", sub.Name, err)
return err
}
}
}

// 2. add annotations to bind namespace
for _, ns := range subnet.Spec.Namespaces {
c.addNamespaceQueue.AddRateLimited(ns)
}

// 3. update unbind namespace annotation
namespaces, err := c.namespacesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list namespaces, %v", err)
return err
}

for _, ns := range namespaces {
if ns.Annotations != nil && ns.Annotations[util.LogicalSwitchAnnotation] == subnet.Name && !namespaceMap[ns.Name] {
c.addNamespaceQueue.AddRateLimited(ns.Name)
}
}

return nil
}

0 comments on commit fde683e

Please sign in to comment.