Skip to content

Commit

Permalink
decouple unidling controller
Browse files Browse the repository at this point in the history
Move the unidling controller code to its own package and add e2e
test.

OVN, if configured with the empty-lb-backend options, must generate
an event PodsNeeded if it receives traffic to a Service VIP without
endpoints that is annotated with k8s.ovn.org/idled-at

Signed-off-by: Antonio Ojea <aojea@redhat.com>
  • Loading branch information
Antonio Ojea committed Feb 24, 2021
1 parent f8225cd commit e4e05cf
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ jobs:
- shard: shard-conformance
hybrid-overlay: false
multicast-enable: false
emptylb-enable: false
- shard: control-plane
hybrid-overlay: true
multicast-enable: true
emptylb-enable: true
ha:
- enabled: "true"
name: "HA"
Expand Down Expand Up @@ -172,6 +174,7 @@ jobs:
OVN_HYBRID_OVERLAY_ENABLE: "${{ matrix.target.hybrid-overlay }}"
OVN_GATEWAY_MODE: "${{ matrix.gateway-mode }}"
OVN_MULTICAST_ENABLE: "${{ matrix.target.multicast-enable }}"
OVN_EMPTY_LB_EVENTS: "${{ matrix.target.emptylb-enable }}"
steps:

- name: Free up disk space
Expand Down
4 changes: 4 additions & 0 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ func (wf *WatchFactory) NamespaceInformer() cache.SharedIndexInformer {
return wf.informers[namespaceType].inf
}

func (wf *WatchFactory) ServiceInformer() cache.SharedIndexInformer {
return wf.informers[serviceType].inf
}

// noHeadlessServiceSelector is a LabelSelector added to the watch for
// Endpoints (and, eventually, EndpointSlices) that excludes endpoints
// for headless services.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ovn
package unidling

import (
"encoding/json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ovn
package unidling

import (
"reflect"
Expand Down
138 changes: 138 additions & 0 deletions go-controller/pkg/ovn/controller/unidling/unidle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package unidling

import (
"sync"
"time"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"k8s.io/klog/v2"
)

// unidlinController checks periodically the OVN events db
// and generates a Kubernetes NeedPods events with the Service
// associated to the VIP
type unidlingController struct {
eventRecorder record.EventRecorder
// Map of load balancers to service namespace
serviceVIPToName map[ServiceVIPKey]types.NamespacedName
serviceVIPToNameLock sync.Mutex
}

// NewController creates a new unidling controller
func NewController(recorder record.EventRecorder, serviceInformer cache.SharedIndexInformer) *unidlingController {
uc := &unidlingController{
eventRecorder: recorder,
serviceVIPToName: map[ServiceVIPKey]types.NamespacedName{},
}

// we only process events on unidling, there is no reconcilation
klog.Info("Setting up event handlers for services")
serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: uc.onServiceAdd,
UpdateFunc: func(old, new interface{}) {
uc.onServiceDelete(old)
uc.onServiceAdd(new)
},
DeleteFunc: uc.onServiceDelete,
})
return uc
}

func (uc *unidlingController) onServiceAdd(obj interface{}) {
svc := obj.(*v1.Service)
if util.ServiceTypeHasClusterIP(svc) && util.IsClusterIPSet(svc) {
for _, ip := range util.GetClusterIPs(svc) {
for _, svcPort := range svc.Spec.Ports {
vip := util.JoinHostPortInt32(ip, svcPort.Port)
uc.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
}
}
}
}

func (uc *unidlingController) onServiceDelete(obj interface{}) {
svc := obj.(*v1.Service)
if util.ServiceTypeHasClusterIP(svc) && util.IsClusterIPSet(svc) {
for _, ip := range util.GetClusterIPs(svc) {
for _, svcPort := range svc.Spec.Ports {
vip := util.JoinHostPortInt32(ip, svcPort.Port)
uc.DeleteServiceVIPToName(vip, svcPort.Protocol)
}
}
}
}

// ServiceVIPKey is used for looking up service namespace information for a
// particular load balancer
type ServiceVIPKey struct {
// Load balancer VIP in the form "ip:port"
vip string
// Protocol used by the load balancer
protocol v1.Protocol
}

// AddServiceVIPToName associates a k8s service name with a load balancer VIP
func (uc *unidlingController) AddServiceVIPToName(vip string, protocol v1.Protocol, namespace, name string) {
uc.serviceVIPToNameLock.Lock()
defer uc.serviceVIPToNameLock.Unlock()
uc.serviceVIPToName[ServiceVIPKey{vip, protocol}] = types.NamespacedName{Namespace: namespace, Name: name}
}

// GetServiceVIPToName retrieves the associated k8s service name for a load balancer VIP
func (uc *unidlingController) GetServiceVIPToName(vip string, protocol v1.Protocol) (types.NamespacedName, bool) {
uc.serviceVIPToNameLock.Lock()
defer uc.serviceVIPToNameLock.Unlock()
namespace, ok := uc.serviceVIPToName[ServiceVIPKey{vip, protocol}]
return namespace, ok
}

// DeleteServiceVIPToName retrieves the associated k8s service name for a load balancer VIP
func (uc *unidlingController) DeleteServiceVIPToName(vip string, protocol v1.Protocol) {
uc.serviceVIPToNameLock.Lock()
defer uc.serviceVIPToNameLock.Unlock()
delete(uc.serviceVIPToName, ServiceVIPKey{vip, protocol})
}

func (uc *unidlingController) Run(stopCh <-chan struct{}) {
ticker := time.NewTicker(5 * time.Second)

for {
select {
case <-ticker.C:
out, _, err := util.RunOVNSbctl("--format=json", "list", "controller_event")
if err != nil {
continue
}

events, err := extractEmptyLBBackendsEvents([]byte(out))
if err != nil || len(events) == 0 {
continue
}

for _, event := range events {
_, _, err := util.RunOVNSbctl("destroy", "controller_event", event.uuid)
if err != nil {
// Don't unidle until we are able to remove the controller event
klog.Errorf("Unable to remove controller event %s", event.uuid)
continue
}
if serviceName, ok := uc.GetServiceVIPToName(event.vip, event.protocol); ok {
serviceRef := v1.ObjectReference{
Kind: "Service",
Namespace: serviceName.Namespace,
Name: serviceName.Name,
}
klog.V(5).Infof("Sending a NeedPods event for service %s in namespace %s.", serviceName.Name, serviceName.Namespace)
uc.eventRecorder.Eventf(&serviceRef, v1.EventTypeNormal, "NeedPods", "The service %s needs pods", serviceName.Name)
}
}
case <-stopCh:
return
}
}
}
2 changes: 0 additions & 2 deletions go-controller/pkg/ovn/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ func (ovn *Controller) AddEndpoints(ep *kapi.Endpoints) error {
klog.Errorf("Error in creating Cluster IP for svc %s, target port: %d - %v\n", svc.Name, lbEps.Port, err)
continue
}
vip := util.JoinHostPortInt32(svc.Spec.ClusterIP, svcPort.Port)
ovn.AddServiceVIPToName(vip, svcPort.Protocol, svc.Namespace, svc.Name)
if len(svc.Spec.ExternalIPs) > 0 {
gateways, _, err := ovn.getOvnGateways()
if err != nil {
Expand Down
82 changes: 11 additions & 71 deletions go-controller/pkg/ovn/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set"
svccontroller "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/controller/services"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/controller/unidling"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/ipallocator"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/subnetallocator"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
Expand Down Expand Up @@ -50,15 +51,6 @@ const (
egressFirewallDNSDefaultDuration time.Duration = 30 * time.Minute
)

// ServiceVIPKey is used for looking up service namespace information for a
// particular load balancer
type ServiceVIPKey struct {
// Load balancer VIP in the form "ip:port"
vip string
// Protocol used by the load balancer
protocol kapi.Protocol
}

// loadBalancerConf contains the OVN based config for a LB
type loadBalancerConf struct {
// List of endpoints as configured in OVN, ip:port
Expand Down Expand Up @@ -188,11 +180,6 @@ type Controller struct {
// Is ACL logging enabled while configuring meters?
aclLoggingEnabled bool

// Map of load balancers to service namespace
serviceVIPToName map[ServiceVIPKey]types.NamespacedName

serviceVIPToNameLock sync.Mutex

// Map of load balancers, each containing a map of VIP to OVN LB Config
serviceLBMap map[string]map[string]*loadBalancerConf

Expand Down Expand Up @@ -290,8 +277,6 @@ func NewOvnController(ovnClient *util.OVNClientset, wf *factory.WatchFactory,
loadbalancerClusterCache: make(map[kapi.Protocol]string),
multicastSupport: config.EnableMulticast,
aclLoggingEnabled: true,
serviceVIPToName: make(map[ServiceVIPKey]types.NamespacedName),
serviceVIPToNameLock: sync.Mutex{},
serviceLBMap: make(map[string]map[string]*loadBalancerConf),
serviceLBLock: sync.Mutex{},
joinSwIPManager: nil,
Expand Down Expand Up @@ -378,7 +363,16 @@ func (oc *Controller) Run(wg *sync.WaitGroup) error {
klog.Infof("Completing all the Watchers took %v", time.Since(start))

if config.Kubernetes.OVNEmptyLbEvents {
go oc.ovnControllerEventChecker()
klog.Infof("Starting unidling controller")
unidlingController := unidling.NewController(
oc.recorder,
oc.watchFactory.ServiceInformer(),
)
wg.Add(1)
go func() {
defer wg.Done()
unidlingController.Run(oc.stopChan)
}()
}

if oc.hoMaster != nil {
Expand Down Expand Up @@ -410,45 +404,6 @@ func (oc *Controller) syncPeriodic() {
}()
}

func (oc *Controller) ovnControllerEventChecker() {
ticker := time.NewTicker(5 * time.Second)

for {
select {
case <-ticker.C:
out, _, err := util.RunOVNSbctl("--format=json", "list", "controller_event")
if err != nil {
continue
}

events, err := extractEmptyLBBackendsEvents([]byte(out))
if err != nil || len(events) == 0 {
continue
}

for _, event := range events {
_, _, err := util.RunOVNSbctl("destroy", "controller_event", event.uuid)
if err != nil {
// Don't unidle until we are able to remove the controller event
klog.Errorf("Unable to remove controller event %s", event.uuid)
continue
}
if serviceName, ok := oc.GetServiceVIPToName(event.vip, event.protocol); ok {
serviceRef := kapi.ObjectReference{
Kind: "Service",
Namespace: serviceName.Namespace,
Name: serviceName.Name,
}
klog.V(5).Infof("Sending a NeedPods event for service %s in namespace %s.", serviceName.Name, serviceName.Namespace)
oc.recorder.Eventf(&serviceRef, kapi.EventTypeNormal, "NeedPods", "The service %s needs pods", serviceName.Name)
}
}
case <-oc.stopChan:
return
}
}
}

func podScheduled(pod *kapi.Pod) bool {
return pod.Spec.NodeName != ""
}
Expand Down Expand Up @@ -1054,21 +1009,6 @@ func (oc *Controller) WatchNodes() {
klog.Infof("Bootstrapping existing nodes and cleaning stale nodes took %v", time.Since(start))
}

// AddServiceVIPToName associates a k8s service name with a load balancer VIP
func (oc *Controller) AddServiceVIPToName(vip string, protocol kapi.Protocol, namespace, name string) {
oc.serviceVIPToNameLock.Lock()
defer oc.serviceVIPToNameLock.Unlock()
oc.serviceVIPToName[ServiceVIPKey{vip, protocol}] = types.NamespacedName{Namespace: namespace, Name: name}
}

// GetServiceVIPToName retrieves the associated k8s service name for a load balancer VIP
func (oc *Controller) GetServiceVIPToName(vip string, protocol kapi.Protocol) (types.NamespacedName, bool) {
oc.serviceVIPToNameLock.Lock()
defer oc.serviceVIPToNameLock.Unlock()
namespace, ok := oc.serviceVIPToName[ServiceVIPKey{vip, protocol}]
return namespace, ok
}

// GetNetworkPolicyACLLogging retrieves ACL deny policy logging setting for the Namespace
func (oc *Controller) GetNetworkPolicyACLLogging(ns string) *ACLLoggingLevels {
nsInfo := oc.getNamespaceLocked(ns)
Expand Down

0 comments on commit e4e05cf

Please sign in to comment.