Skip to content

Commit

Permalink
clean up vpc service
Browse files Browse the repository at this point in the history
  • Loading branch information
fanriming committed Jun 20, 2021
1 parent fde8991 commit 872340c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 93 deletions.
24 changes: 10 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,10 @@ type Controller struct {
updateNodeQueue workqueue.RateLimitingInterface
deleteNodeQueue workqueue.RateLimitingInterface

servicesLister v1.ServiceLister
serviceSynced cache.InformerSynced
deleteTcpServiceQueue workqueue.RateLimitingInterface
deleteUdpServiceQueue workqueue.RateLimitingInterface
updateServiceQueue workqueue.RateLimitingInterface
servicesLister v1.ServiceLister
serviceSynced cache.InformerSynced
deleteServiceQueue workqueue.RateLimitingInterface
updateServiceQueue workqueue.RateLimitingInterface

endpointsLister v1.EndpointsLister
endpointsSynced cache.InformerSynced
Expand Down Expand Up @@ -210,11 +209,10 @@ func NewController(config *Configuration) *Controller {
updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"),
deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"),

servicesLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
deleteTcpServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteTcpService"),
deleteUdpServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteUdpService"),
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"),
servicesLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
deleteServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"),
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"),

endpointsLister: endpointInformer.Lister(),
endpointsSynced: endpointInformer.Informer().HasSynced,
Expand Down Expand Up @@ -372,8 +370,7 @@ func (c *Controller) shutdown() {
c.updateNodeQueue.ShutDown()
c.deleteNodeQueue.ShutDown()

c.deleteTcpServiceQueue.ShutDown()
c.deleteUdpServiceQueue.ShutDown()
c.deleteServiceQueue.ShutDown()
c.updateServiceQueue.ShutDown()
c.updateEndpointQueue.ShutDown()

Expand Down Expand Up @@ -460,8 +457,7 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {
go wait.Until(c.runUpdateVpcStatusWorker, time.Second, stopCh)

// run in a single worker to avoid delete the last vip, which will lead ovn to delete the loadbalancer
go wait.Until(c.runDeleteTcpServiceWorker, time.Second, stopCh)
go wait.Until(c.runDeleteUdpServiceWorker, time.Second, stopCh)
go wait.Until(c.runDeleteServiceWorker, time.Second, stopCh)
for i := 0; i < c.config.WorkerNum; i++ {
go wait.Until(c.runAddPodWorker, time.Second, stopCh)
go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
Expand Down
19 changes: 0 additions & 19 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,25 +279,6 @@ func (c *Controller) gcLoadBalancer() error {
}
}

lbUuid, err := c.ovnClient.FindLoadbalancer(c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to get lb %v", err)
}
vips, err := c.ovnClient.GetLoadBalancerVips(lbUuid)
if err != nil {
klog.Errorf("failed to get tcp lb vips %v", err)
return err
}
for vip := range vips {
if !util.IsStringIn(vip, tcpVips) {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
return err
}
}
}

vpcs, err := c.vpcsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list vpc, %v", err)
Expand Down
98 changes: 39 additions & 59 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,28 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
)

type vpcService struct {
Vip string
Vpc string
Protocol v1.Protocol
}

func (c *Controller) enqueueDeleteService(obj interface{}) {
if !c.isLeader() {
return
}
svc := obj.(*v1.Service)
klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
//klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
klog.Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
if svc.Spec.ClusterIP != v1.ClusterIPNone && svc.Spec.ClusterIP != "" {
for _, port := range svc.Spec.Ports {
if port.Protocol == v1.ProtocolTCP {
c.deleteTcpServiceQueue.Add(fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port))
} else if port.Protocol == v1.ProtocolUDP {
c.deleteUdpServiceQueue.Add(fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port))
vpcSvc := &vpcService{
Vip: fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port),
Protocol: port.Protocol,
Vpc: svc.Annotations[util.VpcAnnotation],
}
klog.Infof("delete vpc service %v", vpcSvc)
c.deleteServiceQueue.Add(vpcSvc)
}
}
}
Expand All @@ -52,13 +61,8 @@ func (c *Controller) enqueueUpdateService(old, new interface{}) {
c.updateServiceQueue.Add(key)
}

func (c *Controller) runDeleteTcpServiceWorker() {
for c.processNextDeleteTcpServiceWorkItem() {
}
}

func (c *Controller) runDeleteUdpServiceWorker() {
for c.processNextDeleteUdpServiceWorkItem() {
func (c *Controller) runDeleteServiceWorker() {
for c.processNextDeleteServiceWorkItem() {
}
}

Expand All @@ -67,58 +71,28 @@ func (c *Controller) runUpdateServiceWorker() {
}
}

func (c *Controller) processNextDeleteTcpServiceWorkItem() bool {
obj, shutdown := c.deleteTcpServiceQueue.Get()
func (c *Controller) processNextDeleteServiceWorkItem() bool {
obj, shutdown := c.deleteServiceQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.deleteTcpServiceQueue.Done(obj)
var key string
defer c.deleteServiceQueue.Done(obj)
var vpcSvc *vpcService
var ok bool
if key, ok = obj.(string); !ok {
c.deleteTcpServiceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
if vpcSvc, ok = obj.(*vpcService); !ok {
c.deleteServiceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected vpcService in workqueue but got %#v", obj))
return nil
}
if err := c.handleDeleteService(key, v1.ProtocolTCP); err != nil {
c.deleteTcpServiceQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.deleteTcpServiceQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextDeleteUdpServiceWorkItem() bool {
obj, shutdown := c.deleteUdpServiceQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.deleteUdpServiceQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.deleteUdpServiceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
if err := c.handleDeleteService(vpcSvc); err != nil {
c.deleteServiceQueue.AddRateLimited(obj)
return fmt.Errorf("error syncing '%s': %s, requeuing", vpcSvc.Vip, err.Error())
}
if err := c.handleDeleteService(key, v1.ProtocolUDP); err != nil {
c.deleteUdpServiceQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.deleteUdpServiceQueue.Forget(obj)
c.deleteServiceQueue.Forget(obj)
return nil
}(obj)

Expand Down Expand Up @@ -160,29 +134,35 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool {
return true
}

func (c *Controller) handleDeleteService(vip string, protocol v1.Protocol) error {
func (c *Controller) handleDeleteService(service *vpcService) error {
svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
return err
}
for _, svc := range svcs {
if svc.Spec.ClusterIP == parseVipAddr(vip) {
if svc.Spec.ClusterIP == parseVipAddr(service.Vip) {
return nil
}
}

if protocol == v1.ProtocolTCP {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer); err != nil {
vpcLbConfig := c.GenVpcLoadBalancer(service.Vpc)
vip := service.Vip
if service.Protocol == v1.ProtocolTCP {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.TcpLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
return err
}
if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpSessionLoadBalancer); err != nil {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.TcpSessLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from tcp session lb, %v", vip, err)
return err
}
} else {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpSessionLoadBalancer); err != nil {
if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.UdpLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from udp lb, %v", vip, err)
return err
}
if err := c.ovnClient.DeleteLoadBalancerVip(vip, vpcLbConfig.UdpSessLoadBalancer); err != nil {
klog.Errorf("failed to delete vip %s from udp session lb, %v", vip, err)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type VpcLoadBalancer struct {
}

func (c *Controller) GenVpcLoadBalancer(vpcKey string) *VpcLoadBalancer {
if vpcKey == util.DefaultVpc {
if vpcKey == util.DefaultVpc || vpcKey == "" {
return &VpcLoadBalancer{
TcpLoadBalancer: c.config.ClusterTcpLoadBalancer,
TcpSessLoadBalancer: c.config.ClusterTcpSessionLoadBalancer,
Expand Down

0 comments on commit 872340c

Please sign in to comment.