Skip to content

Commit

Permalink
feature: support custom vpc
Browse files Browse the repository at this point in the history
  • Loading branch information
fanriming committed Nov 4, 2020
1 parent 9d821bc commit 99217ce
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 30 deletions.
10 changes: 10 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package controller

import (
"sync"

kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/alauda/kube-ovn/pkg/client/informers/externalversions"
kubeovnlister "github.com/alauda/kube-ovn/pkg/client/listers/kubeovn/v1"
Expand Down Expand Up @@ -34,6 +36,7 @@ const controllerAgentName = "ovn-controller"
// Controller is kube-ovn main controller that watch ns/pod/node/svc/ep and operate ovn
type Controller struct {
config *Configuration
vpcs *sync.Map
ovnClient *ovs.Client
ipam *ovnipam.IPAM

Expand Down Expand Up @@ -131,6 +134,7 @@ func NewController(config *Configuration) *Controller {

controller := &Controller{
config: config,
vpcs: &sync.Map{},
ovnClient: ovs.NewClient(config.OvnNbHost, config.OvnNbPort, config.OvnTimeout, config.OvnSbHost, config.OvnSbPort, config.ClusterRouter, config.ClusterTcpLoadBalancer, config.ClusterUdpLoadBalancer, config.ClusterTcpSessionLoadBalancer, config.ClusterUdpSessionLoadBalancer, config.NodeSwitch, config.NodeSwitchCIDR),
ipam: ovnipam.NewIPAM(),

Expand Down Expand Up @@ -192,6 +196,12 @@ func NewController(config *Configuration) *Controller {
kubeovnInformerFactory: kubeovnInformerFactory,
}

controller.vpcs.Store("default", &Vpc{
Default: true,
Name: "default",
DefaultLogicalSwitch: config.DefaultLogicalSwitch,
Router: config.ClusterRouter})

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddPod,
DeleteFunc: controller.enqueueDeletePod,
Expand Down
32 changes: 25 additions & 7 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package controller
import (
"encoding/json"
"fmt"
"github.com/alauda/kube-ovn/pkg/ipam"
"net"
"reflect"
"strconv"
"strings"
"time"

"github.com/alauda/kube-ovn/pkg/ipam"

kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/alauda/kube-ovn/pkg/ovs"
"github.com/alauda/kube-ovn/pkg/util"
Expand Down Expand Up @@ -425,12 +426,23 @@ func (c *Controller) handleDeletePod(key string) error {
return nil
}

subnet, err := c.getPodDefaultSubnet(pod)
if err != nil {
klog.Errorf("failed to get pod subnet, %v", err)
return err
}
vpc, err := c.parseSubnetVpc(subnet)
if err != nil {
klog.Errorf("failed to get vpc ubnet, %v", err)
return err
}

ips, _ := c.ipam.GetPodAddress(key)
for _, ip := range ips {
if err := c.ovnClient.DeleteStaticRoute(ip, c.config.ClusterRouter); err != nil {
if err := c.ovnClient.DeleteStaticRoute(ip, vpc.Router); err != nil {
return err
}
if err := c.ovnClient.DeleteNatRule(ip, c.config.ClusterRouter); err != nil {
if err := c.ovnClient.DeleteNatRule(ip, vpc.Router); err != nil {
return err
}
}
Expand Down Expand Up @@ -475,8 +487,14 @@ func (c *Controller) handleUpdatePod(key string) error {
klog.Errorf("failed to get subnet %v", err)
return err
}
vpc, err := c.parseSubnetVpc(subnet)
if err != nil {
klog.Errorf("failed to get vpc %v", err)
return err
}

if !subnet.Spec.UnderlayGateway {
if pod.Annotations[util.EipAnnotation] != "" {
if pod.Annotations[util.EipAnnotation] != "" && vpc.Default {
cm, err := c.configMapsLister.ConfigMaps("kube-system").Get(util.ExternalGatewayConfig)
if err != nil {
klog.Errorf("failed to get ex-gateway config, %v", err)
Expand All @@ -498,7 +516,7 @@ func (c *Controller) handleUpdatePod(key string) error {
klog.Errorf("failed to add nat rules, %v", err)
return err
}
} else if pod.Annotations[util.SnatAnnotation] != "" {
} else if pod.Annotations[util.SnatAnnotation] != "" && vpc.Default {
cm, err := c.configMapsLister.ConfigMaps("kube-system").Get(util.ExternalGatewayConfig)
if err != nil {
klog.Errorf("failed to get ex-gateway config, %v", err)
Expand All @@ -521,11 +539,11 @@ func (c *Controller) handleUpdatePod(key string) error {
return err
}
} else if pod.Annotations[util.NorthGatewayAnnotation] != "" {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, pod.Annotations[util.NorthGatewayAnnotation], c.config.ClusterRouter); err != nil {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, pod.Annotations[util.NorthGatewayAnnotation], vpc.Router); err != nil {
klog.Errorf("failed to add static route, %v", err)
return err
}
} else {
} else if vpc.Default {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil {
Expand Down
94 changes: 84 additions & 10 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package controller

import (
"fmt"
kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/alauda/kube-ovn/pkg/ovs"
"github.com/alauda/kube-ovn/pkg/util"
"net"
"reflect"
"strconv"
"strings"
"time"

kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/alauda/kube-ovn/pkg/ovs"
"github.com/alauda/kube-ovn/pkg/util"

v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -251,9 +252,12 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {
subnet.Spec.GatewayType = kubeovnv1.GWDistributedType
changed = true
}
if subnet.Spec.Default && subnet.Name != c.config.DefaultLogicalSwitch {
subnet.Spec.Default = false
changed = true
vpc, err := c.parseSubnetVpc(subnet)
if err == nil && vpc.Default {
if subnet.Spec.Default && subnet.Name != vpc.DefaultLogicalSwitch {
subnet.Spec.Default = false
changed = true
}
}
if subnet.Spec.Gateway == "" {
gw, err := util.FirstSubnetIP(subnet.Spec.CIDRBlock)
Expand Down Expand Up @@ -297,6 +301,26 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {
}
}

if len(subnet.Labels) == 0 {
subnet.Labels = make(map[string]string)
}
if _, labelSet := subnet.Labels[util.VpcNameLabel]; !labelSet {
if vpcName, ok := subnet.Annotations[util.CustomVpcAnnotation]; ok {
subnet.Labels[util.VpcNameLabel] = vpcName

}
subnet.Labels[util.VpcNameLabel] = "default"
changed = true
}

if _, labelSet := subnet.Labels[util.VpcDefaultSubnetLabel]; !labelSet {
if vpcDefaultSubnet, ok := subnet.Annotations[util.CustomVpcDefaultSubnetAnnotation]; ok {
subnet.Labels[util.VpcDefaultSubnetLabel] = vpcDefaultSubnet
}
subnet.Labels[util.VpcDefaultSubnetLabel] = c.config.DefaultLogicalSwitch
changed = true
}

if changed {
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Update(subnet)
if err != nil {
Expand Down Expand Up @@ -404,11 +428,34 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "")
}

subnetList, err := c.subnetsLister.List(labels.Everything())
// init and check vpc
vpc, err := c.parseSubnetVpc(subnet)
if err != nil {
vpcName, customVpc := subnet.Annotations[util.CustomVpcAnnotation]
_, isVpcDefaultSubnet := subnet.Annotations[util.CustomVpcDefaultSubnetAnnotation]
if customVpc {
if isVpcDefaultSubnet {
// init vpc
vpc, err = c.initCustomVpc(vpcName, subnet.Name)
if err != nil {
klog.Errorf("failed to init vpc %v", err)
return err
}
} else {
err = fmt.Errorf("vpc default subnet not found")
klog.Error(err)
return err
}
}
}

labelSelector := labels.SelectorFromSet(map[string]string{util.VpcNameLabel: vpc.Name})
subnetList, err := c.subnetsLister.List(labelSelector)
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}

for _, sub := range subnetList {
if sub.Name != subnet.Name && util.CIDRConflict(sub.Spec.CIDRBlock, subnet.Spec.CIDRBlock) {
err = fmt.Errorf("subnet %s cidr %s conflict with subnet %s cidr %s", subnet.Name, subnet.Spec.CIDRBlock, sub.Name, sub.Spec.CIDRBlock)
Expand Down Expand Up @@ -446,18 +493,18 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
if !exist {
subnet.Status.EnsureStandardConditions()
// If multiple namespace use same ls name, only first one will success
if err := c.ovnClient.CreateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps, subnet.Spec.UnderlayGateway); err != nil {
if err := c.ovnClient.CreateLogicalSwitch(subnet.Name, vpc.Router, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps, subnet.Spec.UnderlayGateway, vpc.Default); err != nil {
c.patchSubnetStatus(subnet, "CreateLogicalSwitchFailed", err.Error())
return err
}
} else {
// logical switch exists, only update other_config
if err := c.ovnClient.SetLogicalSwitchConfig(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
if err := c.ovnClient.SetLogicalSwitchConfig(subnet.Name, vpc.Router, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
c.patchSubnetStatus(subnet, "SetLogicalSwitchConfigFailed", err.Error())
return err
}
if subnet.Spec.UnderlayGateway {
if err := c.ovnClient.RemoveRouterPort(subnet.Name); err != nil {
if err := c.ovnClient.RemoveRouterPort(subnet.Name, vpc.Router); err != nil {
klog.Errorf("failed to remove router port from %s, %v", subnet.Name, err)
return err
}
Expand Down Expand Up @@ -508,6 +555,33 @@ func (c *Controller) handleDeleteRoute(key string) error {
}

func (c *Controller) handleDeleteSubnet(key string) error {
subnet, err := c.subnetsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

vpc, err := c.parseSubnetVpc(subnet)
isCustomVpcDefaultSubnet := false
if err == nil && !vpc.Default && vpc.DefaultLogicalSwitch == subnet.Name {
isCustomVpcDefaultSubnet = true
}

if isCustomVpcDefaultSubnet {
labelSelector := labels.SelectorFromSet(labels.Set{util.VpcNameLabel: vpc.Name})
subnetList, err := c.subnetsLister.List(labelSelector)
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}
if len(subnetList) > 1 {
// The default VPC subnet should be removed last
return fmt.Errorf("the default VPC subnet should be removed last")
}
}

c.ipam.DeleteSubnet(key)

exist, err := c.ovnClient.LogicalSwitchExists(key)
Expand Down
100 changes: 100 additions & 0 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package controller

import (
"fmt"

v1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/alauda/kube-ovn/pkg/util"
"k8s.io/klog"
)

type Vpc struct {
Default bool
Name string
DefaultLogicalSwitch string
Router string
TcpLoadBalancer string
UdpLoadBalancer string
TcpSessionLoadBalancer string
UdpSessionLoadBalancer string
}

func (c *Controller) initCustomVpc(name, defaultLs string) (*Vpc, error) {
vpc := &Vpc{
Name: name,
DefaultLogicalSwitch: defaultLs,
Router: fmt.Sprintf("ovn-vpc-%s", name),
TcpLoadBalancer: fmt.Sprintf("vpc-%s-tcp-load", name),
UdpLoadBalancer: fmt.Sprintf("vpc-%s-udp-load", name),
TcpSessionLoadBalancer: fmt.Sprintf("vpc-%s-tcp-sess", name),
UdpSessionLoadBalancer: fmt.Sprintf("vpc-%s-udp-sess", name),
}

if err := c.createVpcRouter(vpc.Router); err != nil {
klog.Errorf("init router failed %v", err)
return nil, err
}
c.vpcs.Store(name, vpc)
return vpc, nil
}

func (c *Controller) deleteCustomVpc(name string) error {
vpcObj, ok := c.vpcs.Load(name)
if !ok {
err := fmt.Errorf("vpc '%s' not found", name)
klog.Error(err)
return err
}
vpc := vpcObj.(*Vpc)
err := c.deleteVpcRouter(vpc.Router)
if err != nil {
klog.Errorf("delete router failed %v", err)
return err
}
return nil
}

// createVpcRouter create router to connect logical switches in vpc
func (c *Controller) createVpcRouter(lr string) error {
lrs, err := c.ovnClient.ListLogicalRouter()
if err != nil {
return err
}
klog.Infof("exists routers %v", lrs)
for _, r := range lrs {
if lr == r {
return nil
}
}
return c.ovnClient.CreateLogicalRouter(lr)
}

// deleteVpcRouter delete router to connect logical switches in vpc
func (c *Controller) deleteVpcRouter(lr string) error {
lrs, err := c.ovnClient.ListLogicalRouter()
if err != nil {
return err
}
klog.Infof("exists routers %v", lrs)
for _, r := range lrs {
if lr == r {
return nil
}
}
return c.ovnClient.DeleteLogicalRouter(lr)
}

func (c *Controller) parseSubnetVpc(subnet *v1.Subnet) (*Vpc, error) {
vpcName, customVpc := subnet.Annotations[util.CustomVpcAnnotation]
if !customVpc {
vpcName = "default"
}

vpc, ok := c.vpcs.Load(vpcName)
if !ok {
err := fmt.Errorf("vpc %s not found", vpcName)
klog.Error(err)
return nil, err
}
return vpc.(*Vpc), nil
}

0 comments on commit 99217ce

Please sign in to comment.