Skip to content

Commit

Permalink
limiting cache clients for symmetricRoutingOperator and overlayOperat…
Browse files Browse the repository at this point in the history
…or in liqo-route
  • Loading branch information
alacuku committed Jul 16, 2021
1 parent 7200a77 commit 31fbf06
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 136 deletions.
108 changes: 89 additions & 19 deletions cmd/liqonet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package main
import (
"flag"
"os"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -45,6 +50,15 @@ import (
"github.com/liqotech/liqo/pkg/mapperUtils"
)

const (
// This labels are the ones set during the deployment of liqo using the helm chart.
// Any change to those labels on the helm chart has also to be reflected here.
podInstanceLabelKey = "app.kubernetes.io/instance"
routeInstanceLabelValue = "liqo-route"
podNameLabelKey = "app.kubernetes.io/name"
routeNameLabelValue = "route"
)

var (
scheme = runtime.NewScheme()
vxlanConfig = &overlay.VxlanDeviceAttrs{
Expand Down Expand Up @@ -78,15 +92,6 @@ func main() {
case liqoconst.LiqoRouteOperatorName:
mutex := &sync.RWMutex{}
nodeMap := map[string]string{}
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
})
if err != nil {
klog.Errorf("unable to get manager: %s", err)
os.Exit(1)
}
// Get the pod ip and parse to net.IP.
podIP, err := utils.GetPodIP()
if err != nil {
Expand All @@ -98,6 +103,68 @@ func main() {
klog.Errorf("unable to get node name: %v", err)
os.Exit(1)
}
podNamespace, err := utils.GetPodNamespace()
if err != nil {
klog.Errorf("unable to get pod namespace: %v", err)
os.Exit(1)
}
// Asking the api-server to only inform the operator for the pods running in a node different from the one
// where the operator is running.
smcFieldSelector, err := fields.ParseSelector(strings.Join([]string{"spec.nodeName", "!=", nodeName}, ""))
if err != nil {
klog.Errorf("unable to create label requirement: %v", err)
os.Exit(1)
}
// Asking the api-server to only inform the operator for the pods running in a node different from
// the virtual nodes. We want to process only the pods running on the local cluster and not the ones
// offloaded to a remote cluster.
smcLabelRequirement, err := labels.NewRequirement(liqoconst.LocalPodLabelKey, selection.DoesNotExist, []string{})
if err != nil {
klog.Errorf("unable to create label requirement: %v", err)
os.Exit(1)
}
smcLabelSelector := labels.NewSelector().Add(*smcLabelRequirement)
mainMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
NewCache: cache.BuilderWithOptions(cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&corev1.Pod{}: {
Field: smcFieldSelector,
Label: smcLabelSelector,
},
},
}),
})
if err != nil {
klog.Errorf("unable to get manager: %s", err)
os.Exit(1)
}
// Asking the api-server to only inform the operator for the pods that are part of the route component.
ovcLabelSelector := labels.SelectorFromSet(labels.Set{
podNameLabelKey: routeNameLabelValue,
podInstanceLabelKey: routeInstanceLabelValue,
})
// This manager is used by the overlay operator and it is limited to the pods running
// on the same namespace as the operator.
overlayMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Namespace: podNamespace,
NewCache: cache.BuilderWithOptions(cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&corev1.Pod{}: {
Label: ovcLabelSelector,
},
},
}),
})
if err != nil {
klog.Errorf("unable to get manager: %s", err)
os.Exit(1)
}
vxlanConfig.VtepAddr = podIP
vxlanDevice, err := overlay.NewVxlanDevice(vxlanConfig)
if err != nil {
Expand All @@ -110,33 +177,36 @@ func main() {
klog.Errorf("an error occurred while creating the vxlan routing manager: %v", err)
os.Exit(1)
}
eventRecorder := mgr.GetEventRecorderFor(liqoconst.LiqoRouteOperatorName + "." + podIP.String())
routeController := routeoperator.NewRouteController(podIP.String(), vxlanDevice, vxlanRoutingManager, eventRecorder, mgr.GetClient())
if err = routeController.SetupWithManager(mgr); err != nil {
eventRecorder := mainMgr.GetEventRecorderFor(liqoconst.LiqoRouteOperatorName + "." + podIP.String())
routeController := routeoperator.NewRouteController(podIP.String(), vxlanDevice, vxlanRoutingManager, eventRecorder, mainMgr.GetClient())
if err = routeController.SetupWithManager(mainMgr); err != nil {
klog.Errorf("unable to setup controller: %s", err)
os.Exit(1)
}
overlayController, err := routeoperator.NewOverlayController(podIP.String(),
routeoperator.PodLabelSelector, vxlanDevice, mutex, nodeMap, mgr.GetClient())
overlayController, err := routeoperator.NewOverlayController(podIP.String(), vxlanDevice, mutex, nodeMap, overlayMgr.GetClient())
if err != nil {
klog.Errorf("an error occurred while creating overlay controller: %v", err)
os.Exit(3)
}
if err = overlayController.SetupWithManager(mgr); err != nil {
if err = overlayController.SetupWithManager(overlayMgr); err != nil {
klog.Errorf("unable to setup overlay controller: %s", err)
os.Exit(1)
}
symmetricRoutingOperator, err := routeoperator.NewSymmetricRoutingOperator(nodeName,
liqoconst.RoutingTableID, vxlanDevice, mutex, nodeMap, mgr.GetClient())
symmetricRoutingController, err := routeoperator.NewSymmetricRoutingOperator(nodeName,
liqoconst.RoutingTableID, vxlanDevice, mutex, nodeMap, mainMgr.GetClient())
if err != nil {
klog.Errorf("an error occurred while creting symmetric routing controller: %v", err)
os.Exit(4)
}
if err = symmetricRoutingOperator.SetupWithManager(mgr); err != nil {
if err = symmetricRoutingController.SetupWithManager(mainMgr); err != nil {
klog.Errorf("unable to setup overlay controller: %s", err)
os.Exit(1)
}
if err := mgr.Start(routeController.SetupSignalHandlerForRouteOperator()); err != nil {
if err := mainMgr.Add(overlayMgr); err != nil {
klog.Errorf("unable to add the overlay manager to the main manager: %s", err)
os.Exit(1)
}
if err := mainMgr.Start(routeController.SetupSignalHandlerForRouteOperator()); err != nil {
klog.Errorf("unable to start controller: %s", err)
os.Exit(1)
}
Expand Down
61 changes: 14 additions & 47 deletions internal/liqonet/route-operator/overlayOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/vishvananda/netlink"
corev1 "k8s.io/api/core/v1"
k8sApiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -22,39 +20,17 @@ import (
)

var (
// This labels are the ones set during the deployment of liqo using the helm chart.
// Any change to those labels on the helm chart has also to be reflected here.
podInstanceLabelKey = "app.kubernetes.io/instance"
podInstanceLabelValue = "liqo-route"
podNameLabelKey = "app.kubernetes.io/name"
podNameLabelValue = "route"
// vxlanMACAddressKey annotation key the mac address of vxlan interface.
// vxlanMACAddressKey annotation key for the mac address of vxlan interface.
vxlanMACAddressKey = "net.liqo.io/vxlan.mac.address"
// PodLabelSelector label selector used to track only the route pods.
PodLabelSelector = &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: podInstanceLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{podInstanceLabelValue},
},
{
Key: podNameLabelKey,
Operator: metav1.LabelSelectorOpIn,
Values: []string{podNameLabelValue},
},
},
}
)

// OverlayController reconciles pods objects, in our case the route operators pods.
type OverlayController struct {
client.Client
vxlanDev *overlay.VxlanDevice
podIP string
podSelector labels.Selector
nodesLock *sync.RWMutex
vxlanPeers map[string]*overlay.Neighbor
vxlanDev *overlay.VxlanDevice
podIP string
nodesLock *sync.RWMutex
vxlanPeers map[string]*overlay.Neighbor
// For each nodeName contains its IP addr.
vxlanNodes map[string]string
// Given the namespace/podName it contains the pod name where the pod is running.
Expand Down Expand Up @@ -112,27 +88,22 @@ func (ovc *OverlayController) Reconcile(ctx context.Context, req ctrl.Request) (
}

// NewOverlayController returns a new controller ready to be setup and started with the controller manager.
func NewOverlayController(podIP string, podSelector *metav1.LabelSelector, vxlanDevice *overlay.VxlanDevice,
nodesLock *sync.RWMutex, vxlanNodes map[string]string, cl client.Client) (*OverlayController, error) {
selector, err := metav1.LabelSelectorAsSelector(podSelector)
if err != nil {
return nil, err
}
func NewOverlayController(podIP string, vxlanDevice *overlay.VxlanDevice, nodesLock *sync.RWMutex,
vxlanNodes map[string]string, cl client.Client) (*OverlayController, error) {
if vxlanDevice == nil {
return nil, &liqoerrors.WrongParameter{
Reason: liqoerrors.NotNil,
Parameter: "vxlanDevice",
}
}
return &OverlayController{
Client: cl,
vxlanDev: vxlanDevice,
podIP: podIP,
podSelector: selector,
nodesLock: nodesLock,
vxlanPeers: map[string]*overlay.Neighbor{},
vxlanNodes: vxlanNodes,
podToNode: map[string]string{},
Client: cl,
vxlanDev: vxlanDevice,
podIP: podIP,
nodesLock: nodesLock,
vxlanPeers: map[string]*overlay.Neighbor{},
vxlanNodes: vxlanNodes,
podToNode: map[string]string{},
}, nil
}

Expand Down Expand Up @@ -231,10 +202,6 @@ func (ovc *OverlayController) podFilter(obj client.Object) bool {
klog.Infof("object {%s} is not of type corev1.Pod", obj.GetName())
return false
}
// Filter by labels.
if match := ovc.podSelector.Matches(labels.Set(obj.GetLabels())); !match {
return false
}
// If it is our pod then process it.
if ovc.podIP == p.Status.PodIP {
return true
Expand Down
55 changes: 12 additions & 43 deletions internal/liqonet/route-operator/overlayOperator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ var _ = Describe("OverlayOperator", func() {
ObjectMeta: metav1.ObjectMeta{
Name: overlayReq.Name,
Namespace: overlayReq.Namespace,
Labels: map[string]string{
podNameLabelKey: podNameLabelValue,
podInstanceLabelKey: podInstanceLabelValue,
},
Annotations: map[string]string{
overlayAnnKey: overlayAnnValue,
},
Expand All @@ -82,17 +78,14 @@ var _ = Describe("OverlayOperator", func() {
},
}
// Create dummy overlay operator.
s, err := metav1.LabelSelectorAsSelector(PodLabelSelector)
Expect(err).ShouldNot(HaveOccurred())
ovc = &OverlayController{
podSelector: s,
podIP: overlayPodIP,
vxlanPeers: make(map[string]*overlay.Neighbor, 0),
vxlanDev: vxlanDevice,
Client: k8sClient,
nodesLock: &sync.RWMutex{},
vxlanNodes: map[string]string{},
podToNode: map[string]string{},
podIP: overlayPodIP,
vxlanPeers: make(map[string]*overlay.Neighbor),
vxlanDev: vxlanDevice,
Client: k8sClient,
nodesLock: &sync.RWMutex{},
vxlanNodes: map[string]string{},
podToNode: map[string]string{},
}
// Add fdb entries for existing peer.
Expect(addFdb(overlayExistingNeigh, vxlanDevice.Link.Attrs().Index))
Expand All @@ -104,33 +97,18 @@ var _ = Describe("OverlayOperator", func() {
})
Describe("testing NewOverlayOperator function", func() {
Context("when input parameters are not correct", func() {
It("label selector is not correct, should return nil and error", func() {
labelSelector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: podInstanceLabelKey,
Operator: "incorrect",
Values: []string{podInstanceLabelValue},
},
},
}
ovc, err := NewOverlayController(overlayPodIP, labelSelector, vxlanDevice, &sync.RWMutex{}, nil, k8sClient)
Expect(err).Should(MatchError("\"incorrect\" is not a valid pod selector operator"))
Expect(ovc).Should(BeNil())
})

It("vxlan device is not correct, should return nil and error", func() {
ovc, err := NewOverlayController(overlayPodIP, PodLabelSelector, nil, &sync.RWMutex{}, nil, k8sClient)
ovcTest, err := NewOverlayController(overlayPodIP, nil, &sync.RWMutex{}, nil, k8sClient)
Expect(err).Should(MatchError(&liqoerrors.WrongParameter{Parameter: "vxlanDevice", Reason: liqoerrors.NotNil}))
Expect(ovc).Should(BeNil())
Expect(ovcTest).Should(BeNil())
})
})

Context("when input parameters are correct", func() {
It("should return overlay controller and nil", func() {
ovc, err := NewOverlayController(overlayPodIP, PodLabelSelector, vxlanDevice, &sync.RWMutex{}, nil, k8sClient)
ovcTest, err := NewOverlayController(overlayPodIP, vxlanDevice, &sync.RWMutex{}, nil, k8sClient)
Expect(err).ShouldNot(HaveOccurred())
Expect(ovc).ShouldNot(BeNil())
Expect(ovcTest).ShouldNot(BeNil())
})
})
})
Expand Down Expand Up @@ -346,16 +324,7 @@ var _ = Describe("OverlayOperator", func() {
})
})

Context("when pod has not the right labels", func() {
It("should return false", func() {
// Remove the labels from the test pod.
overlayTestPod.SetLabels(nil)
ok := ovc.podFilter(overlayTestPod)
Expect(ok).Should(BeFalse())
})
})

Context("when pod has the right labels", func() {
Context("when object is a pod", func() {
It("and has same ip, should return true", func() {
// Add ip address to the test pod.
overlayTestPod.Status.PodIP = overlayPodIP
Expand Down

0 comments on commit 31fbf06

Please sign in to comment.