Skip to content

Commit

Permalink
replace lr/ls/lrp/lsp function call with ovnClient (#2477)
Browse files Browse the repository at this point in the history
Co-authored-by: liguo <li.guo@99cloud.net>
  • Loading branch information
gugulee and liguo committed Mar 16, 2023
1 parent 599ed23 commit 443dd58
Show file tree
Hide file tree
Showing 24 changed files with 736 additions and 1,262 deletions.
37 changes: 24 additions & 13 deletions pkg/controller/controller.go
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -868,17 +869,14 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.runAddSubnetWorker, time.Second, ctx.Done())
go wait.Until(c.runAddVlanWorker, time.Second, ctx.Done())
go wait.Until(c.runAddNamespaceWorker, time.Second, ctx.Done())
for {
klog.Infof("wait for %s and %s ready", c.config.DefaultLogicalSwitch, c.config.NodeSwitch)
time.Sleep(3 * time.Second)
lss, err := c.ovnLegacyClient.ListLogicalSwitch(c.config.EnableExternalVpc)
if err != nil {
util.LogFatalAndExit(err, "failed to list logical switch")
}

if util.IsStringIn(c.config.DefaultLogicalSwitch, lss) && util.IsStringIn(c.config.NodeSwitch, lss) && c.addNamespaceQueue.Len() == 0 {
break
}
err := wait.PollUntil(3*time.Second, func() (done bool, err error) {
subnets := []string{c.config.DefaultLogicalSwitch, c.config.NodeSwitch}
klog.Infof("wait for subnets %v ready", subnets)

return c.allSubnetReady(subnets...)
}, ctx.Done())
if err != nil {
klog.Fatalf("wait default/join subnet ready error: %v", err)
}

go wait.Until(c.runAddSgWorker, time.Second, ctx.Done())
Expand Down Expand Up @@ -1009,8 +1007,6 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.CheckNodePortGroup, time.Duration(c.config.NodePgProbeTime)*time.Minute, ctx.Done())
}

go wait.Until(c.syncVmLiveMigrationPort, 15*time.Second, ctx.Done())

go wait.Until(c.runAddVirtualIpWorker, time.Second, ctx.Done())
go wait.Until(c.runUpdateVirtualIpWorker, time.Second, ctx.Done())
go wait.Until(c.runDelVirtualIpWorker, time.Second, ctx.Done())
Expand Down Expand Up @@ -1040,3 +1036,18 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.runDelPodAnnotatedIptablesFipWorker, time.Second, ctx.Done())
}
}

func (c *Controller) allSubnetReady(subnets ...string) (bool, error) {
for _, lsName := range subnets {
exist, err := c.ovnClient.LogicalSwitchExists(lsName)
if err != nil {
return false, fmt.Errorf("check logical switch %s exist: %v", lsName, err)
}

if !exist {
return false, nil
}
}

return true, nil
}
85 changes: 85 additions & 0 deletions pkg/controller/controller_test.go
@@ -0,0 +1,85 @@
package controller

import (
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"k8s.io/client-go/util/workqueue"

mockovs "github.com/kubeovn/kube-ovn/mocks/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/client/clientset/versioned/fake"
informerfactory "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions/kubeovn/v1"
)

type fakeControllerInformers struct {
vpcInformer kubeovninformer.VpcInformer
sbunetInformer kubeovninformer.SubnetInformer
}

type fakeController struct {
fakeController *Controller
fakeinformers *fakeControllerInformers
mockOvnClient *mockovs.MockOvnClient
}

func alwaysReady() bool { return true }

func newFakeController(t *testing.T) *fakeController {
/* kube ovn fake client */
kubeovnClient := fake.NewSimpleClientset()
kubeovnInformerFactory := informerfactory.NewSharedInformerFactory(kubeovnClient, 0)
vpcInformer := kubeovnInformerFactory.Kubeovn().V1().Vpcs()
sbunetInformer := kubeovnInformerFactory.Kubeovn().V1().Subnets()

fakeInformers := &fakeControllerInformers{
vpcInformer: vpcInformer,
sbunetInformer: sbunetInformer,
}

/* ovn fake client */
mockOvnClient := mockovs.NewMockOvnClient(gomock.NewController(t))

ctrl := &Controller{
vpcsLister: vpcInformer.Lister(),
vpcSynced: alwaysReady,
subnetsLister: sbunetInformer.Lister(),
subnetSynced: alwaysReady,
ovnClient: mockOvnClient,
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ""),
}

return &fakeController{
fakeController: ctrl,
fakeinformers: fakeInformers,
mockOvnClient: mockOvnClient,
}
}

func Test_allSubnetReady(t *testing.T) {
t.Parallel()

fakeController := newFakeController(t)
ctrl := fakeController.fakeController
mockOvnClient := fakeController.mockOvnClient

subnets := []string{"ovn-default", "join"}

t.Run("all subnet ready", func(t *testing.T) {
mockOvnClient.EXPECT().LogicalSwitchExists(gomock.Any()).Return(true, nil).Times(2)

ready, err := ctrl.allSubnetReady(subnets...)
require.NoError(t, err)
require.True(t, ready)
})

t.Run("some subnet are not ready", func(t *testing.T) {
mockOvnClient.EXPECT().LogicalSwitchExists(subnets[0]).Return(true, nil)
mockOvnClient.EXPECT().LogicalSwitchExists(subnets[1]).Return(false, nil)

ready, err := ctrl.allSubnetReady(subnets...)
require.NoError(t, err)
require.False(t, ready)
})
}
12 changes: 7 additions & 5 deletions pkg/controller/external-gw.go
Expand Up @@ -119,15 +119,15 @@ func (c *Controller) removeExternalGateway() error {

if !keepExternalSubnet {
klog.Infof("delete external gateway switch %s", c.config.ExternalGatewaySwitch)
if err := c.ovnLegacyClient.DeleteGatewaySwitch(c.config.ExternalGatewaySwitch); err != nil {
klog.Errorf("failed to delete external gateway switch, %v", err)
if err := c.ovnClient.DeleteLogicalGatewaySwitch(util.ExternalGatewaySwitch, c.config.ClusterRouter); err != nil {
klog.Errorf("delete external gateway switch %s: %v", util.ExternalGatewaySwitch, err)
return err
}
} else {
klog.Infof("should keep provider network vlan underlay external gateway switch %s", c.config.ExternalGatewaySwitch)
lrpName := fmt.Sprintf("%s-%s", c.config.ClusterRouter, c.config.ExternalGatewaySwitch)
klog.Infof("delete logical router port %s", lrpName)
if err := c.ovnLegacyClient.DeleteLogicalRouterPort(lrpName); err != nil {
if err := c.ovnClient.DeleteLogicalRouterPort(lrpName); err != nil {
klog.Errorf("failed to delete lrp %s, %v", lrpName, err)
return err
}
Expand Down Expand Up @@ -160,10 +160,12 @@ func (c *Controller) establishExternalGateway(config map[string]string) error {
klog.Infof("lrp %s exist", lrpName)
return nil
}
if err := c.ovnLegacyClient.CreateGatewaySwitch(c.config.ExternalGatewaySwitch, c.config.ExternalGatewayNet, c.config.ExternalGatewayVlanID, lrpIp, lrpMac, chassises); err != nil {
klog.Errorf("failed to create external gateway switch, %v", err)

if err := c.ovnClient.CreateGatewayLogicalSwitch(c.config.ExternalGatewaySwitch, c.config.ClusterRouter, c.config.ExternalGatewayNet, lrpIp, lrpMac, c.config.ExternalGatewayVlanID, chassises...); err != nil {
klog.Errorf("create external gateway switch %s: %v", c.config.ExternalGatewaySwitch, err)
return err
}

return nil
}

Expand Down
60 changes: 34 additions & 26 deletions pkg/controller/gc.go
Expand Up @@ -15,6 +15,7 @@ import (

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
"github.com/kubeovn/kube-ovn/pkg/util"
)

Expand Down Expand Up @@ -52,26 +53,17 @@ func (c *Controller) gcLogicalRouterPort() error {
return err
}

var exceptPeerPorts []string
exceptPeerPorts := make(map[string]struct{})
for _, vpc := range vpcs {
for _, peer := range vpc.Status.VpcPeerings {
exceptPeerPorts = append(exceptPeerPorts, fmt.Sprintf("%s-%s", vpc.Name, peer))
exceptPeerPorts[fmt.Sprintf("%s-%s", vpc.Name, peer)] = struct{}{}
}
}
lrps, err := c.ovnLegacyClient.ListLogicalEntity("logical_router_port", "peer!=[]")
if err != nil {
klog.Errorf("failed to list logical router port, %v", err)

if err = c.ovnClient.DeleteLogicalRouterPorts(nil, logicalRouterPortFilter(exceptPeerPorts)); err != nil {
klog.Errorf("delete non-existent peer logical router port: %v", err)
return err
}
for _, lrp := range lrps {
if !util.ContainsString(exceptPeerPorts, lrp) {
klog.Infof("gc logical router port %s", lrp)
if err = c.ovnLegacyClient.DeleteLogicalRouterPort(lrp); err != nil {
klog.Errorf("failed to delete logical router port %s, %v", lrp, err)
return err
}
}
}
return nil
}

Expand Down Expand Up @@ -132,25 +124,27 @@ func (c *Controller) gcLogicalSwitch() error {
subnetMap[s.Name] = s
subnetNames = append(subnetNames, s.Name)
}
lss, err := c.ovnLegacyClient.ListLogicalSwitch(c.config.EnableExternalVpc)

lss, err := c.ovnClient.ListLogicalSwitch(c.config.EnableExternalVpc, nil)
if err != nil {
klog.Errorf("failed to list logical switch, %v", err)
klog.Errorf("list logical switch: %v", err)
return err
}

klog.Infof("ls in ovn %v", lss)
klog.Infof("subnet in kubernetes %v", subnetNames)
for _, ls := range lss {
if ls == util.InterconnectionSwitch ||
ls == util.ExternalGatewaySwitch ||
ls == c.config.ExternalGatewaySwitch {
if ls.Name == util.InterconnectionSwitch ||
ls.Name == util.ExternalGatewaySwitch ||
ls.Name == c.config.ExternalGatewaySwitch {
continue
}
if s := subnetMap[ls]; s != nil && isOvnSubnet(s) {
if s := subnetMap[ls.Name]; s != nil && isOvnSubnet(s) {
continue
}

klog.Infof("gc subnet %s", ls)
if err := c.handleDeleteLogicalSwitch(ls); err != nil {
if err := c.handleDeleteLogicalSwitch(ls.Name); err != nil {
klog.Errorf("failed to gc subnet %s, %v", ls, err)
return err
}
Expand Down Expand Up @@ -190,20 +184,23 @@ func (c *Controller) gcCustomLogicalRouter() error {
for _, s := range vpcs {
vpcNames = append(vpcNames, s.Name)
}
lrs, err := c.ovnLegacyClient.ListLogicalRouter(c.config.EnableExternalVpc)

lrs, err := c.ovnClient.ListLogicalRouter(c.config.EnableExternalVpc, nil)
if err != nil {
klog.Errorf("failed to list logical router, %v", err)
return err
}

klog.Infof("lr in ovn %v", lrs)
klog.Infof("vpc in kubernetes %v", vpcNames)

for _, lr := range lrs {
if lr == util.DefaultVpc {
if lr.Name == util.DefaultVpc {
continue
}
if !util.IsStringIn(lr, vpcNames) {
if !util.IsStringIn(lr.Name, vpcNames) {
klog.Infof("gc router %s", lr)
if err := c.deleteVpcRouter(lr); err != nil {
if err := c.deleteVpcRouter(lr.Name); err != nil {
klog.Errorf("failed to delete router %s, %v", lr, err)
return err
}
Expand Down Expand Up @@ -359,10 +356,11 @@ func (c *Controller) markAndCleanLSP() error {
}

klog.Infof("gc logical switch port %s", lsp.Name)
if err := c.ovnLegacyClient.DeleteLogicalSwitchPort(lsp.Name); err != nil {
if err := c.ovnClient.DeleteLogicalSwitchPort(lsp.Name); err != nil {
klog.Errorf("failed to delete lsp %s, %v", lsp, err)
return err
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(context.Background(), lsp.Name, metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", lsp.Name, err)
Expand Down Expand Up @@ -842,3 +840,13 @@ func (c *Controller) gcVpcDns() error {
}
return nil
}

func logicalRouterPortFilter(exceptPeerPorts map[string]struct{}) func(lrp *ovnnb.LogicalRouterPort) bool {
return func(lrp *ovnnb.LogicalRouterPort) bool {
if _, ok := exceptPeerPorts[lrp.Name]; ok {
return false // ignore except lrp
}

return lrp.Peer != nil && len(*lrp.Peer) != 0
}
}
48 changes: 48 additions & 0 deletions pkg/controller/gc_test.go
@@ -0,0 +1,48 @@
package controller

import (
"fmt"
"testing"

"github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
"github.com/stretchr/testify/require"
)

func newLogicalRouterPort(lrName, lrpName, mac string, networks []string) *ovnnb.LogicalRouterPort {
return &ovnnb.LogicalRouterPort{
Name: lrpName,
MAC: mac,
Networks: networks,
ExternalIDs: map[string]string{
"lr": lrName,
},
}
}

func Test_logicalRouterPortFilter(t *testing.T) {
t.Parallel()

exceptPeerPorts := map[string]struct{}{
"except-lrp-0": {},
"except-lrp-1": {},
}

lrpNames := []string{"other-0", "other-1", "other-2", "except-lrp-0", "except-lrp-1"}
lrps := make([]*ovnnb.LogicalRouterPort, 0)
for _, lrpName := range lrpNames {
lrp := newLogicalRouterPort("", lrpName, "", nil)
peer := fmt.Sprintf("%s-peer", lrpName)
lrp.Peer = &peer
lrps = append(lrps, lrp)
}

filterFunc := logicalRouterPortFilter(exceptPeerPorts)

for _, lrp := range lrps {
if _, ok := exceptPeerPorts[lrp.Name]; ok {
require.False(t, filterFunc(lrp))
} else {
require.True(t, filterFunc(lrp))
}
}
}
16 changes: 3 additions & 13 deletions pkg/controller/init.go
Expand Up @@ -197,17 +197,7 @@ func (c *Controller) initNodeSwitch() error {

// InitClusterRouter init cluster router to connect different logical switches
func (c *Controller) initClusterRouter() error {
lrs, err := c.ovnLegacyClient.ListLogicalRouter(c.config.EnableExternalVpc)
if err != nil {
return err
}
klog.Infof("exists routers: %v", lrs)
for _, r := range lrs {
if c.config.ClusterRouter == r {
return nil
}
}
return c.ovnLegacyClient.CreateLogicalRouter(c.config.ClusterRouter)
return c.ovnClient.CreateLogicalRouter(c.config.ClusterRouter)
}

func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error {
Expand Down Expand Up @@ -798,8 +788,8 @@ func (c *Controller) initAppendLspExternalIds(portName string, pod *v1.Pod) erro
externalIDs["pod"] = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}

if err := c.ovnLegacyClient.SetLspExternalIds(portName, externalIDs); err != nil {
klog.Errorf("failed to set lsp external_ids for port %s: %v", portName, err)
if err := c.ovnClient.SetLogicalSwitchPortExternalIds(portName, externalIDs); err != nil {
klog.Errorf("set lsp external_ids for logical switch port %s: %v", portName, err)
return err
}

Expand Down

0 comments on commit 443dd58

Please sign in to comment.