From 31fbf066d1cbc34e8b69e712b08508bf02838ffd Mon Sep 17 00:00:00 2001 From: alacuku Date: Thu, 15 Jul 2021 19:19:02 +0200 Subject: [PATCH] limiting cache clients for symmetricRoutingOperator and overlayOperator in liqo-route --- cmd/liqonet/main.go | 108 +++++++++++++++--- .../liqonet/route-operator/overlayOperator.go | 61 +++------- .../route-operator/overlayOperator_test.go | 55 ++------- .../symmetricRoutingOperator.go | 20 ++-- .../symmetricRoutingOperator_test.go | 9 -- pkg/consts/replication.go | 4 + pkg/virtualKubelet/provider/pods.go | 14 ++- 7 files changed, 135 insertions(+), 136 deletions(-) diff --git a/cmd/liqonet/main.go b/cmd/liqonet/main.go index 9792158d73..e8ac3f2bf4 100644 --- a/cmd/liqonet/main.go +++ b/cmd/liqonet/main.go @@ -18,10 +18,15 @@ package main import ( "flag" "os" + "strings" "sync" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -45,6 +50,15 @@ import ( "github.com/liqotech/liqo/pkg/mapperUtils" ) +const ( + // These labels are the ones set during the deployment of liqo using the helm chart. + // Any change to those labels on the helm chart must also be reflected here. 
+ podInstanceLabelKey = "app.kubernetes.io/instance" + routeInstanceLabelValue = "liqo-route" + podNameLabelKey = "app.kubernetes.io/name" + routeNameLabelValue = "route" +) + var ( scheme = runtime.NewScheme() vxlanConfig = &overlay.VxlanDeviceAttrs{ @@ -78,15 +92,6 @@ func main() { case liqoconst.LiqoRouteOperatorName: mutex := &sync.RWMutex{} nodeMap := map[string]string{} - mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ - MapperProvider: mapperUtils.LiqoMapperProvider(scheme), - Scheme: scheme, - MetricsBindAddress: metricsAddr, - }) - if err != nil { - klog.Errorf("unable to get manager: %s", err) - os.Exit(1) - } // Get the pod ip and parse to net.IP. podIP, err := utils.GetPodIP() if err != nil { @@ -98,6 +103,68 @@ func main() { klog.Errorf("unable to get node name: %v", err) os.Exit(1) } + podNamespace, err := utils.GetPodNamespace() + if err != nil { + klog.Errorf("unable to get pod namespace: %v", err) + os.Exit(1) + } + // Asking the api-server to only inform the operator for the pods running in a node different from the one + // where the operator is running. + smcFieldSelector, err := fields.ParseSelector(strings.Join([]string{"spec.nodeName", "!=", nodeName}, "")) + if err != nil { + klog.Errorf("unable to create label requirement: %v", err) + os.Exit(1) + } + // Asking the api-server to only inform the operator for the pods running in a node different from + // the virtual nodes. We want to process only the pods running on the local cluster and not the ones + // offloaded to a remote cluster. 
+ smcLabelRequirement, err := labels.NewRequirement(liqoconst.LocalPodLabelKey, selection.DoesNotExist, []string{}) + if err != nil { + klog.Errorf("unable to create label requirement: %v", err) + os.Exit(1) + } + smcLabelSelector := labels.NewSelector().Add(*smcLabelRequirement) + mainMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + MapperProvider: mapperUtils.LiqoMapperProvider(scheme), + Scheme: scheme, + MetricsBindAddress: metricsAddr, + NewCache: cache.BuilderWithOptions(cache.Options{ + SelectorsByObject: cache.SelectorsByObject{ + &corev1.Pod{}: { + Field: smcFieldSelector, + Label: smcLabelSelector, + }, + }, + }), + }) + if err != nil { + klog.Errorf("unable to get manager: %s", err) + os.Exit(1) + } + // Asking the api-server to only inform the operator for the pods that are part of the route component. + ovcLabelSelector := labels.SelectorFromSet(labels.Set{ + podNameLabelKey: routeNameLabelValue, + podInstanceLabelKey: routeInstanceLabelValue, + }) + // This manager is used by the overlay operator and it is limited to the pods running + // on the same namespace as the operator. + overlayMgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + MapperProvider: mapperUtils.LiqoMapperProvider(scheme), + Scheme: scheme, + MetricsBindAddress: metricsAddr, + Namespace: podNamespace, + NewCache: cache.BuilderWithOptions(cache.Options{ + SelectorsByObject: cache.SelectorsByObject{ + &corev1.Pod{}: { + Label: ovcLabelSelector, + }, + }, + }), + }) + if err != nil { + klog.Errorf("unable to get manager: %s", err) + os.Exit(1) + } vxlanConfig.VtepAddr = podIP vxlanDevice, err := overlay.NewVxlanDevice(vxlanConfig) if err != nil { @@ -110,33 +177,36 @@ func main() { klog.Errorf("an error occurred while creating the vxlan routing manager: %v", err) os.Exit(1) } - eventRecorder := mgr.GetEventRecorderFor(liqoconst.LiqoRouteOperatorName + "." 
+ podIP.String()) - routeController := routeoperator.NewRouteController(podIP.String(), vxlanDevice, vxlanRoutingManager, eventRecorder, mgr.GetClient()) - if err = routeController.SetupWithManager(mgr); err != nil { + eventRecorder := mainMgr.GetEventRecorderFor(liqoconst.LiqoRouteOperatorName + "." + podIP.String()) + routeController := routeoperator.NewRouteController(podIP.String(), vxlanDevice, vxlanRoutingManager, eventRecorder, mainMgr.GetClient()) + if err = routeController.SetupWithManager(mainMgr); err != nil { klog.Errorf("unable to setup controller: %s", err) os.Exit(1) } - overlayController, err := routeoperator.NewOverlayController(podIP.String(), - routeoperator.PodLabelSelector, vxlanDevice, mutex, nodeMap, mgr.GetClient()) + overlayController, err := routeoperator.NewOverlayController(podIP.String(), vxlanDevice, mutex, nodeMap, overlayMgr.GetClient()) if err != nil { klog.Errorf("an error occurred while creating overlay controller: %v", err) os.Exit(3) } - if err = overlayController.SetupWithManager(mgr); err != nil { + if err = overlayController.SetupWithManager(overlayMgr); err != nil { klog.Errorf("unable to setup overlay controller: %s", err) os.Exit(1) } - symmetricRoutingOperator, err := routeoperator.NewSymmetricRoutingOperator(nodeName, - liqoconst.RoutingTableID, vxlanDevice, mutex, nodeMap, mgr.GetClient()) + symmetricRoutingController, err := routeoperator.NewSymmetricRoutingOperator(nodeName, + liqoconst.RoutingTableID, vxlanDevice, mutex, nodeMap, mainMgr.GetClient()) if err != nil { klog.Errorf("an error occurred while creting symmetric routing controller: %v", err) os.Exit(4) } - if err = symmetricRoutingOperator.SetupWithManager(mgr); err != nil { + if err = symmetricRoutingController.SetupWithManager(mainMgr); err != nil { klog.Errorf("unable to setup overlay controller: %s", err) os.Exit(1) } - if err := mgr.Start(routeController.SetupSignalHandlerForRouteOperator()); err != nil { + if err := mainMgr.Add(overlayMgr); err != nil { 
+ klog.Errorf("unable to add the overlay manager to the main manager: %s", err) + os.Exit(1) + } + if err := mainMgr.Start(routeController.SetupSignalHandlerForRouteOperator()); err != nil { klog.Errorf("unable to start controller: %s", err) os.Exit(1) } diff --git a/internal/liqonet/route-operator/overlayOperator.go b/internal/liqonet/route-operator/overlayOperator.go index db0c4870a4..70ef7dfe01 100644 --- a/internal/liqonet/route-operator/overlayOperator.go +++ b/internal/liqonet/route-operator/overlayOperator.go @@ -9,8 +9,6 @@ import ( "github.com/vishvananda/netlink" corev1 "k8s.io/api/core/v1" k8sApiErrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/klog" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -22,39 +20,17 @@ import ( ) var ( - // This labels are the ones set during the deployment of liqo using the helm chart. - // Any change to those labels on the helm chart has also to be reflected here. - podInstanceLabelKey = "app.kubernetes.io/instance" - podInstanceLabelValue = "liqo-route" - podNameLabelKey = "app.kubernetes.io/name" - podNameLabelValue = "route" - // vxlanMACAddressKey annotation key the mac address of vxlan interface. + // vxlanMACAddressKey annotation key for the mac address of vxlan interface. vxlanMACAddressKey = "net.liqo.io/vxlan.mac.address" - // PodLabelSelector label selector used to track only the route pods. - PodLabelSelector = &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: podInstanceLabelKey, - Operator: metav1.LabelSelectorOpIn, - Values: []string{podInstanceLabelValue}, - }, - { - Key: podNameLabelKey, - Operator: metav1.LabelSelectorOpIn, - Values: []string{podNameLabelValue}, - }, - }, - } ) // OverlayController reconciles pods objects, in our case the route operators pods. 
type OverlayController struct { client.Client - vxlanDev *overlay.VxlanDevice - podIP string - podSelector labels.Selector - nodesLock *sync.RWMutex - vxlanPeers map[string]*overlay.Neighbor + vxlanDev *overlay.VxlanDevice + podIP string + nodesLock *sync.RWMutex + vxlanPeers map[string]*overlay.Neighbor // For each nodeName contains its IP addr. vxlanNodes map[string]string // Given the namespace/podName it contains the pod name where the pod is running. @@ -112,12 +88,8 @@ func (ovc *OverlayController) Reconcile(ctx context.Context, req ctrl.Request) ( } // NewOverlayController returns a new controller ready to be setup and started with the controller manager. -func NewOverlayController(podIP string, podSelector *metav1.LabelSelector, vxlanDevice *overlay.VxlanDevice, - nodesLock *sync.RWMutex, vxlanNodes map[string]string, cl client.Client) (*OverlayController, error) { - selector, err := metav1.LabelSelectorAsSelector(podSelector) - if err != nil { - return nil, err - } +func NewOverlayController(podIP string, vxlanDevice *overlay.VxlanDevice, nodesLock *sync.RWMutex, + vxlanNodes map[string]string, cl client.Client) (*OverlayController, error) { if vxlanDevice == nil { return nil, &liqoerrors.WrongParameter{ Reason: liqoerrors.NotNil, @@ -125,14 +97,13 @@ func NewOverlayController(podIP string, podSelector *metav1.LabelSelector, vxlan } } return &OverlayController{ - Client: cl, - vxlanDev: vxlanDevice, - podIP: podIP, - podSelector: selector, - nodesLock: nodesLock, - vxlanPeers: map[string]*overlay.Neighbor{}, - vxlanNodes: vxlanNodes, - podToNode: map[string]string{}, + Client: cl, + vxlanDev: vxlanDevice, + podIP: podIP, + nodesLock: nodesLock, + vxlanPeers: map[string]*overlay.Neighbor{}, + vxlanNodes: vxlanNodes, + podToNode: map[string]string{}, }, nil } @@ -231,10 +202,6 @@ func (ovc *OverlayController) podFilter(obj client.Object) bool { klog.Infof("object {%s} is not of type corev1.Pod", obj.GetName()) return false } - // Filter by labels. 
- if match := ovc.podSelector.Matches(labels.Set(obj.GetLabels())); !match { - return false - } // If it is our pod then process it. if ovc.podIP == p.Status.PodIP { return true diff --git a/internal/liqonet/route-operator/overlayOperator_test.go b/internal/liqonet/route-operator/overlayOperator_test.go index c122769e36..7d740d4e0f 100644 --- a/internal/liqonet/route-operator/overlayOperator_test.go +++ b/internal/liqonet/route-operator/overlayOperator_test.go @@ -58,10 +58,6 @@ var _ = Describe("OverlayOperator", func() { ObjectMeta: metav1.ObjectMeta{ Name: overlayReq.Name, Namespace: overlayReq.Namespace, - Labels: map[string]string{ - podNameLabelKey: podNameLabelValue, - podInstanceLabelKey: podInstanceLabelValue, - }, Annotations: map[string]string{ overlayAnnKey: overlayAnnValue, }, @@ -82,17 +78,14 @@ var _ = Describe("OverlayOperator", func() { }, } // Create dummy overlay operator. - s, err := metav1.LabelSelectorAsSelector(PodLabelSelector) - Expect(err).ShouldNot(HaveOccurred()) ovc = &OverlayController{ - podSelector: s, - podIP: overlayPodIP, - vxlanPeers: make(map[string]*overlay.Neighbor, 0), - vxlanDev: vxlanDevice, - Client: k8sClient, - nodesLock: &sync.RWMutex{}, - vxlanNodes: map[string]string{}, - podToNode: map[string]string{}, + podIP: overlayPodIP, + vxlanPeers: make(map[string]*overlay.Neighbor), + vxlanDev: vxlanDevice, + Client: k8sClient, + nodesLock: &sync.RWMutex{}, + vxlanNodes: map[string]string{}, + podToNode: map[string]string{}, } // Add fdb entries for existing peer. 
Expect(addFdb(overlayExistingNeigh, vxlanDevice.Link.Attrs().Index)) @@ -104,33 +97,18 @@ var _ = Describe("OverlayOperator", func() { }) Describe("testing NewOverlayOperator function", func() { Context("when input parameters are not correct", func() { - It("label selector is not correct, should return nil and error", func() { - labelSelector := &metav1.LabelSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: podInstanceLabelKey, - Operator: "incorrect", - Values: []string{podInstanceLabelValue}, - }, - }, - } - ovc, err := NewOverlayController(overlayPodIP, labelSelector, vxlanDevice, &sync.RWMutex{}, nil, k8sClient) - Expect(err).Should(MatchError("\"incorrect\" is not a valid pod selector operator")) - Expect(ovc).Should(BeNil()) - }) - It("vxlan device is not correct, should return nil and error", func() { - ovc, err := NewOverlayController(overlayPodIP, PodLabelSelector, nil, &sync.RWMutex{}, nil, k8sClient) + ovcTest, err := NewOverlayController(overlayPodIP, nil, &sync.RWMutex{}, nil, k8sClient) Expect(err).Should(MatchError(&liqoerrors.WrongParameter{Parameter: "vxlanDevice", Reason: liqoerrors.NotNil})) - Expect(ovc).Should(BeNil()) + Expect(ovcTest).Should(BeNil()) }) }) Context("when input parameters are correct", func() { It("should return overlay controller and nil", func() { - ovc, err := NewOverlayController(overlayPodIP, PodLabelSelector, vxlanDevice, &sync.RWMutex{}, nil, k8sClient) + ovcTest, err := NewOverlayController(overlayPodIP, vxlanDevice, &sync.RWMutex{}, nil, k8sClient) Expect(err).ShouldNot(HaveOccurred()) - Expect(ovc).ShouldNot(BeNil()) + Expect(ovcTest).ShouldNot(BeNil()) }) }) }) @@ -346,16 +324,7 @@ var _ = Describe("OverlayOperator", func() { }) }) - Context("when pod has not the right labels", func() { - It("should return false", func() { - // Remove the labels from the test pod. 
- overlayTestPod.SetLabels(nil) - ok := ovc.podFilter(overlayTestPod) - Expect(ok).Should(BeFalse()) - }) - }) - - Context("when pod has the right labels", func() { + Context("when object is a pod", func() { It("and has same ip, should return true", func() { // Add ip address to the test pod. overlayTestPod.Status.PodIP = overlayPodIP diff --git a/internal/liqonet/route-operator/symmetricRoutingOperator.go b/internal/liqonet/route-operator/symmetricRoutingOperator.go index e13b1deba2..1cb7c8790c 100644 --- a/internal/liqonet/route-operator/symmetricRoutingOperator.go +++ b/internal/liqonet/route-operator/symmetricRoutingOperator.go @@ -13,7 +13,6 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" liqoerrors "github.com/liqotech/liqo/pkg/liqonet/errors" @@ -22,6 +21,8 @@ import ( "github.com/liqotech/liqo/pkg/liqonet/utils" ) +const infoLogLevel = 4 + // SymmetricRoutingController reconciles pods objects, in our case all the existing pods. type SymmetricRoutingController struct { client.Client @@ -63,7 +64,7 @@ func (src *SymmetricRoutingController) Reconcile(ctx context.Context, req ctrl.R added, err := src.addRoute(req, &p) if err != nil { if err.Error() == "ip not set" { - klog.V(4).Infof("unable to set route for pod {%s}: ip address for node {%s} has not been set yet", + klog.V(infoLogLevel).Infof("unable to set route for pod {%s}: ip address for node {%s} has not been set yet", req.String(), p.Spec.NodeName) return ctrl.Result{}, err } @@ -148,14 +149,13 @@ func (src *SymmetricRoutingController) podFilter(obj client.Object) bool { klog.Infof("object {%s} is not of type corev1.Pod", obj.GetName()) return false } - // Check if pod is running on same node as the operator. 
- if p.Spec.NodeName == src.nodeName { - klog.V(4).Infof("skipping pod {%s} running on our same node {%s}", p.Name, p.Spec.NodeName) - return false - } // If podIP is not set return false. + // Here the newly created pods scheduled on a virtual node will be skipped. The filtered cache for all the pods + // scheduled on a virtual node works only when the correct label has been added to the pod. When pods are created + // the label is not present, but we are sure that it will be added before the IP address for the same pod is set. + // Once the pods have been labeled the api server should not inform the controller about them. if p.Status.PodIP == "" { - klog.Infof("skipping pod {%s} running on node {%s} has ip address set to empty", p.Name, p.Spec.NodeName) + klog.V(infoLogLevel).Infof("skipping pod {%s} running on node {%s} has ip address set to empty", p.Name, p.Spec.NodeName) return false } return true @@ -164,10 +164,6 @@ func (src *SymmetricRoutingController) podFilter(obj client.Object) bool { // SetupWithManager used to set up the controller with a given manager. func (src *SymmetricRoutingController) SetupWithManager(mgr ctrl.Manager) error { p := predicate.NewPredicateFuncs(src.podFilter) - // We only filter out pods when the event is of type {event.CreateEvent} and {event.UpdateEvent} - p.DeleteFunc = func(event event.DeleteEvent) bool { - return true - } return ctrl.NewControllerManagedBy(mgr).For(&corev1.Pod{}).WithEventFilter(p). 
Complete(src) } diff --git a/internal/liqonet/route-operator/symmetricRoutingOperator_test.go b/internal/liqonet/route-operator/symmetricRoutingOperator_test.go index a65dd6b464..5b7333f097 100644 --- a/internal/liqonet/route-operator/symmetricRoutingOperator_test.go +++ b/internal/liqonet/route-operator/symmetricRoutingOperator_test.go @@ -296,15 +296,6 @@ var _ = Describe("SymmetricRoutingOperator", func() { }) }) - Context("when pod is running on same node as the operator", func() { - It("should return false", func() { - // Set the same node name. - srcTestPod.Spec.NodeName = srcNodeName - ok := src.podFilter(srcTestPod) - Expect(ok).Should(BeFalse()) - }) - }) - Context("when pod is running on different node than operator", func() { It("podIP is not set, should return false", func() { ok := src.podFilter(srcTestPod) diff --git a/pkg/consts/replication.go b/pkg/consts/replication.go index ebdefc54d9..980c6f4adf 100644 --- a/pkg/consts/replication.go +++ b/pkg/consts/replication.go @@ -11,4 +11,8 @@ const ( // - the spec of the resource is owned by the local cluster. // - the status by the remote cluster. OwnershipShared OwnershipType = "Shared" + // LocalPodLabelKey label key added to all the local pods that have been offloaded/replicated to a remote cluster. + LocalPodLabelKey = "liqo.io/shadowPod" + // LocalPodLabelValue value of the label added to the local pods that have been offloaded/replicated to a remote cluster. 
+ LocalPodLabelValue = "true" ) diff --git a/pkg/virtualKubelet/provider/pods.go b/pkg/virtualKubelet/provider/pods.go index 225d6b356f..8a051bc194 100644 --- a/pkg/virtualKubelet/provider/pods.go +++ b/pkg/virtualKubelet/provider/pods.go @@ -18,6 +18,7 @@ import ( "k8s.io/klog" stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + liqoconst "github.com/liqotech/liqo/pkg/consts" "github.com/liqotech/liqo/pkg/virtualKubelet" apimgmgt "github.com/liqotech/liqo/pkg/virtualKubelet/apiReflection" vkContext "github.com/liqotech/liqo/pkg/virtualKubelet/context" @@ -55,15 +56,16 @@ func (p *LiqoProvider) CreatePod(ctx context.Context, homePod *corev1.Pod) error foreignReplicaset := forge.ReplicasetFromPod(foreignPod) - // add a finalizer to allow the pod to be garbage collected by the incoming replicaset reflector - finalizerPatch := []byte(fmt.Sprintf( - `[{"op":"add","path":"/metadata/finalizers","value":["%s"]}]`, - virtualKubelet.HomePodFinalizer)) + // Add a finalizer to allow the pod to be garbage collected by the incoming replicaset reflector. + // Add a label to distinguish the offloaded pods from the local ones. + homePodPatch := []byte(fmt.Sprintf( + `{"metadata":{"labels":{"%s":"%s"},"finalizers":["%s"]}}`, + liqoconst.LocalPodLabelKey, liqoconst.LocalPodLabelValue, virtualKubelet.HomePodFinalizer)) _, err = p.nntClient.Client().CoreV1().Pods(homePod.Namespace).Patch(context.TODO(), homePod.Name, - types.JSONPatchType, - finalizerPatch, + types.StrategicMergePatchType, + homePodPatch, metav1.PatchOptions{}) if err != nil { klog.Error(err)