Skip to content

Commit

Permalink
feat: add basic allocation function for multus-cni
Browse files Browse the repository at this point in the history
This patch allow kube-ovn-controller combine multus-cni to allocate addresses for subnet providers other than ovn by annotate pods with allocated addresses. Users need to create subnet CRs and set the provider with <name>.<namespace> of NetworkAttachmentDefinition defined by multus-cni. Then kube-ovn-controller will combine the `k8s.v1.cni.cncf.io/networks` annotation and address information in subnet cr to determine an address.

This patch only implement the controller side logical, we need other patches to complete the cni part that communicates to apiserver and fetch the address information to the main cni.
  • Loading branch information
oilbeater committed Mar 15, 2020
1 parent 6a7eeb8 commit 1319eb5
Show file tree
Hide file tree
Showing 378 changed files with 28,250 additions and 12,298 deletions.
2 changes: 1 addition & 1 deletion cmd/cni/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func init() {
}

func main() {
skel.PluginMain(cmdAdd, cmdDel, version.All)
skel.PluginMain(cmdAdd, nil, cmdDel, version.All, "")
}

func cmdAdd(args *skel.CmdArgs) error {
Expand Down
27 changes: 9 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,25 @@ module github.com/alauda/kube-ovn
go 1.12

require (
github.com/containernetworking/cni v0.6.0
github.com/containernetworking/plugins v0.7.5
github.com/coreos/go-iptables v0.4.0
github.com/containernetworking/cni v0.7.1
github.com/containernetworking/plugins v0.8.2
github.com/coreos/go-iptables v0.4.2
github.com/elazarl/goproxy v0.0.0-20190630181448-f1e96bc0f4c5 // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190630181448-f1e96bc0f4c5 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible
github.com/emicklei/go-restful v2.11.1+incompatible
github.com/go-ini/ini v1.42.0 // indirect
github.com/go-logr/zapr v0.1.1 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/intel/multus-cni v0.0.0-20200313031649-eaf6ff6e20bb
github.com/juju/errors v0.0.0-20190207033735-e65537c515d7
github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 // indirect
github.com/juju/testing v0.0.0-20190613124551-e81189438503 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/moul/http2curl v1.0.0 // indirect
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/onsi/ginkgo v1.10.1
github.com/onsi/gomega v1.7.0
github.com/parnurzeal/gorequest v0.2.15
github.com/projectcalico/felix v3.6.1+incompatible
github.com/projectcalico/go-json v0.0.0-20161128004156-6219dc7339ba // indirect
Expand All @@ -39,20 +35,15 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect
github.com/sparrc/go-ping v0.0.0-20190613174326-4e5b6552494c
github.com/spf13/pflag v1.0.3
github.com/spf13/pflag v1.0.5
github.com/vishvananda/netlink v1.0.0
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/grpc v1.21.1 // indirect
gopkg.in/ini.v1 v1.42.0 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
k8s.io/api v0.0.0-20190703205437-39734b2a72fe
k8s.io/apimachinery v0.0.0-20190703205208-4cfb76a8bf76
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
k8s.io/klog v0.3.1
k8s.io/kube-openapi v0.0.0-20190401085232-94e1e7b7574c // indirect
k8s.io/klog v1.0.0
k8s.io/sample-controller v0.0.0-20190326030654-b8f621986e45
k8s.io/utils v0.0.0-20190607212802-c55fbcfc754a // indirect
sigs.k8s.io/controller-runtime v0.2.0-alpha.0
Expand Down
183 changes: 183 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type SubnetSpec struct {
CIDRBlock string `json:"cidrBlock"`
Gateway string `json:"gateway"`
ExcludeIps []string `json:"excludeIps,omitempty"`
Provider string `json:"provider,omitempty"`

GatewayType string `json:"gatewayType"`
GatewayNode string `json:"gatewayNode"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (c *Controller) initDefaultLogicalSwitch() error {
ObjectMeta: v1.ObjectMeta{Name: c.config.DefaultLogicalSwitch},
Spec: kubeovnv1.SubnetSpec{
Default: true,
Provider: util.OvnProvider,
CIDRBlock: c.config.DefaultCIDR,
Gateway: c.config.DefaultGateway,
ExcludeIps: strings.Split(c.config.DefaultExcludeIps, ","),
Expand Down Expand Up @@ -78,6 +79,7 @@ func (c *Controller) initNodeSwitch() error {
ObjectMeta: v1.ObjectMeta{Name: c.config.NodeSwitch},
Spec: kubeovnv1.SubnetSpec{
Default: false,
Provider: util.OvnProvider,
CIDRBlock: c.config.NodeSwitchCIDR,
Gateway: c.config.NodeSwitchGateway,
ExcludeIps: []string{c.config.NodeSwitchGateway},
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ func (c *Controller) handleDeleteNode(key string) error {
return err
}

ip, _, exist := c.ipam.GetPodAddress(portName)
if exist {
if err := c.ovnClient.DeleteStaticRouteByNextHop(ip); err != nil {
ips, _ := c.ipam.GetPodAddress(portName)
if len(ips) > 0 {
if err := c.ovnClient.DeleteStaticRouteByNextHop(ips[0]); err != nil {
return err
}
}
Expand Down
127 changes: 84 additions & 43 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,40 +306,48 @@ func (c *Controller) handleAddPod(key string) error {
return err
}

// Avoid create lsp for already running pod in ovn-nb when controller restart
if needCreateLogicalSwitchPort(pod) {
subnet, err := c.getPodSubnet(pod)
if err != nil {
return err
}
defaultSubnet, err := c.getPodDefaultSubnet(pod)
if err != nil {
return err
}

op := "replace"
if pod.Annotations == nil || len(pod.Annotations) == 0 {
op = "add"
pod.Annotations = map[string]string{}
}
attachmentSubnets, err := c.getPodAttachmentSubnet(pod)
if err != nil {
return err
}

ip, mac, err := c.acquireAddress(pod)
op := "replace"
if pod.Annotations == nil || len(pod.Annotations) == 0 {
op = "add"
pod.Annotations = map[string]string{}
}

// Avoid create lsp for already running pod in ovn-nb when controller restart
for _, subnet := range needAllocateSubnets(pod, append(attachmentSubnets, defaultSubnet)) {
ip, mac, err := c.acquireAddress(pod, subnet)
if err != nil {
c.recorder.Eventf(pod, v1.EventTypeWarning, "AcquireAddressFailed", err.Error())
return err
}

if err := c.ovnClient.CreatePort(subnet.Name, ovs.PodNameToPortName(name, namespace), ip, subnet.Spec.CIDRBlock, mac); err != nil {
c.recorder.Eventf(pod, v1.EventTypeWarning, "CreateOVNPortFailed", err.Error())
return err
pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, subnet.Spec.Provider)] = ip
pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, subnet.Spec.Provider)] = mac
pod.Annotations[fmt.Sprintf(util.CidrAnnotationTemplate, subnet.Spec.Provider)] = subnet.Spec.CIDRBlock
pod.Annotations[fmt.Sprintf(util.GatewayAnnotationTemplate, subnet.Spec.Provider)] = subnet.Spec.Gateway
pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, subnet.Spec.Provider)] = subnet.Name
pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, subnet.Spec.Provider)] = "true"

if isOvnSubnet(subnet) {
if err := c.ovnClient.CreatePort(subnet.Name, ovs.PodNameToPortName(name, namespace), ip, subnet.Spec.CIDRBlock, mac); err != nil {
c.recorder.Eventf(pod, v1.EventTypeWarning, "CreateOVNPortFailed", err.Error())
return err
}
}
}

pod.Annotations[util.IpAddressAnnotation] = ip
pod.Annotations[util.MacAddressAnnotation] = mac
pod.Annotations[util.CidrAnnotation] = subnet.Spec.CIDRBlock
pod.Annotations[util.GatewayAnnotation] = subnet.Spec.Gateway
pod.Annotations[util.LogicalSwitchAnnotation] = subnet.Name
pod.Annotations[util.AllocatedAnnotation] = "true"
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, generatePatchPayload(pod.Annotations, op)); err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
return err
}
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, generatePatchPayload(pod.Annotations, op)); err != nil {
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
return err
}

// In case update event might lost during leader election
Expand All @@ -348,12 +356,13 @@ func (c *Controller) handleAddPod(key string) error {
pod.Spec.NodeName != "" {
return c.handleUpdatePod(key)
}

return nil
}

func (c *Controller) handleDeletePod(key string) error {
ip, _, exist := c.ipam.GetPodAddress(key)
if exist {
ips, _ := c.ipam.GetPodAddress(key)
for _, ip := range ips {
if err := c.ovnClient.DeleteStaticRoute(ip, c.config.ClusterRouter); err != nil {
return err
}
Expand Down Expand Up @@ -398,7 +407,7 @@ func (c *Controller) handleUpdatePod(key string) error {
klog.Infof("update pod %s/%s", namespace, name)
podIP := pod.Annotations[util.IpAddressAnnotation]

subnet, err := c.getPodSubnet(pod)
subnet, err := c.getPodDefaultSubnet(pod)
if err != nil {
klog.Errorf("failed to get subnet %v", err)
return err
Expand Down Expand Up @@ -449,17 +458,28 @@ func getNodeTunlIP(node *v1.Node) (net.IP, error) {
return nodeTunlIPAddr, nil
}

func needCreateLogicalSwitchPort(pod *v1.Pod) bool {
func needAllocateSubnets(pod *v1.Pod, subnets []*kubeovnv1.Subnet) []*kubeovnv1.Subnet {
if pod.Status.Phase == v1.PodRunning ||
pod.Status.Phase == v1.PodSucceeded ||
pod.Status.Phase == v1.PodFailed ||
pod.Annotations != nil && pod.Annotations[util.AllocatedAnnotation] == "true" {
return false
pod.Status.Phase == v1.PodFailed {
return nil
}
return true

if pod.Annotations == nil {
return subnets
}

result := make([]*kubeovnv1.Subnet, 0, len(subnets))
for _, subnet := range subnets {
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, subnet.Spec.Provider)] != "true" {
result = append(result, subnet)
}
}

return result
}

func (c *Controller) getPodSubnet(pod *v1.Pod) (*kubeovnv1.Subnet, error) {
func (c *Controller) getPodDefaultSubnet(pod *v1.Pod) (*kubeovnv1.Subnet, error) {
subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch)
if err != nil {
klog.Errorf("failed to get default subnet %v", err)
Expand All @@ -482,32 +502,53 @@ func (c *Controller) getPodSubnet(pod *v1.Pod) (*kubeovnv1.Subnet, error) {
return subnet, nil
}

func (c *Controller) acquireAddress(pod *v1.Pod) (string, string, error) {
subnet, err := c.getPodSubnet(pod)
func (c *Controller) getPodAttachmentSubnet(pod *v1.Pod) ([]*kubeovnv1.Subnet, error) {
attachments, err := util.ParsePodNetworkAnnotation(pod.Annotations[util.AttachmentNetworkAnnotation], pod.Namespace)
if err != nil {
return "", "", err
return nil, err
}
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
return nil, err
}

result := make([]*kubeovnv1.Subnet, 0, len(attachments))
for _, attach := range attachments {
provider := fmt.Sprintf("%s.%s", attach.Name, attach.Namespace)
for _, subnet := range subnets {
if subnet.Spec.Provider == provider {
result = append(result, subnet)
break
}
}
}
return result, nil
}

func (c *Controller) acquireAddress(pod *v1.Pod, subnet *kubeovnv1.Subnet) (string, string, error) {
key := fmt.Sprintf("%s/%s", pod.Name, pod.Namespace)

// Random allocate
if pod.Annotations[util.IpAddressAnnotation] == "" && pod.Annotations[util.IpPoolAnnotation] == "" {
if pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, subnet.Spec.Provider)] == "" &&
pod.Annotations[fmt.Sprintf(util.IpPoolAnnotationTemplate, subnet.Spec.Provider)] == "" {
return c.ipam.GetRandomAddress(key, subnet.Name)
}

// Static allocate
if pod.Annotations[util.IpAddressAnnotation] != "" {
return c.ipam.GetStaticAddress(key, pod.Annotations[util.IpAddressAnnotation], pod.Annotations[util.MacAddressAnnotation], subnet.Name)
if pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, subnet.Spec.Provider)] != "" {
return c.ipam.GetStaticAddress(key, pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, subnet.Spec.Provider)],
pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, subnet.Spec.Provider)], subnet.Name)
}

// IPPool allocate
ipPool := strings.Split(pod.Annotations[util.IpPoolAnnotation], ",")
ipPool := strings.Split(pod.Annotations[fmt.Sprintf(util.IpPoolAnnotationTemplate, subnet.Spec.Provider)], ",")
for i, ip := range ipPool {
ipPool[i] = strings.TrimSpace(ip)
}

if ok, _ := isStatefulSetPod(pod); !ok {
for _, staticIP := range ipPool {
if ip, mac, err := c.ipam.GetStaticAddress(key, staticIP, pod.Annotations[util.MacAddressAnnotation], subnet.Name); err == nil {
if ip, mac, err := c.ipam.GetStaticAddress(key, staticIP, pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, subnet.Spec.Provider)], subnet.Name); err == nil {
return ip, mac, nil
}
}
Expand All @@ -517,7 +558,7 @@ func (c *Controller) acquireAddress(pod *v1.Pod) (string, string, error) {
numStr := strings.Split(pod.Name, "-")[numIndex]
index, _ := strconv.Atoi(numStr)
if index < len(ipPool) {
return c.ipam.GetStaticAddress(key, ipPool[index], pod.Annotations[util.MacAddressAnnotation], subnet.Name)
return c.ipam.GetStaticAddress(key, ipPool[index], pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, subnet.Spec.Provider)], subnet.Name)
}
}
return "", "", ipam.NoAvailableError
Expand Down
19 changes: 19 additions & 0 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {
subnet.Spec.CIDRBlock = ipNet.String()
changed = true
}
if subnet.Spec.Provider == "" {
subnet.Spec.Provider = util.OvnProvider
changed = true
}
if subnet.Spec.Protocol == "" || subnet.Spec.Protocol != util.CheckProtocol(subnet.Spec.CIDRBlock) {
subnet.Spec.Protocol = util.CheckProtocol(subnet.Spec.CIDRBlock)
changed = true
Expand Down Expand Up @@ -350,6 +354,10 @@ func (c *Controller) handleAddSubnet(key string) error {
return err
}

if !isOvnSubnet(subnet) {
return nil
}

if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.ExcludeIps); err != nil {
return err
}
Expand Down Expand Up @@ -598,6 +606,10 @@ func (c *Controller) handleUpdateSubnet(key string) error {
return err
}

if !isOvnSubnet(subnet) {
return nil
}

exist, err := c.ovnClient.LogicalSwitchExists(subnet.Name)
if err != nil {
klog.Errorf("failed to list logical switch, %v", err)
Expand Down Expand Up @@ -858,3 +870,10 @@ func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {
subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status")
return err
}

func isOvnSubnet(subnet *kubeovnv1.Subnet) bool {
if subnet.Spec.Provider == util.OvnProvider || subnet.Spec.Provider == "" {
return true
}
return false
}
8 changes: 5 additions & 3 deletions pkg/ipam/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,17 @@ func (ipam *IPAM) DeleteSubnet(subnetName string) {
delete(ipam.Subnets, subnetName)
}

func (ipam *IPAM) GetPodAddress(podName string) (string, string, bool) {
func (ipam *IPAM) GetPodAddress(podName string) ([]string, []string) {
ipam.mutex.RLock()
defer ipam.mutex.RUnlock()
ips, macs := []string{}, []string{}
for _, subnet := range ipam.Subnets {
if ip, mac, exist := subnet.GetPodAddress(podName); exist {
return string(ip), mac, exist
ips = append(ips, string(ip))
macs = append(macs, mac)
}
}
return "", "", false
return ips, macs
}

func (ipam *IPAM) ContainAddress(address string) bool {
Expand Down

0 comments on commit 1319eb5

Please sign in to comment.