Skip to content

Commit

Permalink
fix: gc resource when start controller
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Oct 23, 2019
1 parent 5c07216 commit 3791ba2
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 10 deletions.
199 changes: 197 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package controller

import (
"fmt"
"github.com/alauda/kube-ovn/pkg/util"
"k8s.io/apimachinery/pkg/labels"
"strings"
"time"

kubeovninformer "github.com/alauda/kube-ovn/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -51,7 +54,8 @@ type Controller struct {
updateSubnetQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface

ipSynced cache.InformerSynced
ipsLister kubeovnlister.IPLister
ipSynced cache.InformerSynced

namespacesLister v1.NamespaceLister
namespacesSynced cache.InformerSynced
Expand Down Expand Up @@ -123,7 +127,8 @@ func NewController(config *Configuration) *Controller {
updateSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnet"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),

ipSynced: ipInformer.Informer().HasSynced,
ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
Expand Down Expand Up @@ -287,6 +292,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
return fmt.Errorf("failed to wait for caches to sync")
}

c.gcLogicalSwitch()
c.gcNode()
c.gcLogicalSwitchPort()
c.gcPortGroup()

klog.Info("Starting workers")

// Launch workers to process resources
Expand Down Expand Up @@ -334,3 +344,188 @@ func (c *Controller) isLeader() bool {
func (c *Controller) hasLeader() bool {
return c.elector.GetLeader() != ""
}

func (c *Controller) gcLogicalSwitch() error {
klog.Infof("start to gc logical switch")
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnet, %v", err)
return err
}
subnetNames := make([]string, 0, len(subnets))
for _, s := range subnets {
subnetNames = append(subnetNames, s.Name)
}
lss, err := c.ovnClient.ListLogicalSwitch()
if err != nil {
klog.Errorf("failed to list logical switch, %v", err)
return err
}
klog.Infof("ls in ovn %v", lss)
klog.Infof("subnet in kubernetes %v", subnetNames)
for _, ls := range lss {
if !util.IsStringIn(ls, subnetNames) {
klog.Infof("gc subnet %s", ls)
if err := c.handleDeleteSubnet(ls); err != nil {
klog.Errorf("failed to gc subnet %s, %v", ls, err)
return err
}
}
}
return nil
}

func (c *Controller) gcNode() error {
klog.Infof("start to gc nodes")
nodes, err := c.nodesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list node, %v", err)
return err
}
nodeNames := make([]string, 0, len(nodes))
for _, no := range nodes {
nodeNames = append(nodeNames, no.Name)
}
ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list ip, %v", err)
return err
}
ipNodeNames := make([]string, 0, len(ips))
for _, ip := range ips {
if !strings.Contains(ip.Name, ".") {
ipNodeNames = append(ipNodeNames, strings.TrimPrefix(ip.Name, "node-"))
}
}
for _, no := range ipNodeNames {
if !util.IsStringIn(no, nodeNames) {
klog.Infof("gc node %s", no)
if err := c.handleDeleteNode(no); err != nil {
klog.Errorf("failed to gc node %s, %v", no, err)
return err
}
}
}
return nil
}

func (c *Controller) gcLogicalSwitchPort() error {
klog.Infof("start to gc logical switch ports")
ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list ip, %v", err)
return err
}
ipNames := make([]string, 0, len(ips))
for _, ip := range ips {
ipNames = append(ipNames, ip.Name)
}
lsps, err := c.ovnClient.ListLogicalSwitchPort()
if err != nil {
klog.Errorf("failed to list logical switch port, %v", err)
return err
}
for _, lsp := range lsps {
if !util.IsStringIn(lsp, ipNames) {
if strings.Contains(lsp, ".") {
klog.Infof("gc logical switch port %s", lsp)
podName := strings.Split(lsp, ".")[0]
podNameSpace := strings.Split(lsp, ".")[1]
if err := c.handleDeletePod(fmt.Sprintf("%s/%s", podNameSpace, podName)); err != nil {
klog.Errorf("failed to gc port %s, %v", lsp, err)
return err
}
}
}
}
return nil
}

func (c *Controller) gcLoadBalancer() error {
klog.Infof("start to gc loadbalancers")
svcs, err := c.servicesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return err
}
tcpVips := []string{}
udpVips := []string{}
for _, svc := range svcs {
ip := svc.Spec.ClusterIP
for _, port := range svc.Spec.Ports {
if port.Protocol == corev1.ProtocolTCP {
tcpVips = append(tcpVips, fmt.Sprintf("%s:%d", ip, port.Port))
} else {
udpVips = append(udpVips, fmt.Sprintf("%s:%d", ip, port.Port))
}
}
}

lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
return err
}
for _, vip := range vips {
if !util.IsStringIn(vip, tcpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
return err
}
}
}

lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get udp lb vips %v", err)
return err
}
for _, vip := range vips {
if !util.IsStringIn(vip, udpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
return err
}
}
}
return nil
}

func (c *Controller) gcPortGroup() error {
klog.Infof("start to gc network policy")
nps, err := c.npsLister.List(labels.Everything())
npNames := make([]string, 0, len(nps))
for _, np := range nps {
npNames = append(npNames, fmt.Sprintf("%s/%s", np.Namespace, np.Name))
}
if err != nil {
klog.Errorf("failed to list network policy, %v", err)
return err
}
pgs, err := c.ovnClient.ListPortGroup()
if err != nil {
klog.Errorf("failed to list port-group, %v", err)
return err
}
for _, pg := range pgs {
if !util.IsStringIn(fmt.Sprintf("%s/%s", pg.NpNamespace, pg.NpName), npNames) {
klog.Infof("gc port group %s", pg.Name)
if err := c.handleDeleteNp(fmt.Sprintf("%s/%s", pg.NpNamespace, pg.NpName)); err != nil {
klog.Errorf("failed to gc np %s/%s, %v", pg.NpNamespace, pg.NpName, err)
return err
}
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (c *Controller) handleUpdateNp(key string) error {
egressAllowAsName := strings.Replace(fmt.Sprintf("%s.%s.egress.allow", np.Name, np.Namespace), "-", ".", -1)
egressExceptAsName := strings.Replace(fmt.Sprintf("%s.%s.egress.except", np.Name, np.Namespace), "-", ".", -1)

if err := c.ovnClient.CreatePortGroup(pgName); err != nil {
if err := c.ovnClient.CreatePortGroup(pgName, np.Namespace, np.Name); err != nil {
klog.Errorf("failed to create port group for np %s, %v", key, err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,12 @@ func (c *Controller) handleAddNode(key string) error {
return err
}

ipCr, err := c.config.KubeOvnClient.KubeovnV1().IPs().Get(key, metav1.GetOptions{})
ipCr, err := c.config.KubeOvnClient.KubeovnV1().IPs().Get(fmt.Sprintf("node-%s", key), metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
_, err := c.config.KubeOvnClient.KubeovnV1().IPs().Create(&kubeovnv1.IP{
ObjectMeta: metav1.ObjectMeta{
Name: key,
Name: fmt.Sprintf("node-%s", key),
Labels: map[string]string{
util.SubnetNameLabel: c.config.NodeSwitch,
},
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (c *Controller) handleUpdateService(key string) error {
lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
Expand Down Expand Up @@ -249,6 +250,7 @@ func (c *Controller) handleUpdateService(key string) error {
lbUuid, err = c.ovnClient.FindLoadbalancer(c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
return err
}
vips, err = c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
Expand Down
63 changes: 58 additions & 5 deletions pkg/ovs/ovn-nbctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,23 @@ func (c Client) LogicalSwitchExists(logicalSwitch string) (bool, error) {
return false, nil
}

func (c Client) ListLogicalSwitchPort() ([]string, error) {
output, err := c.ovnNbCommand("--data=bare", "--no-heading", "--columns=name", "list", "logical_switch_port")
if err != nil {
klog.Errorf("failed to list logical switch port, %v", err)
return nil, err
}
lines := strings.Split(output, "\n")
result := make([]string, 0, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
result = append(result, strings.TrimSpace(l))
}
return result, nil
}

// ListLogicalRouter list logical router names
func (c Client) ListLogicalRouter() ([]string, error) {
output, err := c.ovnNbCommand("lr-list")
Expand Down Expand Up @@ -406,16 +423,20 @@ func (c Client) GetPortAddr(port string) ([]string, error) {
return address, nil
}

func (c Client) CreatePortGroup(pgName string) error {
output, err := c.ovnNbCommand("--data=bare", "--no-heading", "--columns=_uuid", "find", "port_group", fmt.Sprintf("name=%s", pgName))
func (c Client) CreatePortGroup(pgName, npNs, npName string) error {
output, err := c.ovnNbCommand(
"--data=bare", "--no-heading", "--columns=_uuid", "find", "port_group", fmt.Sprintf("name=%s", pgName))
if err != nil {
klog.Errorf("failed to find port_group %s", pgName)
return err
}
if output != "" {
return nil
}
_, err = c.ovnNbCommand("pg-add", pgName)
_, err = c.ovnNbCommand(
"pg-add", pgName,
"--", "set", "port_group", pgName, fmt.Sprintf("external_ids:np=%s/%s", npNs, npName),
)
return err
}

Expand All @@ -432,6 +453,38 @@ func (c Client) DeletePortGroup(pgName string) error {
return err
}

type portGroup struct {
Name string
NpName string
NpNamespace string
}

func (c Client) ListPortGroup() ([]portGroup, error) {
output, err := c.ovnNbCommand("--data=bare", "--format=csv", "--no-heading", "--columns=name,external_ids", "list", "port_group")
if err != nil {
klog.Errorf("failed to list logical port-group, %v", err)
return nil, err
}
lines := strings.Split(output, "\n")
result := make([]portGroup, 0, len(lines))
for _, l := range lines {
if len(strings.TrimSpace(l)) == 0 {
continue
}
parts := strings.Split(strings.TrimSpace(l), ",")
if len(parts) != 2 {
continue
}
name := strings.TrimSpace(parts[0])
np := strings.Split(strings.TrimSpace(parts[1]), "/")
if len(np) != 2 {
continue
}
result = append(result, portGroup{Name: name, NpNamespace: np[0], NpName: np[1]})
}
return result, nil
}

func (c Client) CreateAddressSet(asName string) error {
output, err := c.ovnNbCommand("--data=bare", "--no-heading", "--columns=_uuid", "find", "address_set", fmt.Sprintf("name=%s", asName))
if err != nil {
Expand Down Expand Up @@ -560,8 +613,8 @@ func (c Client) GetLogicalSwitchExcludeIPS(logicalSwitch string) ([]string, erro

// SetLogicalSwitchExcludeIPS set a logical switch exclude ips
// ovn-nbctl set logical_switch ovn-default other_config:exclude_ips="10.17.0.2 10.17.0.1"
func (c Client) SetLogicalSwitchExcludeIPS(logicalSwtich string, excludeIPS []string) error {
_, err := c.ovnNbCommand("set", "logical_switch", logicalSwtich,
func (c Client) SetLogicalSwitchExcludeIPS(logicalSwitch string, excludeIPS []string) error {
_, err := c.ovnNbCommand("set", "logical_switch", logicalSwitch,
fmt.Sprintf(`other_config:exclude_ips="%s"`, strings.Join(excludeIPS, " ")))
return err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ func IsStringsOverlap(a, b []string) bool {
}
return false
}

func IsStringIn(str string, slice []string) bool {
for _, s := range slice {
if s == str {
return true
}
}
return false
}

0 comments on commit 3791ba2

Please sign in to comment.