Skip to content

Commit

Permalink
add 'virtual' port for vip (#1278)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanriming committed Feb 8, 2022
1 parent 36c43c4 commit b220f0c
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 3 deletions.
4 changes: 4 additions & 0 deletions dist/images/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,10 @@ spec:
type: array
items:
type: string
vips:
type: array
items:
type: string
gatewayType:
type: string
allowSubnets:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/kubeovn/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type SubnetSpec struct {
Vlan string `json:"vlan,omitempty"`
HtbQos string `json:"htbqos,omitempty"`

Vips []string `json:"vips,omitempty"`

LogicalGateway bool `json:"logicalGateway"`
DisableGatewayCheck bool `json:"disableGatewayCheck"`
DisableInterConnection bool `json:"disableInterConnection"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Controller struct {
deleteSubnetQueue workqueue.RateLimitingInterface
deleteRouteQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface
syncVirtualPortsQueue workqueue.RateLimitingInterface

ipsLister kubeovnlister.IPLister
ipSynced cache.InformerSynced
Expand Down Expand Up @@ -195,6 +196,7 @@ func NewController(config *Configuration) *Controller {
deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"),
deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"),

ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,
Expand Down Expand Up @@ -434,6 +436,7 @@ func (c *Controller) shutdown() {
c.deleteSubnetQueue.ShutDown()
c.deleteRouteQueue.ShutDown()
c.updateSubnetStatusQueue.ShutDown()
c.syncVirtualPortsQueue.ShutDown()

c.addNodeQueue.ShutDown()
c.updateNodeQueue.ShutDown()
Expand Down Expand Up @@ -549,6 +552,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {
go wait.Until(c.runDeleteSubnetWorker, time.Second, stopCh)
go wait.Until(c.runDeleteRouteWorker, time.Second, stopCh)
go wait.Until(c.runUpdateSubnetStatusWorker, time.Second, stopCh)
go wait.Until(c.runSyncVirtualPortsWorker, time.Second, stopCh)

if c.config.EnableLb {
go wait.Until(c.runUpdateServiceWorker, time.Second, stopCh)
Expand Down
12 changes: 11 additions & 1 deletion pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,15 @@ func (c *Controller) handleDeletePod(pod *v1.Pod) error {
}
}
c.ipam.ReleaseAddressByPod(key)

podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod nets %v", err)
} else {
for _, podNet := range podNets {
c.syncVirtualPortsQueue.Add(podNet.Subnet.Name)
}
}
return nil
}

Expand Down Expand Up @@ -676,10 +685,11 @@ func (c *Controller) handleUpdatePodSecurity(key string) error {
mac := pod.Annotations[fmt.Sprintf(util.MacAddressAnnotationTemplate, podNet.ProviderName)]
ipStr := pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)]
vips := pod.Annotations[fmt.Sprintf(util.PortVipAnnotationTemplate, podNet.ProviderName)]
if err = c.ovnClient.SetPortSecurity(portSecurity, ovs.PodNameToPortName(name, namespace, podNet.ProviderName), mac, ipStr, vips); err != nil {
if err = c.ovnClient.SetPortSecurity(portSecurity, podNet.Subnet.Name, ovs.PodNameToPortName(name, namespace, podNet.ProviderName), mac, ipStr, vips); err != nil {
klog.Errorf("setPortSecurity failed. %v", err)
return err
}
c.syncVirtualPortsQueue.Add(podNet.Subnet.Name)

var securityGroups string
if portSecurity {
Expand Down
119 changes: 119 additions & 0 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (c *Controller) enqueueUpdateSubnet(old, new interface{}) {
oldSubnet.Spec.LogicalGateway != newSubnet.Spec.LogicalGateway ||
oldSubnet.Spec.Gateway != newSubnet.Spec.Gateway ||
!reflect.DeepEqual(oldSubnet.Spec.ExcludeIps, newSubnet.Spec.ExcludeIps) ||
!reflect.DeepEqual(oldSubnet.Spec.Vips, newSubnet.Spec.Vips) ||
oldSubnet.Spec.Vlan != newSubnet.Spec.Vlan {
klog.V(3).Infof("enqueue update subnet %s", key)
c.addOrUpdateSubnetQueue.Add(key)
Expand All @@ -116,6 +117,41 @@ func (c *Controller) runDeleteSubnetWorker() {
}
}

func (c *Controller) runSyncVirtualPortsWorker() {
for c.processNextSyncVirtualPortsWorkItem() {
}
}

func (c *Controller) processNextSyncVirtualPortsWorkItem() bool {
obj, shutdown := c.syncVirtualPortsQueue.Get()
if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.syncVirtualPortsQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.syncVirtualPortsQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.syncVirtualPort(key); err != nil {
c.syncVirtualPortsQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.syncVirtualPortsQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextAddSubnetWorkItem() bool {
obj, shutdown := c.addOrUpdateSubnetQueue.Get()
if shutdown {
Expand Down Expand Up @@ -845,6 +881,89 @@ func (c *Controller) reconcileSubnet(subnet *kubeovnv1.Subnet) error {
klog.Errorf("reconcile vlan for subnet %s failed, %v", subnet.Name, err)
return err
}

if err := c.reconcileVips(subnet); err != nil {
klog.Errorf("reconcile vips for subnet %s failed, %v", subnet.Name, err)
return err
}
return nil
}

func (c *Controller) reconcileVips(subnet *kubeovnv1.Subnet) error {
// 1. get all vip port
results, err := c.ovnClient.CustomFindEntity("logical_switch_port", []string{"name", "options"}, "type=virtual", fmt.Sprintf("external_ids:ls=%s", subnet.Name))
if err != nil {
klog.Errorf("failed to find virtual port, %v", err)
return err
}

// 2. remove no need port
var existVips []string
for _, ret := range results {
options := ret["options"]
for _, value := range options {
if !strings.HasPrefix(value, "virtual-ip=") {
continue
}
vip := strings.TrimPrefix(value, "virtual-ip=")
if vip == "" || net.ParseIP(vip) == nil {
continue
}
if !util.ContainsString(subnet.Spec.Vips, vip) {
if err = c.ovnClient.DeleteLogicalSwitchPort(ret["name"][0]); err != nil {
klog.Errorf("failed to delete virtual port, %v", err)
return err
}
} else {
existVips = append(existVips, vip)
}
}
}

// 3. create new port
newVips := util.DiffStringSlice(existVips, subnet.Spec.Vips)
for _, vip := range newVips {
if err = c.ovnClient.CreateVirtualPort(subnet.Name, vip); err != nil {
klog.Errorf("failed to create virtual port, %v", err)
return err
}
}
c.syncVirtualPortsQueue.Add(subnet.Name)
return nil
}

func (c *Controller) syncVirtualPort(key string) error {
subnet, err := c.subnetsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
} else {
klog.Errorf("failed to get subnet %s, %v", key, err)
return err
}
}
results, err := c.ovnClient.CustomFindEntity("logical_switch_port", []string{"name", "port_security"},
fmt.Sprintf("external_ids:ls=%s", subnet.Name), "external_ids:attach-vips=true")
if err != nil {
klog.Errorf("failed to list logical_switch_port, %v", err)
return err
}
for _, vip := range subnet.Spec.Vips {
if !util.CIDRContainIP(subnet.Spec.CIDRBlock, vip) {
klog.Errorf("vip %s is out of range to subnet %s", vip, subnet.Name)
continue
}
var virtualParents []string
for _, ret := range results {
if util.ContainsString(ret["port_security"], vip) {
virtualParents = append(virtualParents, ret["name"][0])
}
}
if err = c.ovnClient.SetVirtualParents(subnet.Name, vip, strings.Join(virtualParents, ",")); err != nil {
klog.Errorf("failed to set vip %s virtual parents, %v", vip, err)
return err
}
}
return nil
}

Expand Down
53 changes: 51 additions & 2 deletions pkg/ovs/ovn-nbctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c Client) SetPortExternalIds(port, key, value string) error {
return nil
}

func (c Client) SetPortSecurity(portSecurity bool, port, mac, ipStr, vips string) error {
func (c Client) SetPortSecurity(portSecurity bool, ls, port, mac, ipStr, vips string) error {
var addresses []string
ovnCommand := []string{"lsp-set-port-security", port}
if portSecurity {
Expand All @@ -172,14 +172,63 @@ func (c Client) SetPortSecurity(portSecurity bool, port, mac, ipStr, vips string
}
ovnCommand = append(ovnCommand, strings.Join(addresses, " "))
}

ovnCommand = append(ovnCommand, "--", "set", "logical_switch_port", port,
fmt.Sprintf("external_ids:attach-vips=%v", vips != ""),
fmt.Sprintf("external_ids:ls=%s", ls))
if _, err := c.ovnNbCommand(ovnCommand...); err != nil {
klog.Errorf("set port %s security failed: %v", port, err)
return err
}
return nil
}

// CreateVirtualPort create virtual type logical switch port in ovn
func (c Client) CreateVirtualPort(ls, ip string) error {
portName := fmt.Sprintf("%s-vip-%s", ls, ip)
if _, err := c.ovnNbCommand(MayExist, "lsp-add", ls, portName,
"--", "set", "logical_switch_port", portName, "type=virtual",
fmt.Sprintf("options:virtual-ip=\"%s\"", ip),
fmt.Sprintf("external_ids:vendor=%s", util.CniTypeName),
fmt.Sprintf("external_ids:ls=%s", ls)); err != nil {
klog.Errorf("create virtual port %s failed: %v", portName, err)
return err
}
return nil
}

func (c Client) SetVirtualParents(ls, ip, parents string) error {
portName := fmt.Sprintf("%s-vip-%s", ls, ip)
var cmdArg []string
if parents != "" {
cmdArg = append(cmdArg, "set", "logical_switch_port", portName, fmt.Sprintf("options:virtual-parents=%s", parents))
} else {
cmdArg = append(cmdArg, "remove", "logical_switch_port", portName, "options", "virtual-parents")
}
if _, err := c.ovnNbCommand(cmdArg...); err != nil {
klog.Errorf("set vip %s virtual parents failed: %v", ip, err)
return err
}
return nil
}

func (c Client) ListVirtualPort(ls string) ([]string, error) {
cmdArg := []string{"--format=csv", "--data=bare", "--no-heading", "--columns=name", "find", "logical_switch_port", "type=virtual", fmt.Sprintf("external_ids:ls=%s", ls)}
output, err := c.ovnNbCommand(cmdArg...)
if err != nil {
klog.Errorf("failed to list logical switch port, %v", err)
return nil, err
}
lines := strings.Split(output, "\n")
result := make([]string, 0, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
result = append(result, strings.TrimSpace(l))
}
return result, nil
}

// CreatePort create logical switch port in ovn
func (c Client) CreatePort(ls, port, ip, mac, pod, namespace string, portSecurity bool, securityGroups string, vips string, liveMigration bool) error {
var ovnCommand []string
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func ValidateSubnet(subnet kubeovnv1.Subnet) error {
}
}

if len(subnet.Spec.Vips) != 0 {
for _, vip := range subnet.Spec.Vips {
if !CIDRContainIP(subnet.Spec.CIDRBlock, vip) {
return fmt.Errorf("vip %s conflicts with subnet %s cidr %s", vip, subnet.Name, subnet.Spec.CIDRBlock)
}
}
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions yamls/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ spec:
type: array
items:
type: string
vips:
type: array
items:
type: string
gatewayType:
type: string
allowSubnets:
Expand Down

0 comments on commit b220f0c

Please sign in to comment.