diff --git a/Makefile.e2e b/Makefile.e2e index 9cc3e2cae25..bfdd40c2d12 100644 --- a/Makefile.e2e +++ b/Makefile.e2e @@ -105,6 +105,14 @@ kube-ovn-lb-svc-conformance-e2e: E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \ ./test/e2e/lb-svc/e2e.test --ginkgo.focus=CNI:Kube-OVN +.PHONY: kube-ovn-eip-conformance-e2e +kube-ovn-eip-conformance-e2e: + go test ./test/e2e/ovn-eip -c -o test/e2e/ovn-eip/e2e.test + E2E_BRANCH=$(E2E_BRANCH) \ + E2E_IP_FAMILY=$(E2E_IP_FAMILY) \ + E2E_NETWORK_MODE=$(E2E_NETWORK_MODE) \ + ./test/e2e/ovn-eip/e2e.test --ginkgo.focus=CNI:Kube-OVN + .PHONY: kube-ovn-security-e2e kube-ovn-security-e2e: go test ./test/e2e/security -c -o test/e2e/security/e2e.test diff --git a/dist/images/install.sh b/dist/images/install.sh index a1376c72f44..7264154c762 100755 --- a/dist/images/install.sh +++ b/dist/images/install.sh @@ -773,15 +773,21 @@ spec: subresources: status: {} additionalPrinterColumns: - - jsonPath: .spec.v4ip - name: IP + - jsonPath: .status.v4Ip + name: V4IP type: string - - jsonPath: .spec.macAddress + - jsonPath: .status.v6Ip + name: V6IP + type: string + - jsonPath: .status.macAddress name: Mac type: string - jsonPath: .spec.type name: Type type: string + - jsonPath: .status.ready + name: Ready + type: boolean schema: openAPIV3Schema: type: object @@ -789,8 +795,12 @@ spec: status: type: object properties: + ready: + type: boolean v4Ip: type: string + v6Ip: + type: string macAddress: type: string conditions: @@ -817,7 +827,9 @@ spec: type: string type: type: string - v4ip: + v4Ip: + type: string + v6Ip: type: string macAddress: type: string @@ -3170,7 +3182,7 @@ spec: name: host-run-ovn - mountPath: /var/run/netns name: host-ns - mountPropagation: HostToContainer + mountPropagation: Bidirectional - mountPath: /var/log/kube-ovn name: kube-ovn-log - mountPath: /var/log/openvswitch diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index 534672a81c7..59944f0a11d 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -995,10 +995,11 @@ type OvnEip struct { } type OvnEipSpec struct { ExternalSubnet string `json:"externalSubnet"` - V4Ip string `json:"v4ip"` + V4Ip string `json:"v4Ip"` + V6Ip string `json:"v6Ip"` MacAddress string `json:"macAddress"` Type string `json:"type"` - // usage type: fip, snat, lrp + // usage type: fip, snat, lrp, node external gw } // Condition describes the state of an object at a certain point. @@ -1029,7 +1030,9 @@ type OvnEipStatus struct { // +patchStrategy=merge Conditions []OvnEipCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` - V4Ip string `json:"v4ip" patchStrategy:"merge"` + Ready bool `json:"ready" patchStrategy:"merge"` + V4Ip string `json:"v4Ip" patchStrategy:"merge"` + V6Ip string `json:"v6Ip" patchStrategy:"merge"` MacAddress string `json:"macAddress" patchStrategy:"merge"` } diff --git a/pkg/client/clientset/versioned/clientset.go b/pkg/client/clientset/versioned/clientset.go index 6681321648f..4dfab592983 100644 --- a/pkg/client/clientset/versioned/clientset.go +++ b/pkg/client/clientset/versioned/clientset.go @@ -33,7 +33,8 @@ type Interface interface { KubeovnV1() kubeovnv1.KubeovnV1Interface } -// Clientset contains the clients for groups. +// Clientset contains the clients for groups. Each group has exactly one +// version included in a Clientset. 
type Clientset struct { *discovery.DiscoveryClient kubeovnV1 *kubeovnv1.KubeovnV1Client diff --git a/pkg/client/informers/externalversions/factory.go b/pkg/client/informers/externalversions/factory.go index 83e87c3c836..baba604c99f 100644 --- a/pkg/client/informers/externalversions/factory.go +++ b/pkg/client/informers/externalversions/factory.go @@ -47,11 +47,6 @@ type sharedInformerFactory struct { // startedInformers is used for tracking which informers have been started. // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool - // wg tracks how many goroutines were started. - wg sync.WaitGroup - // shuttingDown is true when Shutdown has been called. It may still be running - // because it needs to wait for goroutines. - shuttingDown bool } // WithCustomResyncConfig sets a custom resync period for the specified informer types. @@ -112,39 +107,20 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy return factory } +// Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() - if f.shuttingDown { - return - } - for informerType, informer := range f.informers { if !f.startedInformers[informerType] { - f.wg.Add(1) - // We need a new variable in each loop iteration, - // otherwise the goroutine would use the loop variable - // and that keeps changing. - informer := informer - go func() { - defer f.wg.Done() - informer.Run(stopCh) - }() + go informer.Run(stopCh) f.startedInformers[informerType] = true } } } -func (f *sharedInformerFactory) Shutdown() { - f.lock.Lock() - f.shuttingDown = true - f.lock.Unlock() - - // Will return immediately if there is nothing to wait for. - f.wg.Wait() -} - +// WaitForCacheSync waits for all started informers' cache were synced. func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() @@ -191,57 +167,10 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal // SharedInformerFactory provides shared informers for resources in all known // API group versions. -// -// It is typically used like this: -// -// ctx, cancel := context.Background() -// defer cancel() -// factory := NewSharedInformerFactory(client, resyncPeriod) -// defer factory.WaitForStop() // Returns immediately if nothing was started. -// genericInformer := factory.ForResource(resource) -// typedInformer := factory.SomeAPIGroup().V1().SomeType() -// factory.Start(ctx.Done()) // Start processing these informers. -// synced := factory.WaitForCacheSync(ctx.Done()) -// for v, ok := range synced { -// if !ok { -// fmt.Fprintf(os.Stderr, "caches failed to sync: %v", v) -// return -// } -// } -// -// // Creating informers can also be created after Start, but then -// // Start must be called again: -// anotherGenericInformer := factory.ForResource(resource) -// factory.Start(ctx.Done()) type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory - - // Start initializes all requested informers. They are handled in goroutines - // which run until the stop channel gets closed. - Start(stopCh <-chan struct{}) - - // Shutdown marks a factory as shutting down. At that point no new - // informers can be started anymore and Start will return without - // doing anything. - // - // In addition, Shutdown blocks until all goroutines have terminated. 
For that - // to happen, the close channel(s) that they were started with must be closed, - // either before Shutdown gets called or while it is waiting. - // - // Shutdown may be called multiple times, even concurrently. All such calls will - // block until all goroutines have terminated. - Shutdown() - - // WaitForCacheSync blocks until all started informers' caches were synced - // or the stop channel gets closed. - WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool - - // ForResource gives generic access to a shared informer of the matching type. ForResource(resource schema.GroupVersionResource) (GenericInformer, error) - - // InternalInformerFor returns the SharedIndexInformer for obj using an internal - // client. - InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Kubeovn() kubeovn.Interface } diff --git a/pkg/controller/external-gw.go b/pkg/controller/external-gw.go index 397bd100951..522a67c7c8b 100644 --- a/pkg/controller/external-gw.go +++ b/pkg/controller/external-gw.go @@ -160,6 +160,11 @@ func (c *Controller) establishExternalGateway(config map[string]string) error { klog.Errorf("failed to create external gateway switch, %v", err) return err } + lrpEipName := fmt.Sprintf("%s-%s", util.DefaultVpc, c.config.ExternalGatewaySwitch) + if err = c.patchOvnEipStatus(lrpEipName, true); err != nil { + klog.Errorf("failed to patch ovn eip cr status for lrp %s, %v", lrpEipName, err) + return err + } return nil } @@ -181,8 +186,13 @@ func (c *Controller) createDefaultVpcLrpEip(config map[string]string) (string, s } var v4ip, mac string if !needCreateEip { - v4ip = cachedEip.Spec.V4Ip - mac = cachedEip.Spec.MacAddress + v4ip = cachedEip.Status.V4Ip + mac = cachedEip.Status.MacAddress + if v4ip == "" || mac == "" { + err = fmt.Errorf("lrp '%s' ip or mac should not be empty", lrpEipName) + klog.Error(err) + return "", "", err + } } else { var v6ip string v4ip, v6ip, mac, err = c.acquireIpAddress(c.config.ExternalGatewaySwitch, lrpEipName, lrpEipName) @@ -194,16 +204,11 @@ func (c *Controller) createDefaultVpcLrpEip(config map[string]string) (string, s klog.Errorf("failed to create ovn eip cr for lrp %s, %v", lrpEipName, err) return "", "", err } - if err = c.patchOvnEipStatus(lrpEipName); err != nil { + if err = c.patchOvnEipStatus(lrpEipName, false); err != nil { klog.Errorf("failed to patch ovn eip cr status for lrp %s, %v", lrpEipName, err) return "", "", err } } - if v4ip == "" || mac == "" { - err = fmt.Errorf("lrp '%s' ip or mac should not be empty", lrpEipName) - klog.Error(err) - return "", "", err - } v4ipCidr := util.GetIpAddrWithMask(v4ip, cachedSubnet.Spec.CIDRBlock) return v4ipCidr, mac, nil } diff --git a/pkg/controller/gc.go b/pkg/controller/gc.go index e376ccc0a7b..630dce7886b 100644 --- a/pkg/controller/gc.go +++ b/pkg/controller/gc.go @@ -327,6 +327,11 @@ func (c *Controller) markAndCleanLSP() error { if node.Annotations[util.AllocatedAnnotation] == "true" { ipMap[fmt.Sprintf("node-%s", node.Name)] = struct{}{} } + + if _, err := c.ovnEipsLister.Get(node.Name); err == nil { + // node external gw lsp is managed by ovn eip cr, skip gc its lsp + ipMap[node.Name] = struct{}{} + } } // The lsp for vm pod should not be deleted if vm still exists diff --git a/pkg/controller/init.go b/pkg/controller/init.go index 6b9bd5da967..bff9b3d9e90 100644 --- a/pkg/controller/init.go +++ b/pkg/controller/init.go @@ -429,9 +429,7 @@ func (c *Controller) InitIPAM() 
error { return err } for _, oeip := range oeips { - if _, _, _, err = c.ipam.GetStaticAddress(oeip.Name, oeip.Name, oeip.Spec.V4Ip, - oeip.Spec.MacAddress, oeip.Spec.ExternalSubnet, false); err != nil { - + if _, _, _, err = c.ipam.GetStaticAddress(oeip.Name, oeip.Name, oeip.Status.V4Ip, oeip.Status.MacAddress, oeip.Spec.ExternalSubnet, true); err != nil { klog.Errorf("failed to init ipam from ovn eip cr %s: %v", oeip.Name, err) } } diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 6887c50f234..f4e4f6f01b0 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -195,7 +195,7 @@ func (c *Controller) handleAddNode(key string) error { return err } node := cachedNode.DeepCopy() - klog.Infof("handle add node %v", node.Name) + klog.Infof("handle add node %s", node.Name) subnets, err := c.subnetsLister.List(labels.Everything()) if err != nil { @@ -777,7 +777,7 @@ func (c *Controller) checkGatewayReady() error { if !success { if exist { - klog.Warningf("failed to ping ovn0 %s or node %v is not ready, delete ecmp policy route for node", ip, node.Name) + klog.Warningf("failed to ping ovn0 %s or node %s is not ready, delete ecmp policy route for node", ip, node.Name) nextHops = util.RemoveString(nextHops, ip) delete(nameIpMap, node.Name) klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops) @@ -803,7 +803,7 @@ func (c *Controller) checkGatewayReady() error { } } else { if exist { - klog.Infof("subnet %v gatewayNode does not contains node %v, delete policy route for node ip %s", subnet.Name, node.Name, ip) + klog.Infof("subnet %s gatewayNode does not contains node %v, delete policy route for node ip %s", subnet.Name, node.Name, ip) nextHops = util.RemoveString(nextHops, ip) delete(nameIpMap, node.Name) klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops) diff --git a/pkg/controller/ovn_eip.go b/pkg/controller/ovn_eip.go index eae4773fd53..579cf4547e5 100644 --- a/pkg/controller/ovn_eip.go +++ b/pkg/controller/ovn_eip.go @@ -236,6 +236,15 @@ func (c *Controller) handleAddOvnEip(key string) error { klog.Errorf("failed to acquire ip address, %v", err) return err } + + if cachedEip.Spec.Type == util.NodeExtGwUsingEip { + mergedIp := util.GetStringIP(v4ip, v6ip) + if err := c.ovnLegacyClient.CreatePort(subnet.Name, portName, mergedIp, mac, "", "", false, "", "", false, false, nil, false); err != nil { + klog.Error("failed to create lsp for ovn eip %s, %v", key, err) + return err + } + } + if err = c.createOrUpdateCrdOvnEip(key, subnet.Name, v4ip, v6ip, mac, cachedEip.Spec.Type); err != nil { klog.Errorf("failed to create or update ovn eip '%s', %v", cachedEip.Name, err) return err @@ -244,6 +253,12 @@ func (c *Controller) handleAddOvnEip(key string) error { klog.Errorf("failed to count ovn eip '%s' in subnet, %v", cachedEip.Name, err) return err } + + if err = c.handleAddOvnEipFinalizer(cachedEip); err != nil { + klog.Errorf("failed to add finalizer for ovn eip, %v", err) + return err + } + return nil } @@ -258,7 +273,7 @@ func (c *Controller) handleUpdateOvnEip(key string) error { if !cachedEip.DeletionTimestamp.IsZero() { subnetName := cachedEip.Spec.ExternalSubnet if subnetName == "" { - return fmt.Errorf("failed to create ovn eip '%s', subnet should be set", key) + return fmt.Errorf("failed to update ovn eip '%s', subnet should be set", key) } subnet, err := c.subnetsLister.Get(subnetName) if err != nil { @@ -298,7 +313,24 @@ func (c *Controller) handleResetOvnEip(key string) error { } func 
(c *Controller) handleDelOvnEip(key string) error { - klog.V(3).Infof("release ovn eip %s", key) + klog.V(3).Infof("handle del ovn eip %s", key) + cachedEip, err := c.ovnEipsLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + if cachedEip.Spec.Type == util.NodeExtGwUsingEip { + if err := c.ovnLegacyClient.DeleteLogicalSwitchPort(cachedEip.Name); err != nil { + klog.Errorf("failed to delete lsp %s, %v", cachedEip.Name, err) + return err + } + } + if err = c.handleDelOvnEipFinalizer(cachedEip); err != nil { + klog.Errorf("failed to remove finalizer from ovn eip, %v", err) + return err + } c.ipam.ReleaseAddressByPod(key) return nil } @@ -318,6 +350,7 @@ func (c *Controller) createOrUpdateCrdOvnEip(key, subnet, v4ip, v6ip, mac, usage Spec: kubeovnv1.OvnEipSpec{ ExternalSubnet: subnet, V4Ip: v4ip, + V6Ip: v6ip, MacAddress: mac, Type: usage, }, @@ -332,10 +365,12 @@ func (c *Controller) createOrUpdateCrdOvnEip(key, subnet, v4ip, v6ip, mac, usage return err } } else { - if cachedEip.Spec.V4Ip == "" && v4ip != "" { - ovnEip := cachedEip.DeepCopy() + ovnEip := cachedEip.DeepCopy() + if ovnEip.Spec.V4Ip == "" && v4ip != "" || + ovnEip.Spec.V6Ip == "" && v6ip != "" { ovnEip.Spec.ExternalSubnet = subnet ovnEip.Spec.V4Ip = v4ip + ovnEip.Spec.V6Ip = v6ip ovnEip.Spec.MacAddress = mac ovnEip.Spec.Type = usage if _, err := c.config.KubeOvnClient.KubeovnV1().OvnEips().Update(context.Background(), ovnEip, metav1.UpdateOptions{}); err != nil { @@ -344,9 +379,28 @@ func (c *Controller) createOrUpdateCrdOvnEip(key, subnet, v4ip, v6ip, mac, usage return errMsg } } + + if ovnEip.Status.MacAddress == "" { + ovnEip.Status.V4Ip = v4ip + ovnEip.Status.V6Ip = v6ip + ovnEip.Status.MacAddress = mac + bytes, err := ovnEip.Status.Bytes() + if err != nil { + klog.Error("failed to marshal ovn eip %s, %v", key, err) + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().OvnEips().Patch(context.Background(), key, types.MergePatchType, + bytes, metav1.PatchOptions{}, "status"); err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to patch ovn eip %s, %v", ovnEip.Name, err) + return err + } + } + var needUpdateLabel bool var op string - ovnEip := cachedEip.DeepCopy() if len(ovnEip.Labels) == 0 { op = "add" ovnEip.Labels = map[string]string{ @@ -374,7 +428,7 @@ func (c *Controller) createOrUpdateCrdOvnEip(key, subnet, v4ip, v6ip, mac, usage return nil } -func (c *Controller) patchOvnEipStatus(key string) error { +func (c *Controller) patchOvnEipStatus(key string, ready bool) error { cachedOvnEip, err := c.ovnEipsLister.Get(key) if err != nil { klog.Errorf("failed to get cached ovn eip '%s', %v", key, err) @@ -382,9 +436,14 @@ func (c *Controller) patchOvnEipStatus(key string) error { } ovnEip := cachedOvnEip.DeepCopy() changed := false + if ovnEip.Status.Ready != ready { + ovnEip.Status.Ready = ready + changed = true + } if ovnEip.Status.MacAddress == "" { // not support change ip ovnEip.Status.V4Ip = cachedOvnEip.Spec.V4Ip + ovnEip.Status.V6Ip = cachedOvnEip.Spec.V6Ip ovnEip.Status.MacAddress = cachedOvnEip.Spec.MacAddress changed = true } @@ -413,8 +472,9 @@ func (c *Controller) resetOvnEipSpec(key string) error { changed := false if ovnEip.Status.MacAddress != "" { // not support change ip - cachedOvnEip.Spec.V4Ip = ovnEip.Status.V4Ip - cachedOvnEip.Spec.MacAddress = ovnEip.Status.MacAddress + ovnEip.Spec.V4Ip = ovnEip.Status.V4Ip + ovnEip.Spec.V6Ip = ovnEip.Status.V6Ip + ovnEip.Spec.MacAddress = ovnEip.Status.MacAddress changed = 
true } if changed { diff --git a/pkg/controller/ovn_fip.go b/pkg/controller/ovn_fip.go index a42e142cae8..18b29da8564 100644 --- a/pkg/controller/ovn_fip.go +++ b/pkg/controller/ovn_fip.go @@ -241,7 +241,7 @@ func (c *Controller) handleAddOvnFip(key string) error { klog.Errorf("failed to update label for fip %s, %v", key, err) return err } - if err = c.patchOvnEipStatus(eipName); err != nil { + if err = c.patchOvnEipStatus(eipName, true); err != nil { klog.Errorf("failed to patch status for eip %s, %v", key, err) return err } @@ -324,7 +324,7 @@ func (c *Controller) handleUpdateOvnFip(key string) error { klog.Errorf("failed to patch status for fip '%s', %v", key, err) return err } - if err = c.patchOvnEipStatus(eipName); err != nil { + if err = c.patchOvnEipStatus(eipName, true); err != nil { klog.Errorf("failed to patch status for eip %s, %v", key, err) return err } diff --git a/pkg/controller/ovn_snat.go b/pkg/controller/ovn_snat.go index 020160aacc9..155e0b75abb 100644 --- a/pkg/controller/ovn_snat.go +++ b/pkg/controller/ovn_snat.go @@ -246,7 +246,7 @@ func (c *Controller) handleAddOvnSnatRule(key string) error { klog.Errorf("failed to patch label for snat %s, %v", key, err) return err } - if err = c.patchOvnEipStatus(eipName); err != nil { + if err = c.patchOvnEipStatus(eipName, true); err != nil { klog.Errorf("failed to patch status for eip %s, %v", key, err) return err } @@ -350,7 +350,7 @@ func (c *Controller) handleUpdateOvnSnatRule(key string) error { klog.Errorf("failed to patch label for snat %s, %v", key, err) return err } - if err = c.patchOvnEipStatus(eipName); err != nil { + if err = c.patchOvnEipStatus(eipName, true); err != nil { klog.Errorf("failed to patch status for eip %s, %v", key, err) return err } diff --git a/pkg/controller/vpc.go b/pkg/controller/vpc.go index 862a0ab48bb..60a28bcea84 100644 --- a/pkg/controller/vpc.go +++ b/pkg/controller/vpc.go @@ -619,11 +619,11 @@ func formatVpc(vpc *kubeovnv1.Vpc, c *Controller) error { return fmt.Errorf("invalid cidr %s: %w", item.CIDR, err) } } else if ip := net.ParseIP(item.CIDR); ip == nil { - return fmt.Errorf("invalid IP %s", item.CIDR) + return fmt.Errorf("invalid ip %s", item.CIDR) } // check next hop ip if ip := net.ParseIP(item.NextHopIP); ip == nil { - return fmt.Errorf("invalid next hop IP %s", item.NextHopIP) + return fmt.Errorf("invalid next hop ip %s", item.NextHopIP) } } @@ -634,8 +634,13 @@ func formatVpc(vpc *kubeovnv1.Vpc, c *Controller) error { changed = true } } else { - if ip := net.ParseIP(route.NextHopIP); ip == nil { - return fmt.Errorf("bad next hop ip: %s", route.NextHopIP) + // ecmp policy route may reroute to multiple next hop ips + for _, ipStr := range strings.Split(route.NextHopIP, ",") { + if ip := net.ParseIP(ipStr); ip == nil { + err := fmt.Errorf("invalid next hop ips: %s", route.NextHopIP) + klog.Error(err) + return err + } } } } @@ -819,9 +824,12 @@ func (c *Controller) handleAddVpcExternal(key string) error { mac = cachedEip.Spec.MacAddress } if v4ip == "" || mac == "" { - return fmt.Errorf("lrp '%s' ip or mac should not be empty", lrpEipName) + err := fmt.Errorf("lrp '%s' ip or mac should not be empty", lrpEipName) + klog.Error(err) + return err } - if err = c.patchOvnEipStatus(lrpEipName); err != nil { + if err = c.patchOvnEipStatus(lrpEipName, false); err != nil { + klog.Errorf("failed to patch ovn eip %s: %v", lrpEipName, err) return err } // init lrp gw chassis group @@ -840,11 +848,16 @@ func (c *Controller) handleAddVpcExternal(key string) error { klog.Errorf("failed to connect 
router '%s' to external, %v", key, err) return err } + if err = c.patchOvnEipStatus(lrpEipName, true); err != nil { + klog.Errorf("failed to patch ovn eip %s: %v", lrpEipName, err) + return err + } cachedVpc, err := c.vpcsLister.Get(key) if err != nil { if k8serrors.IsNotFound(err) { return nil } + klog.Error("failed to get vpc %s, %v", key, err) return err } vpc := cachedVpc.DeepCopy() diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index 5b12705b7c3..fabacbbce31 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -43,6 +43,9 @@ type Controller struct { subnetsSynced cache.InformerSynced subnetQueue workqueue.RateLimitingInterface + ovnEipsLister kubeovnlister.OvnEipLister + ovnEipsSynced cache.InformerSynced + podsLister listerv1.PodLister podsSynced cache.InformerSynced podQueue workqueue.RateLimitingInterface @@ -68,6 +71,7 @@ func NewController(config *Configuration, podInformerFactory informers.SharedInf providerNetworkInformer := kubeovnInformerFactory.Kubeovn().V1().ProviderNetworks() subnetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets() + ovnEipInformer := kubeovnInformerFactory.Kubeovn().V1().OvnEips() podInformer := podInformerFactory.Core().V1().Pods() nodeInformer := nodeInformerFactory.Core().V1().Nodes() @@ -83,6 +87,9 @@ func NewController(config *Configuration, podInformerFactory informers.SharedInf subnetsSynced: subnetInformer.Informer().HasSynced, subnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Subnet"), + ovnEipsLister: ovnEipInformer.Lister(), + ovnEipsSynced: ovnEipInformer.Informer().HasSynced, + podsLister: podInformer.Lister(), podsSynced: podInformer.Informer().HasSynced, podQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pod"), @@ -581,6 +588,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { klog.Info("Started workers") go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh) + go wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh) go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh) go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh) go wait.Until(c.runSubnetWorker, time.Second, stopCh) diff --git a/pkg/daemon/netns_linux.go b/pkg/daemon/netns_linux.go new file mode 100644 index 00000000000..493c234d919 --- /dev/null +++ b/pkg/daemon/netns_linux.go @@ -0,0 +1,61 @@ +package daemon + +import ( + "fmt" + "os" + "path" + + "github.com/kubeovn/kube-ovn/pkg/util" + "golang.org/x/sys/unix" +) + +// NsHandle is a handle to a network namespace. It can be cast directly +// to an int and used as a file descriptor. +type NsHandle int + +// GetFromPath gets a handle to a network namespace +// identified by the path +func GetNsFromPath(path string) (NsHandle, error) { + fd, err := unix.Open(path, unix.O_RDONLY|unix.O_CLOEXEC, 0) + if err != nil { + return -1, err + } + return NsHandle(fd), nil +} + +// GetFromThread gets a handle to the network namespace of a given pid and tid. +func GetNsFromThread(pid, tid int) (NsHandle, error) { + return GetNsFromPath(fmt.Sprintf("/proc/%d/task/%d/ns/net", pid, tid)) +} + +// Get gets a handle to the current threads network namespace. +func GetNs() (NsHandle, error) { + return GetNsFromThread(os.Getpid(), unix.Gettid()) +} + +// GetFromName gets a handle to a named network namespace such as one +// created by `ip netns add`. 
+func GetNsFromName(name string) (NsHandle, error) { + return GetNsFromPath(fmt.Sprintf("/var/run/netns/%s", name)) +} + +// None gets an empty (closed) NsHandle. +func ClosedNs() NsHandle { + return NsHandle(-1) +} + +// DeleteNamed deletes a named network namespace +// ip netns del +func DeleteNamedNs(name string) error { + namedPath := path.Join(util.BindMountPath, name) + if _, err := os.Stat(namedPath); os.IsNotExist(err) { + // already deleted + return nil + } + err := unix.Unmount(namedPath, unix.MNT_DETACH) + if err != nil { + return err + } + + return os.Remove(namedPath) +} diff --git a/pkg/daemon/ovs_linux.go b/pkg/daemon/ovs_linux.go index 11124a59f12..0a2a07eb6c5 100644 --- a/pkg/daemon/ovs_linux.go +++ b/pkg/daemon/ovs_linux.go @@ -1,6 +1,8 @@ package daemon import ( + "context" + "encoding/json" "errors" "fmt" "net" @@ -19,6 +21,8 @@ import ( "github.com/containernetworking/plugins/pkg/utils/sysctl" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" @@ -426,6 +430,277 @@ func (c *Controller) loopOvn0Check() { } } +func checkNodeGwNicInNs(ip, gw string, gwNS ns.NetNS) error { + exists, err := ovs.PortExists(util.NodeGwNic) + if err != nil { + klog.Error(err) + return err + } + if exists { + return ns.WithNetNSPath(gwNS.Path(), func(_ ns.NetNS) error { + return waitNetworkReady(util.NodeGwNic, ip, gw, true, false, 3) + }) + } else { + err := fmt.Errorf("node external gw not ready") + klog.Error(err) + return err + } +} + +func configureNodeGwNic(portName, ip, gw string, macAddr net.HardwareAddr, mtu int, gwNS ns.NetNS) error { + ipStr := util.GetIpWithoutMask(ip) + output, err := ovs.Exec(ovs.MayExist, "add-port", "br-int", util.NodeGwNic, "--", + "set", "interface", util.NodeGwNic, "type=internal", "--", + "set", "interface", util.NodeGwNic, fmt.Sprintf("external_ids:iface-id=%s", portName), + fmt.Sprintf("external_ids:ip=%s", ipStr), + fmt.Sprintf("external_ids:pod_netns=%s", util.NodeGwNsPath)) + if err != nil { + klog.Errorf("failed to configure node nic %s: %v, %q", portName, err, output) + return fmt.Errorf(output) + } + gwLink, err := netlink.LinkByName(util.NodeGwNic) + if err != nil { + return fmt.Errorf("can not found node gw nic %s, %v", util.NodeGwNic, err) + } + if err = netlink.LinkSetNsFd(gwLink, int(gwNS.Fd())); err != nil { + return fmt.Errorf("failed to move link into netns: %v", err) + } + return ns.WithNetNSPath(gwNS.Path(), func(_ ns.NetNS) error { + if util.CheckProtocol(ip) == kubeovnv1.ProtocolDual || util.CheckProtocol(ip) == kubeovnv1.ProtocolIPv6 { + // For docker version >=17.x the "none" network will disable ipv6 by default. + // We have to enable ipv6 here to add v6 address and gateway. 
+ // See https://github.com/containernetworking/cni/issues/531 + value, err := sysctl.Sysctl("net.ipv6.conf.all.disable_ipv6") + if err != nil { + return fmt.Errorf("failed to get sysctl net.ipv6.conf.all.disable_ipv6: %v", err) + } + if value != "0" { + if _, err = sysctl.Sysctl("net.ipv6.conf.all.disable_ipv6", "0"); err != nil { + return fmt.Errorf("failed to enable ipv6 on all nic: %v", err) + } + } + } + + if err = configureNic(util.NodeGwNic, ip, macAddr, mtu, true); err != nil { + klog.Errorf("failed to congigure node gw nic %s, %v", util.NodeGwNic, err) + return err + } + + if err = configureLoNic(); err != nil { + klog.Errorf("failed to configure nic %s, %v", util.LoNic, err) + return err + } + switch util.CheckProtocol(ip) { + case kubeovnv1.ProtocolIPv4: + _, defaultNet, _ := net.ParseCIDR("0.0.0.0/0") + err = netlink.RouteReplace(&netlink.Route{ + LinkIndex: gwLink.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Dst: defaultNet, + Gw: net.ParseIP(gw), + }) + case kubeovnv1.ProtocolIPv6: + _, defaultNet, _ := net.ParseCIDR("::/0") + err = netlink.RouteReplace(&netlink.Route{ + LinkIndex: gwLink.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Dst: defaultNet, + Gw: net.ParseIP(gw), + }) + case kubeovnv1.ProtocolDual: + gws := strings.Split(gw, ",") + _, defaultNet, _ := net.ParseCIDR("0.0.0.0/0") + err = netlink.RouteReplace(&netlink.Route{ + LinkIndex: gwLink.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Dst: defaultNet, + Gw: net.ParseIP(gws[0]), + }) + if err != nil { + return fmt.Errorf("config v4 gateway failed: %v", err) + } + + _, defaultNet, _ = net.ParseCIDR("::/0") + err = netlink.RouteReplace(&netlink.Route{ + LinkIndex: gwLink.Attrs().Index, + Scope: netlink.SCOPE_UNIVERSE, + Dst: defaultNet, + Gw: net.ParseIP(gws[1]), + }) + } + if err != nil { + return fmt.Errorf("failed to configure gateway: %v", err) + } + return waitNetworkReady(util.NodeGwNic, ip, gw, true, true, 3) + }) +} + +func removeNodeGwNic() error { + if _, err := ovs.Exec(ovs.IfExists, "del-port", "br-int", util.NodeGwNic); err != nil { + return fmt.Errorf("failed to remove ecmp external port %s from OVS bridge %s: %v", "br-int", util.NodeGwNic, err) + } + klog.Infof("removed node external gw nic %q", util.NodeGwNic) + return nil +} + +func removeNodeGwNs() error { + if err := DeleteNamedNs(util.NodeGwNs); err != nil { + return fmt.Errorf("failed to remove node external gw ns %s: %v", util.NodeGwNs, err) + } + klog.Infof("node external gw ns %s removed", util.NodeGwNs) + return nil +} + +// If OVS restart, the ovnext0 port will down and prevent host to pod network, +// Restart the kube-ovn-cni when this happens +func (c *Controller) loopOvnExt0Check() { + node, err := c.nodesLister.Get(c.config.NodeName) + if err != nil { + klog.Errorf("failed to get node %s: %v", c.config.NodeName, err) + return + } + + portName := node.Name + cachedEip, err := c.ovnEipsLister.Get(portName) + needClean := false + if err != nil { + if k8serrors.IsNotFound(err) { + if _, ok := node.Labels[util.NodeExtGwLabel]; !ok { + return + } + if node.Labels[util.NodeExtGwLabel] == "false" { + // not gw node and already clean + return + } + needClean = true + } else { + klog.Errorf("failed to get ecmp gateway ovn eip, %v", err) + return + } + } + + if needClean { + if err := removeNodeGwNic(); err != nil { + klog.Error(err) + return + } + if err := removeNodeGwNs(); err != nil { + klog.Error(err) + return + } + if err = c.patchNodeExternalGwLabel(node.Name, false); err != nil { + klog.Errorf("failed to patch label on node %s, 
%v", node, err) + return + } + return + } + + if cachedEip.Status.MacAddress == "" { + klog.Errorf("ecmp gateway ovn eip not ready, can not setup ecmp gateway") + return + } + ips := util.GetStringIP(cachedEip.Status.V4Ip, cachedEip.Status.V6Ip) + cachedSubnet, err := c.subnetsLister.Get(cachedEip.Spec.ExternalSubnet) + if err != nil { + klog.Errorf("failed to get external subnet %s, %v", cachedEip.Spec.ExternalSubnet, err) + return + } + gw := cachedSubnet.Spec.Gateway + mac, err := net.ParseMAC(cachedEip.Status.MacAddress) + if err != nil { + klog.Errorf("failed to parse mac %s, %v", cachedEip.Status.MacAddress, err) + return + } + gwNS, err := ns.GetNS(util.NodeGwNsPath) + if err != nil { + // ns not exist, create node external gw ns + cmd := exec.Command("sh", "-c", fmt.Sprintf("/usr/sbin/ip netns add %s", util.NodeGwNs)) + if err := cmd.Run(); err != nil { + err := fmt.Errorf("failed to get create gw ns %s, %v", util.NodeGwNs, err) + klog.Error(err) + return + } + if gwNS, err = ns.GetNS(util.NodeGwNsPath); err != nil { + err := fmt.Errorf("failed to get node gw ns %s, %v", util.NodeGwNs, err) + klog.Error(err) + return + } + } + ipAddr := util.GetIpAddrWithMask(ips, cachedSubnet.Spec.CIDRBlock) + if err := checkNodeGwNicInNs(ipAddr, gw, gwNS); err == nil { + // already ready + return + } + klog.Infof("setup nic ovnext0 ip %s, mac %v, mtu %d", ipAddr, mac, c.config.MTU) + if err := configureNodeGwNic(portName, ipAddr, gw, mac, c.config.MTU, gwNS); err != nil { + klog.Errorf("failed to setup ovnext0, %v", err) + return + } + if err = c.patchNodeExternalGwLabel(portName, true); err != nil { + klog.Errorf("failed to patch label on node %s, %v", node, err) + return + } + if err = c.patchOvnEipStatus(portName, true); err != nil { + klog.Errorf("failed to patch status for eip %s, %v", portName, err) + return + } +} + +func (c *Controller) patchOvnEipStatus(key string, ready bool) error { + cachedOvnEip, err := c.ovnEipsLister.Get(key) + if err != nil { + klog.Errorf("failed to get cached ovn eip '%s', %v", key, err) + return err + } + ovnEip := cachedOvnEip.DeepCopy() + changed := false + if ovnEip.Status.Ready != ready { + ovnEip.Status.Ready = ready + changed = true + } + if changed { + bytes, err := ovnEip.Status.Bytes() + if err != nil { + klog.Errorf("failed to marshal ovn eip status '%s', %v", key, err) + return err + } + if _, err = c.config.KubeOvnClient.KubeovnV1().OvnEips().Patch(context.Background(), ovnEip.Name, + types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil { + klog.Errorf("failed to patch status for ovn eip '%s', %v", key, err) + return err + } + } + return nil +} + +func (c *Controller) patchNodeExternalGwLabel(key string, enabled bool) error { + node, err := c.nodesLister.Get(c.config.NodeName) + if err != nil { + klog.Errorf("failed to get node %s: %v", c.config.NodeName, err) + return err + } + + if enabled { + node.Labels[util.NodeExtGwLabel] = "true" + } else { + node.Labels[util.NodeExtGwLabel] = "false" + } + + patchPayloadTemplate := `[{ "op": "%s", "path": "/metadata/labels", "value": %s }]` + op := "replace" + if len(node.Labels) == 0 { + op = "add" + } + + raw, _ := json.Marshal(node.Labels) + patchPayload := fmt.Sprintf(patchPayloadTemplate, op, raw) + if _, err = c.config.KubeClient.CoreV1().Nodes().Patch(context.Background(), key, types.JSONPatchType, []byte(patchPayload), metav1.PatchOptions{}); err != nil { + klog.Errorf("failed to patch node %s: %v", node.Name, err) + return err + } + return nil +} + func 
configureMirrorLink(portName string, mtu int) error { mirrorLink, err := netlink.LinkByName(portName) if err != nil { @@ -525,6 +800,25 @@ func configureNic(link, ip string, macAddr net.HardwareAddr, mtu int, detectIPCo return nil } +func configureLoNic() error { + loLink, err := netlink.LinkByName(util.LoNic) + if err != nil { + err := fmt.Errorf("can not find nic %s, %v", util.LoNic, err) + klog.Error(err) + return err + } + + if loLink.Attrs().OperState != netlink.OperUp { + if err = netlink.LinkSetUp(loLink); err != nil { + err := fmt.Errorf("failed to set up nic %s, %v", util.LoNic, err) + klog.Error(err) + return err + } + } + + return nil +} + // Add host nic to external bridge // Mac address, MTU, IP addresses & routes will be copied/transferred to the external bridge func configProviderNic(nicName, brName string) (int, error) { diff --git a/pkg/daemon/ovs_windows.go b/pkg/daemon/ovs_windows.go index b214537ae40..3fbfb2e0e75 100644 --- a/pkg/daemon/ovs_windows.go +++ b/pkg/daemon/ovs_windows.go @@ -280,6 +280,10 @@ func (c *Controller) loopOvn0Check() { // no need to check ovn0 on Windows } +func (c *Controller) loopOvnExt0Check() { + // no need to check ovnext0 on Windows +} + func configureMirrorLink(portName string, mtu int) error { adapter, err := util.GetNetAdapter(portName, false) if err != nil { diff --git a/pkg/util/const.go b/pkg/util/const.go index 06cb9636aa4..54dcd4456f3 100644 --- a/pkg/util/const.go +++ b/pkg/util/const.go @@ -23,6 +23,13 @@ const ( VipAnnotation = "ovn.kubernetes.io/vip" ChassisAnnotation = "ovn.kubernetes.io/chassis" + ExternalIpAnnotation = "ovn.kubernetes.io/external_ip" + ExternalMacAnnotation = "ovn.kubernetes.io/external_mac" + ExternalCidrAnnotation = "ovn.kubernetes.io/external_cidr" + ExternalSwitchAnnotation = "ovn.kubernetes.io/external_switch" + ExternalGatewayAnnotation = "ovn.kubernetes.io/external_gateway" + ExternalGwPortNameAnnotation = "ovn.kubernetes.io/external_gw_port_name" + VpcNatGatewayAnnotation = "ovn.kubernetes.io/vpc_nat_gw" VpcNatGatewayInitAnnotation = "ovn.kubernetes.io/vpc_nat_gw_init" VpcEipsAnnotation = "ovn.kubernetes.io/vpc_eips" @@ -91,6 +98,7 @@ const ( SubnetNameLabel = "ovn.kubernetes.io/subnet" ICGatewayLabel = "ovn.kubernetes.io/ic-gw" ExGatewayLabel = "ovn.kubernetes.io/external-gw" + NodeExtGwLabel = "ovn.kubernetes.io/node-ext-gw" VpcNatGatewayLabel = "ovn.kubernetes.io/vpc-nat-gw" IpReservedLabel = "ovn.kubernetes.io/ip_reserved" VpcNatGatewayNameLabel = "ovn.kubernetes.io/vpc-nat-gw-name" @@ -107,6 +115,12 @@ const ( NetworkTypeVxlan = "vxlan" NetworkTypeStt = "stt" + LoNic = "lo" + NodeGwNic = "ovnext0" + NodeGwNs = "ovnext" + NodeGwNsPath = "/var/run/netns/ovnext" + BindMountPath = "/run/netns" + NodeNic = "ovn0" NodeAllowPriority = "3000" @@ -150,10 +164,11 @@ const ( EcmpRouteType = "ecmp" NormalRouteType = "normal" - LrpUsingEip = "lrp" - FipUsingEip = "fip" - SnatUsingEip = "snat" - DnatUsingEip = "dnat" + LrpUsingEip = "lrp" + FipUsingEip = "fip" + SnatUsingEip = "snat" + DnatUsingEip = "dnat" + NodeExtGwUsingEip = "node-ext-gw" OvnFip = "ovn" IptablesFip = "iptables" diff --git a/test/e2e/framework/ovn-eip.go b/test/e2e/framework/ovn-eip.go new file mode 100644 index 00000000000..274cc17380f --- /dev/null +++ b/test/e2e/framework/ovn-eip.go @@ -0,0 +1,179 @@ +package framework + +import ( + "context" + "fmt" + "math/big" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + 
"k8s.io/apimachinery/pkg/util/wait" + + "github.com/onsi/gomega" + + apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + v1 "github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned/typed/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" +) + +// OvnEipClient is a struct for ovn eip client. +type OvnEipClient struct { + f *Framework + v1.OvnEipInterface +} + +func (f *Framework) OvnEipClient() *OvnEipClient { + return &OvnEipClient{ + f: f, + OvnEipInterface: f.KubeOVNClientSet.KubeovnV1().OvnEips(), + } +} + +func (s *OvnEipClient) Get(name string) *apiv1.OvnEip { + eip, err := s.OvnEipInterface.Get(context.TODO(), name, metav1.GetOptions{}) + ExpectNoError(err) + return eip +} + +// Create creates a new ovn eip according to the framework specifications +func (c *OvnEipClient) Create(eip *apiv1.OvnEip) *apiv1.OvnEip { + eip, err := c.OvnEipInterface.Create(context.TODO(), eip, metav1.CreateOptions{}) + ExpectNoError(err, "Error creating ovn eip") + return eip.DeepCopy() +} + +// CreateSync creates a new ovn eip according to the framework specifications, and waits for it to be ready. +func (c *OvnEipClient) CreateSync(eip *apiv1.OvnEip) *apiv1.OvnEip { + eip = c.Create(eip) + ExpectTrue(c.WaitToBeReady(eip.Name, timeout)) + // Get the newest ovn eip after it becomes ready + return c.Get(eip.Name).DeepCopy() +} + +// Patch patches the ovn eip +func (c *OvnEipClient) Patch(original, modified *apiv1.OvnEip) *apiv1.OvnEip { + patch, err := util.GenerateMergePatchPayload(original, modified) + ExpectNoError(err) + + var patchedOvnEip *apiv1.OvnEip + err = wait.PollImmediate(2*time.Second, timeout, func() (bool, error) { + eip, err := c.OvnEipInterface.Patch(context.TODO(), original.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "") + if err != nil { + return handleWaitingAPIError(err, false, "patch ovn eip %q", original.Name) + } + patchedOvnEip = eip + return true, nil + }) + if err == nil { + return patchedOvnEip.DeepCopy() + } + + if IsTimeout(err) { + Failf("timed out while retrying to patch ovn eip %s", original.Name) + } + ExpectNoError(maybeTimeoutError(err, "patching ovn eip %s", original.Name)) + + return nil +} + +// PatchSync patches the ovn eip and waits for the ovn eip to be ready for `timeout`. +// If the ovn eip doesn't become ready before the timeout, it will fail the test. +func (c *OvnEipClient) PatchSync(original, modified *apiv1.OvnEip, requiredNodes []string, timeout time.Duration) *apiv1.OvnEip { + eip := c.Patch(original, modified) + ExpectTrue(c.WaitToBeUpdated(eip, timeout)) + ExpectTrue(c.WaitToBeReady(eip.Name, timeout)) + // Get the newest ovn eip after it becomes ready + return c.Get(eip.Name).DeepCopy() +} + +// Delete deletes a ovn eip if the ovn eip exists +func (c *OvnEipClient) Delete(name string) { + err := c.OvnEipInterface.Delete(context.TODO(), name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + Failf("Failed to delete ovn eip %q: %v", name, err) + } +} + +// DeleteSync deletes the ovn eip and waits for the ovn eip to disappear for `timeout`. +// If the ovn eip doesn't disappear before the timeout, it will fail the test. +func (c *OvnEipClient) DeleteSync(name string) { + c.Delete(name) + gomega.Expect(c.WaitToDisappear(name, 2*time.Second, timeout)).To(gomega.Succeed(), "wait for ovn eip %q to disappear", name) +} + +// WaitToBeReady returns whether the ovn eip is ready within timeout. 
+func (c *OvnEipClient) WaitToBeReady(name string, timeout time.Duration) bool { + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + if c.Get(name).Status.Ready { + return true + } + } + return false +} + +// WaitToBeUpdated returns whether the ovn eip is updated within timeout. +func (c *OvnEipClient) WaitToBeUpdated(eip *apiv1.OvnEip, timeout time.Duration) bool { + Logf("Waiting up to %v for ovn eip %s to be updated", timeout, eip.Name) + rv, _ := big.NewInt(0).SetString(eip.ResourceVersion, 10) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + s := c.Get(eip.Name) + if current, _ := big.NewInt(0).SetString(s.ResourceVersion, 10); current.Cmp(rv) > 0 { + return true + } + } + Logf("OvnEip %s was not updated within %v", eip.Name, timeout) + return false +} + +// WaitToDisappear waits the given timeout duration for the specified ovn eip to disappear. +func (c *OvnEipClient) WaitToDisappear(name string, interval, timeout time.Duration) error { + var lastOvnEip *apiv1.OvnEip + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + Logf("Waiting for ovn eip %s to disappear", name) + subnets, err := c.List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return handleWaitingAPIError(err, true, "listing subnets") + } + found := false + for i, subnet := range subnets.Items { + if subnet.Name == name { + Logf("ovn eip %s still exists", name) + found = true + lastOvnEip = &(subnets.Items[i]) + break + } + } + if !found { + Logf("ovn eip %s no longer exists", name) + return true, nil + } + return false, nil + }) + if err == nil { + return nil + } + if IsTimeout(err) { + return TimeoutError(fmt.Sprintf("timed out while waiting for subnet %s to disappear", name), + lastOvnEip, + ) + } + return maybeTimeoutError(err, "waiting for subnet %s to disappear", name) +} + +func MakeOvnEip(name, subnet, v4ip, v6ip, mac, usage string) *apiv1.OvnEip { + eip := &apiv1.OvnEip{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: apiv1.OvnEipSpec{ + ExternalSubnet: subnet, + V4Ip: v4ip, + V6Ip: v6ip, + MacAddress: mac, + Type: usage, + }, + } + return eip +} diff --git a/test/e2e/ovn-eip/e2e_test.go b/test/e2e/ovn-eip/e2e_test.go new file mode 100644 index 00000000000..bccfcf5667d --- /dev/null +++ b/test/e2e/ovn-eip/e2e_test.go @@ -0,0 +1,350 @@ +package ovn_eip + +import ( + "flag" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + dockertypes "github.com/docker/docker/api/types" + "github.com/onsi/ginkgo/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "k8s.io/kubernetes/test/e2e" + k8sframework "k8s.io/kubernetes/test/e2e/framework" + "k8s.io/kubernetes/test/e2e/framework/config" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + + apiv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1" + "github.com/kubeovn/kube-ovn/pkg/util" + "github.com/kubeovn/kube-ovn/test/e2e/framework" + "github.com/kubeovn/kube-ovn/test/e2e/framework/docker" + "github.com/kubeovn/kube-ovn/test/e2e/framework/iproute" + "github.com/kubeovn/kube-ovn/test/e2e/framework/kind" +) + +const dockerNetworkName = "kube-ovn-vlan" + +func makeProviderNetwork(providerNetworkName string, exchangeLinkName bool, linkMap map[string]*iproute.Link) *apiv1.ProviderNetwork { + var defaultInterface string + customInterfaces := make(map[string][]string, 0) + for node, link := range linkMap { + if !strings.ContainsRune(node, '-') { + continue + } + + if defaultInterface == "" 
{ + defaultInterface = link.IfName + } else if link.IfName != defaultInterface { + customInterfaces[link.IfName] = append(customInterfaces[link.IfName], node) + } + } + + return framework.MakeProviderNetwork(providerNetworkName, exchangeLinkName, defaultInterface, customInterfaces, nil) +} + +func makeOvnEip(name, subnet, v4ip, v6ip, mac, usage string) *apiv1.OvnEip { + return framework.MakeOvnEip(name, subnet, v4ip, v6ip, mac, usage) +} + +var _ = framework.Describe("[group:ovn-eip]", func() { + f := framework.NewDefaultFramework("ovn-eip") + + var skip bool + var itFn func(bool) + var cs clientset.Interface + var nodeNames []string + var clusterName, providerNetworkName, vlanName, subnetName, podName, namespaceName string + var linkMap map[string]*iproute.Link + var podClient *framework.PodClient + var subnetClient *framework.SubnetClient + var vlanClient *framework.VlanClient + var providerNetworkClient *framework.ProviderNetworkClient + var ovnEipClient *framework.OvnEipClient + var dockerNetwork *dockertypes.NetworkResource + var containerID string + var image string + + ginkgo.BeforeEach(func() { + cs = f.ClientSet + podClient = f.PodClient() + subnetClient = f.SubnetClient() + vlanClient = f.VlanClient() + providerNetworkClient = f.ProviderNetworkClient() + ovnEipClient = f.OvnEipClient() + namespaceName = f.Namespace.Name + podName = "pod-" + framework.RandomSuffix() + subnetName = "subnet-" + framework.RandomSuffix() + vlanName = "vlan-" + framework.RandomSuffix() + providerNetworkName = "pn-" + framework.RandomSuffix() + containerID = "" + if image == "" { + image = framework.GetKubeOvnImage(cs) + } + + if skip { + ginkgo.Skip("underlay spec only runs on kind clusters") + } + + if clusterName == "" { + ginkgo.By("Getting k8s nodes") + k8sNodes, err := e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + + cluster, ok := kind.IsKindProvided(k8sNodes.Items[0].Spec.ProviderID) + if !ok { + skip = true + ginkgo.Skip("underlay spec only runs on kind clusters") + } + clusterName = cluster + } + + if dockerNetwork == nil { + ginkgo.By("Ensuring docker network " + dockerNetworkName + " exists") + network, err := docker.NetworkCreate(dockerNetworkName, true, true) + framework.ExpectNoError(err, "creating docker network "+dockerNetworkName) + dockerNetwork = network + } + + ginkgo.By("Getting kind nodes") + nodes, err := kind.ListNodes(clusterName, "") + framework.ExpectNoError(err, "getting nodes in kind cluster") + framework.ExpectNotEmpty(nodes) + + ginkgo.By("Connecting nodes to the docker network") + err = kind.NetworkConnect(dockerNetwork.ID, nodes) + framework.ExpectNoError(err, "connecting nodes to network "+dockerNetworkName) + + ginkgo.By("Getting node links that belong to the docker network") + nodes, err = kind.ListNodes(clusterName, "") + framework.ExpectNoError(err, "getting nodes in kind cluster") + + linkMap = make(map[string]*iproute.Link, len(nodes)) + nodeNames = make([]string, 0, len(nodes)) + // ovn eip name is the same as node name in this scenario + + for _, node := range nodes { + links, err := node.ListLinks() + framework.ExpectNoError(err, "failed to list links on node %s: %v", node.Name(), err) + + for _, link := range links { + if link.Address == node.NetworkSettings.Networks[dockerNetworkName].MacAddress { + linkMap[node.ID] = &link + break + } + } + framework.ExpectHaveKey(linkMap, node.ID) + linkMap[node.Name()] = linkMap[node.ID] + nodeNames = append(nodeNames, node.Name()) + } + + itFn = func(exchangeLinkName bool) { + ginkgo.By("Creating 
provider network") + pn := makeProviderNetwork(providerNetworkName, exchangeLinkName, linkMap) + pn = providerNetworkClient.CreateSync(pn) + + ginkgo.By("Getting k8s nodes") + k8sNodes, err := e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + + ginkgo.By("Validating node labels") + for _, node := range k8sNodes.Items { + link := linkMap[node.Name] + framework.ExpectHaveKeyWithValue(node.Labels, fmt.Sprintf(util.ProviderNetworkInterfaceTemplate, providerNetworkName), link.IfName) + framework.ExpectHaveKeyWithValue(node.Labels, fmt.Sprintf(util.ProviderNetworkReadyTemplate, providerNetworkName), "true") + framework.ExpectHaveKeyWithValue(node.Labels, fmt.Sprintf(util.ProviderNetworkMtuTemplate, providerNetworkName), strconv.Itoa(link.Mtu)) + framework.ExpectNotHaveKey(node.Labels, fmt.Sprintf(util.ProviderNetworkExcludeTemplate, providerNetworkName)) + } + + ginkgo.By("Validating provider network status") + framework.ExpectEqual(pn.Status.Ready, true, "field .status.ready should be true") + framework.ExpectConsistOf(pn.Status.ReadyNodes, nodeNames) + framework.ExpectEmpty(pn.Status.Vlans) + + ginkgo.By("Getting kind nodes") + kindNodes, err := kind.ListNodes(clusterName, "") + framework.ExpectNoError(err) + + ginkgo.By("Validating node links") + linkNameMap := make(map[string]string, len(kindNodes)) + bridgeName := util.ExternalBridgeName(providerNetworkName) + for _, node := range kindNodes { + if exchangeLinkName { + bridgeName = linkMap[node.ID].IfName + } + + links, err := node.ListLinks() + framework.ExpectNoError(err, "failed to list links on node %s: %v", node.Name(), err) + + var port, bridge *iproute.Link + for i, link := range links { + if link.IfIndex == linkMap[node.ID].IfIndex { + port = &links[i] + } else if link.IfName == bridgeName { + bridge = &links[i] + } + if port != nil && bridge != nil { + break + } + } + framework.ExpectNotNil(port) + framework.ExpectEqual(port.Address, linkMap[node.ID].Address) + framework.ExpectEqual(port.Mtu, linkMap[node.ID].Mtu) + framework.ExpectEqual(port.Master, "ovs-system") + framework.ExpectEqual(port.OperState, "UP") + if exchangeLinkName { + framework.ExpectEqual(port.IfName, util.ExternalBridgeName(providerNetworkName)) + } + + framework.ExpectNotNil(bridge) + framework.ExpectEqual(bridge.LinkInfo.InfoKind, "openvswitch") + framework.ExpectEqual(bridge.Address, port.Address) + framework.ExpectEqual(bridge.Mtu, port.Mtu) + framework.ExpectEqual(bridge.OperState, "UNKNOWN") + framework.ExpectContainElement(bridge.Flags, "UP") + + framework.ExpectEmpty(port.NonLinkLocalAddresses()) + framework.ExpectConsistOf(bridge.NonLinkLocalAddresses(), linkMap[node.ID].NonLinkLocalAddresses()) + + linkNameMap[node.ID] = port.IfName + } + } + }) + ginkgo.AfterEach(func() { + if containerID != "" { + ginkgo.By("Deleting container " + containerID) + err := docker.ContainerRemove(containerID) + framework.ExpectNoError(err) + } + + ginkgo.By("Deleting pod " + podName) + podClient.DeleteSync(podName) + + ginkgo.By("Deleting subnet " + subnetName) + subnetClient.DeleteSync(subnetName) + + ginkgo.By("Deleting vlan " + vlanName) + vlanClient.Delete(vlanName, metav1.DeleteOptions{}) + + ginkgo.By("Deleting provider network") + providerNetworkClient.DeleteSync(providerNetworkName) + + ginkgo.By("Getting nodes") + nodes, err := kind.ListNodes(clusterName, "") + framework.ExpectNoError(err, "getting nodes in cluster") + + ginkgo.By("Waiting for ovs bridge to disappear") + deadline := time.Now().Add(time.Minute) + for _, node := range nodes { + 
err = node.WaitLinkToDisappear(util.ExternalBridgeName(providerNetworkName), 2*time.Second, deadline) + framework.ExpectNoError(err, "timed out waiting for ovs bridge to disappear in node %s", node.Name()) + } + + if dockerNetwork != nil { + ginkgo.By("Disconnecting nodes from the docker network") + err = kind.NetworkDisconnect(dockerNetwork.ID, nodes) + framework.ExpectNoError(err, "disconnecting nodes from network "+dockerNetworkName) + } + }) + + framework.ConformanceIt("should be able to sync node external", func() { + // create provider network + itFn(false) + + // create vlan subnet eip + ginkgo.By("Getting docker network " + dockerNetworkName) + network, err := docker.NetworkInspect(dockerNetworkName) + framework.ExpectNoError(err, "getting docker network "+dockerNetworkName) + + ginkgo.By("Creating vlan " + vlanName) + vlan := framework.MakeVlan(vlanName, providerNetworkName, 0) + _ = vlanClient.Create(vlan) + + ginkgo.By("Creating subnet " + subnetName) + cidr := make([]string, 0, 2) + gateway := make([]string, 0, 2) + for _, config := range dockerNetwork.IPAM.Config { + switch util.CheckProtocol(config.Subnet) { + case apiv1.ProtocolIPv4: + if f.ClusterIpFamily != "ipv6" { + cidr = append(cidr, config.Subnet) + gateway = append(gateway, config.Gateway) + } + case apiv1.ProtocolIPv6: + if f.ClusterIpFamily != "ipv4" { + cidr = append(cidr, config.Subnet) + gateway = append(gateway, config.Gateway) + } + } + } + excludeIPs := make([]string, 0, len(network.Containers)*2) + for _, container := range network.Containers { + if container.IPv4Address != "" && f.ClusterIpFamily != "ipv6" { + excludeIPs = append(excludeIPs, strings.Split(container.IPv4Address, "/")[0]) + } + if container.IPv6Address != "" && f.ClusterIpFamily != "ipv4" { + excludeIPs = append(excludeIPs, strings.Split(container.IPv6Address, "/")[0]) + } + } + subnet := framework.MakeSubnet(subnetName, vlanName, strings.Join(cidr, ","), strings.Join(gateway, ","), excludeIPs, nil, []string{namespaceName}) + _ = subnetClient.CreateSync(subnet) + + ginkgo.By("Getting k8s nodes") + k8sNodes, err := e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + + for _, node := range k8sNodes.Items { + ginkgo.By("Creating ovn eip " + node.Name) + eip := makeOvnEip(node.Name, subnetName, "", "", "", util.NodeExtGwUsingEip) + _ = ovnEipClient.CreateSync(eip) + } + + k8sNodes, err = e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + for _, node := range k8sNodes.Items { + // label should be true after setup node external gw + framework.ExpectHaveKeyWithValue(node.Labels, util.NodeExtGwLabel, "true") + } + + k8sNodes, err = e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + for _, node := range k8sNodes.Items { + ginkgo.By("Deleting ovn eip " + node.Name) + ovnEipClient.DeleteSync(node.Name) + } + + time.Sleep(10 * time.Second) + // clean process should be finished in 10s + k8sNodes, err = e2enode.GetReadySchedulableNodes(cs) + framework.ExpectNoError(err) + for _, node := range k8sNodes.Items { + // label should be false after remove node external gw + framework.ExpectHaveKeyWithValue(node.Labels, util.NodeExtGwLabel, "false") + } + }) +}) + +func init() { + klog.SetOutput(ginkgo.GinkgoWriter) + + // Register flags. 
+ config.CopyFlags(config.Flags, flag.CommandLine) + k8sframework.RegisterCommonFlags(flag.CommandLine) + k8sframework.RegisterClusterFlags(flag.CommandLine) + + // Parse all the flags + flag.Parse() + if k8sframework.TestContext.KubeConfig == "" { + k8sframework.TestContext.KubeConfig = filepath.Join(os.Getenv("HOME"), ".kube", "config") + } + k8sframework.AfterReadingAllFlags(&k8sframework.TestContext) +} + +func TestE2E(t *testing.T) { + e2e.RunE2ETests(t) +}
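
Usage sketch (not part of the patch above): a minimal example of how a node-ext-gw OvnEip could be created through the generated clientset once this change is in place. The kubeconfig path, the node name "node1", and the external subnet name "external204" are placeholder assumptions; the types and calls (kubeovnv1.OvnEip, OvnEipSpec, KubeovnV1().OvnEips().Create, util.NodeExtGwUsingEip) come from the diff itself.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
	"github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned"
	"github.com/kubeovn/kube-ovn/pkg/util"
)

func main() {
	// Build a client from a kubeconfig; the path is a placeholder.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	client, err := versioned.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// One OvnEip per gateway node; the CR name matches the node name, as in
	// the e2e case above. IP and MAC are left empty so the controller
	// allocates them and records the result in .status.
	eip := &kubeovnv1.OvnEip{
		ObjectMeta: metav1.ObjectMeta{Name: "node1"}, // placeholder node name
		Spec: kubeovnv1.OvnEipSpec{
			ExternalSubnet: "external204", // placeholder external subnet
			Type:           util.NodeExtGwUsingEip,
		},
	}
	created, err := client.KubeovnV1().OvnEips().Create(context.TODO(), eip, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("created ovn eip %s of type %s\n", created.Name, created.Spec.Type)
}

After the controller and kube-ovn-cni process such a CR, .status.v4Ip/.v6Ip/.macAddress and .status.ready are populated and the node is labeled ovn.kubernetes.io/node-ext-gw=true, which is what the new e2e case asserts.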