Skip to content

Commit

Permalink
feat: change pod route when update gateway type
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed May 12, 2020
1 parent 462aa15 commit f0b17a6
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 64 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,15 @@ func (c *Controller) handleUpdateNode(key string) error {
if nodeReady(node) {
for _, subnet := range subnets {
if subnet.Status.ActivateGateway == "" && gatewayContains(subnet.Spec.GatewayNode, node.Name) {
if err := c.reconcileCentralizedGateway(subnet); err != nil {
if err := c.reconcileGateway(subnet); err != nil {
return err
}
}
}
} else {
for _, subnet := range subnets {
if subnet.Status.ActivateGateway == node.Name {
if err := c.reconcileCentralizedGateway(subnet); err != nil {
if err := c.reconcileGateway(subnet); err != nil {
return err
}
}
Expand Down
138 changes: 90 additions & 48 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error {
changed = true
} else {
gwExists := false
for _, ip := range ovs.ExpandExcludeIPs(subnet.Spec.ExcludeIps) {
for _, ip := range ovs.ExpandExcludeIPs(subnet.Spec.ExcludeIps, subnet.Spec.CIDRBlock) {
if ip == subnet.Spec.Gateway {
gwExists = true
break
Expand Down Expand Up @@ -372,6 +372,36 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
return nil
}

subnetList, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}
for _, sub := range subnetList {
if sub.Name != subnet.Name && util.CIDRConflict(sub.Spec.CIDRBlock, subnet.Spec.CIDRBlock) {
err = fmt.Errorf("subnet %s cidr %s conflict with subnet %s cidr %s", subnet.Name, subnet.Spec.CIDRBlock, sub.Name, sub.Spec.CIDRBlock)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
}
}

nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list nodes %v", err)
return err
}
for _, node := range nodes {
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP && util.CIDRContainIP(subnet.Spec.CIDRBlock, addr.Address) {
err = fmt.Errorf("subnet %s cidr %s conflict with node %s address %s", subnet.Name, subnet.Spec.CIDRBlock, node.Name, addr.Address)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
}
}
}

exist, err := c.ovnClient.LogicalSwitchExists(subnet.Name)
if err != nil {
klog.Errorf("failed to list logical switch, %v", err)
Expand All @@ -388,46 +418,21 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
} else {
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "")
}

subnetList, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}
for _, sub := range subnetList {
if sub.Name != subnet.Name && util.CIDRConflict(sub.Spec.CIDRBlock, subnet.Spec.CIDRBlock) {
err = fmt.Errorf("subnet %s cidr %s conflict with subnet %s cidr %s", subnet.Name, subnet.Spec.CIDRBlock, sub.Name, sub.Spec.CIDRBlock)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
}
}

nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list nodes %v", err)
return err
}
for _, node := range nodes {
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP && util.CIDRContainIP(subnet.Spec.CIDRBlock, addr.Address) {
err = fmt.Errorf("subnet %s cidr %s conflict with node %s address %s", subnet.Name, subnet.Spec.CIDRBlock, node.Name, addr.Address)
klog.Error(err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
}
}
}
// If multiple namespace use same ls name, only first one will success
err = c.ovnClient.CreateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps)
if err != nil {
if err := c.ovnClient.CreateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
c.patchSubnetStatus(subnet, "CreateLogicalSwitchFailed", err.Error())
return err
}
} else {
// logical switch exists, only update other_config
err := c.ovnClient.SetLogicalSwitchConfig(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps)
if err != nil {
if err = util.ValidateSubnet(*subnet); err != nil {
klog.Errorf("failed to validate subnet %s, %v", subnet.Name, err)
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error())
return err
} else {
c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "")
}
if err := c.ovnClient.SetLogicalSwitchConfig(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil {
c.patchSubnetStatus(subnet, "SetLogicalSwitchConfigFailed", err.Error())
return err
}
Expand All @@ -439,16 +444,14 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

if subnet.Spec.Private {
err = c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.AllowSubnets)
if err != nil {
if err := c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.AllowSubnets); err != nil {
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchFailed", err.Error())
return err
} else {
c.patchSubnetStatus(subnet, "SetPrivateLogicalSwitchSuccess", "")
}
} else {
err = c.ovnClient.ResetLogicalSwitchAcl(subnet.Name, subnet.Spec.Protocol)
if err != nil {
if err := c.ovnClient.ResetLogicalSwitchAcl(subnet.Name, subnet.Spec.Protocol); err != nil {
c.patchSubnetStatus(subnet, "ResetLogicalSwitchAclFailed", err.Error())
return err
} else {
Expand Down Expand Up @@ -544,9 +547,12 @@ func (c *Controller) reconcileSubnet(subnet *kubeovnv1.Subnet) error {
klog.Errorf("reconcile namespaces for subnet %s failed, %v", subnet.Name, err)
return err
}
if err := c.reconcileCentralizedGateway(subnet); err != nil {
klog.Errorf("reconcile centralized gateway for subnet %s failed, %v", subnet.Name, err)
return err

if subnet.Name != c.config.NodeSwitch {
if err := c.reconcileGateway(subnet); err != nil {
klog.Errorf("reconcile centralized gateway for subnet %s failed, %v", subnet.Name, err)
return err
}
}

if err := c.reconcileVlan(subnet); err != nil {
Expand Down Expand Up @@ -614,7 +620,13 @@ func (c *Controller) reconcileNamespaces(subnet *kubeovnv1.Subnet) error {
return nil
}

func (c *Controller) reconcileCentralizedGateway(subnet *kubeovnv1.Subnet) error {
func (c *Controller) reconcileGateway(subnet *kubeovnv1.Subnet) error {
ips, err := c.config.KubeOvnClient.KubeovnV1().IPs().List(metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", util.SubnetNameLabel, subnet.Name)})
if err != nil {
klog.Errorf("failed to list ip of subnet %s, %v", subnet.Name, err)
return err
}

// if gw is distributed remove activateGateway field
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
if subnet.Status.ActivateGateway == "" {
Expand All @@ -626,7 +638,34 @@ func (c *Controller) reconcileCentralizedGateway(subnet *kubeovnv1.Subnet) error
return err
}
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status")
return err
if err != nil {
return err
}

for _, ip := range ips.Items {
node, err := c.nodesLister.Get(ip.Spec.NodeName)
if err != nil {
if k8serrors.IsNotFound(err) {
continue
} else {
klog.Errorf("failed to get node %s, %v", ip.Spec.NodeName, err)
return err
}
}
nextHop, err := getNodeTunlIP(node)
if err != nil {
klog.Errorf("failed to get node %s tunl ip, %v", node.Name, err)
return err
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, ip.Spec.IPAddress, nextHop.String(), c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "add static route failed")
}
}
if err := c.ovnClient.DeleteStaticRoute(subnet.Spec.CIDRBlock, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route %s, %v", subnet.Spec.CIDRBlock, err)
return err
}
return nil
}
klog.Infof("start to init centralized gateway for subnet %s", subnet.Name)

Expand Down Expand Up @@ -668,12 +707,15 @@ func (c *Controller) reconcileCentralizedGateway(subnet *kubeovnv1.Subnet) error
return err
}

if err := c.ovnClient.DeleteStaticRoute(subnet.Spec.CIDRBlock, c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "del static route failed")
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, subnet.Spec.CIDRBlock, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "add static route failed")
}
for _, ip := range ips.Items {
if err := c.ovnClient.DeleteStaticRoute(ip.Spec.IPAddress, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route, %v", err)
return err
}
}

subnet.Status.ActivateGateway = newActivateNode
bytes, err := subnet.Status.Bytes()
Expand Down Expand Up @@ -731,7 +773,7 @@ func calcSubnetStatusIP(subnet *kubeovnv1.Subnet, c *Controller) error {
return err
}
// gateway always in excludeIPs
toSubIPs := ovs.ExpandExcludeIPs(subnet.Spec.ExcludeIps)
toSubIPs := ovs.ExpandExcludeIPs(subnet.Spec.ExcludeIps, subnet.Spec.CIDRBlock)
for _, podUsedIP := range podUsedIPs.Items {
toSubIPs = append(toSubIPs, podUsedIP.Spec.IPAddress)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/ovs/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ func trimCommandOutput(raw []byte) string {
}

// ExpandExcludeIPs parse ovn exclude_ips to ip slice
func ExpandExcludeIPs(excludeIPs []string) []string {
func ExpandExcludeIPs(excludeIPs []string, cidr string) []string {
rv := []string{}
for _, excludeIP := range excludeIPs {
if strings.Index(excludeIP, "..") != -1 {
parts := strings.Split(excludeIP, "..")
s := util.Ip2BigInt(parts[0])
e := util.Ip2BigInt(parts[1])
for s.Cmp(e) <= 0 {
rv = append(rv, util.BigInt2Ip(s))
ipStr := util.BigInt2Ip(s)
if util.CIDRContainIP(cidr, ipStr) {
rv = append(rv, ipStr)
}
s.Add(s, big.NewInt(1))
}
} else {
Expand Down
30 changes: 18 additions & 12 deletions pkg/webhook/static_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,17 @@ func (v *ValidatingHook) PodCreateHook(ctx context.Context, req admission.Reques
}
// Get logical switch name
lsName := v.opt.DefaultLS
var subnet *ovnv1.Subnet
subnetList := &ovnv1.SubnetList{}
err := v.cache.List(ctx, subnetList)
if err != nil {
return ctrlwebhook.Errored(http.StatusBadRequest, err)
}
for _, subnet := range subnetList.Items {
for _, ns := range subnet.Spec.Namespaces {
for _, s := range subnetList.Items {
for _, ns := range s.Spec.Namespaces {
if ns == o.GetNamespace() {
lsName = subnet.Name
lsName = s.Name
subnet = &s
break
}
}
Expand All @@ -118,7 +120,7 @@ func (v *ValidatingHook) PodCreateHook(ctx context.Context, req admission.Reques
if err != nil {
return ctrlwebhook.Errored(http.StatusBadRequest, err)
}
parsedExcludeIPs := ovs.ExpandExcludeIPs(excludeIPs)
parsedExcludeIPs := ovs.ExpandExcludeIPs(excludeIPs, subnet.Spec.CIDRBlock)
usedIPs = append(usedIPs, parsedExcludeIPs...)
// Check static ips overlap
if util.IsStringsOverlap([]string{staticIP}, usedIPs) {
Expand Down Expand Up @@ -281,14 +283,16 @@ func (v *ValidatingHook) podControllerCreate(ctx context.Context, staticIPSAnno,
// Get logical switch name
lsName := v.opt.DefaultLS
subnetList := &ovnv1.SubnetList{}
var subnet *ovnv1.Subnet
err := v.cache.List(ctx, subnetList)
if err != nil {
return ctrlwebhook.Errored(http.StatusBadRequest, err)
}
for _, subnet := range subnetList.Items {
for _, ns := range subnet.Spec.Namespaces {
for _, s := range subnetList.Items {
for _, ns := range s.Spec.Namespaces {
if ns == namespace {
lsName = subnet.Name
lsName = s.Name
subnet = &s
break
}
}
Expand All @@ -314,7 +318,7 @@ func (v *ValidatingHook) podControllerCreate(ctx context.Context, staticIPSAnno,
if err != nil {
return ctrlwebhook.Errored(http.StatusBadRequest, err)
}
parsedExcludeIPs := ovs.ExpandExcludeIPs(excludeIPs)
parsedExcludeIPs := ovs.ExpandExcludeIPs(excludeIPs, subnet.Spec.CIDRBlock)
usedIPs = append(usedIPs, parsedExcludeIPs...)
// Check static ips overlap
if util.IsStringsOverlap(staticIPs, usedIPs) {
Expand Down Expand Up @@ -359,15 +363,17 @@ func (v *ValidatingHook) podControllerUpdate(ctx context.Context, oldStaticIPSAn
}
// Get logical switch name
lsName := v.opt.DefaultLS
var subnet *ovnv1.Subnet
subnetList := &ovnv1.SubnetList{}
err := v.cache.List(ctx, subnetList)
if err != nil {
return ctrlwebhook.Errored(http.StatusBadRequest, err)
}
for _, subnet := range subnetList.Items {
for _, ns := range subnet.Spec.Namespaces {
for _, s := range subnetList.Items {
for _, ns := range s.Spec.Namespaces {
if ns == namespace {
lsName = subnet.Name
lsName = s.Name
subnet = &s
break
}
}
Expand All @@ -377,7 +383,7 @@ func (v *ValidatingHook) podControllerUpdate(ctx context.Context, oldStaticIPSAn
if err != nil {
return ctrlwebhook.Errored(http.StatusBadRequest, err)
}
parsedExcludeIPs := ovs.ExpandExcludeIPs(excludeIPs)
parsedExcludeIPs := ovs.ExpandExcludeIPs(excludeIPs, subnet.Spec.CIDRBlock)
// Check static ips overlap
if util.IsStringsOverlap(toAdd, parsedExcludeIPs) {
return ctrlwebhook.Denied("overlap")
Expand Down

0 comments on commit f0b17a6

Please sign in to comment.