From 5a1e3067bc99d2d4067436224348e6d5f25c7cb1 Mon Sep 17 00:00:00 2001
From: giuseppe_alicino
Date: Wed, 5 May 2021 19:33:41 +0200
Subject: [PATCH] Created new Broadcaster which starts an informer to watch
 Nodes for ResourceOffer generation/update

Add logic reacting to node add, update and delete
Add ReadResources() function to safely read the cluster resources
Added ClusterConfig watcher to get the scaling percentage.
Refactor of resource writing to manage all kinds of resources.
Added new tests and fixed linting
Added Node Phase check.
Write Node update and delete tests.
---
 cmd/advertisement-operator/main.go            |  17 +-
 .../resource-request-operator/broadcaster.go  | 183 ++++++++++++++++++
 .../resourceRequest_controller.go             |  14 +-
 .../resourceRequest_controller_test.go        | 117 ++++++++++-
 .../resource-request-operator/suite_test.go   |  23 ++-
 internal/resource-request-operator/utils.go   |  15 +-
 6 files changed, 338 insertions(+), 31 deletions(-)
 create mode 100644 internal/resource-request-operator/broadcaster.go

diff --git a/cmd/advertisement-operator/main.go b/cmd/advertisement-operator/main.go
index fb0510c7b0..5a7480abbd 100644
--- a/cmd/advertisement-operator/main.go
+++ b/cmd/advertisement-operator/main.go
@@ -163,10 +163,20 @@ func main() {
 		os.Exit(1)
 	}
 
+	newBroadcaster := &resourceRequestOperator.Broadcaster{}
+
+	broadcasterStopper := make(chan struct{})
+
+	if err := newBroadcaster.SetupBroadcaster(clientset, config, broadcasterStopper); err != nil {
+		klog.Error(err)
+		os.Exit(1)
+	}
+
 	resourceRequestReconciler := &resourceRequestOperator.ResourceRequestReconciler{
-		Client:    mgr.GetClient(),
-		Scheme:    mgr.GetScheme(),
-		ClusterID: clusterId,
+		Client:         mgr.GetClient(),
+		Scheme:         mgr.GetScheme(),
+		ClusterID:      clusterId,
+		NewBroadcaster: newBroadcaster,
 	}
 
 	if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil {
@@ -192,5 +202,6 @@ func main() {
 	}
 	close(c)
 	close(client.Stop)
+	close(broadcasterStopper)
 	wg.Wait()
 }
diff --git a/internal/resource-request-operator/broadcaster.go b/internal/resource-request-operator/broadcaster.go
new file mode 100644
index 0000000000..8ded96e57a
--- /dev/null
+++ b/internal/resource-request-operator/broadcaster.go
@@ -0,0 +1,183 @@
+package resourcerequestoperator
+
+import (
+	"fmt"
+	"sync"
+
+	crdclient "github.com/liqotech/liqo/pkg/crdClient"
+
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+
+	configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
+	"github.com/liqotech/liqo/pkg/utils"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog"
+)
+
+// Broadcaster is an object which is used to get resources of the cluster.
+type Broadcaster struct {
+	allocatable   corev1.ResourceList
+	clusterConfig configv1alpha1.ClusterConfig
+	offerMutex    sync.RWMutex
+	configMutex   sync.RWMutex
+}
+
+// SetupBroadcaster creates the informer and runs it to signal node changes, updating Offers.
+func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, config *rest.Config, stopper chan struct{}) error {
+	b.allocatable = corev1.ResourceList{}
+	factory := informers.NewSharedInformerFactory(clientset, 0)
+	informer := factory.Core().V1().Nodes().Informer()
+	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    b.onAdd,
+		UpdateFunc: b.onUpdate,
+		DeleteFunc: b.onDelete,
+	})
+
+	config.ContentConfig.GroupVersion = &configv1alpha1.GroupVersion
+	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
+	config.UserAgent = rest.DefaultKubernetesUserAgent()
+	CRDclient, err := crdclient.NewFromConfig(config)
+	if err != nil {
+		return err
+	}
+	go utils.WatchConfiguration(b.setConfig, CRDclient, "")
+	go informer.Run(stopper)
+	return nil
+}
+
+func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) {
+	b.configMutex.Lock()
+	defer b.configMutex.Unlock()
+	b.clusterConfig = *configuration
+}
+
+// react to a Node Creation/First informer run.
+func (b *Broadcaster) onAdd(obj interface{}) {
+	node := obj.(*corev1.Node)
+	if node.Status.Phase == corev1.NodeRunning {
+		toAdd := &node.Status.Allocatable
+		currentResources := b.allocatable.DeepCopy()
+
+		for resourceName, quantity := range *toAdd {
+			value := currentResources[resourceName]
+			if !quantity.IsZero() {
+				value.Add(quantity)
+			}
+			currentResources[resourceName] = value
+		}
+
+		if err := b.writeResources(currentResources); err != nil {
+			klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
+		}
+	}
+}
+
+// react to a Node Update.
+func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) {
+	oldNode := oldObj.(*corev1.Node)
+	newNode := newObj.(*corev1.Node)
+	oldNodeResources := oldNode.Status.Allocatable
+	newNodeResources := newNode.Status.Allocatable
+	currentResources := b.allocatable.DeepCopy()
+	if newNode.Status.Phase == corev1.NodeRunning {
+		// node was already Running, update with possible new resources.
+		if oldNode.Status.Phase == corev1.NodeRunning {
+			for resourceName, quantity := range newNodeResources {
+				oldQuantity := oldNodeResources[resourceName]
+				value := currentResources[resourceName]
+				if quantityToUpdate := resource.NewQuantity(quantity.Value()-oldQuantity.Value(),
+					quantity.Format); !quantityToUpdate.IsZero() {
+					value.Add(*quantityToUpdate)
+				}
+				currentResources[resourceName] = value
+			}
+			// node is starting, add all its resources.
+		} else if oldNode.Status.Phase == corev1.NodePending || oldNode.Status.Phase == corev1.NodeTerminated {
+			for resourceName, quantity := range newNodeResources {
+				value := currentResources[resourceName]
+				if !quantity.IsZero() {
+					value.Add(quantity)
+				}
+				currentResources[resourceName] = value
+			}
+		}
+		// node is terminating or stopping, delete all its resources.
+	} else if oldNode.Status.Phase == corev1.NodeRunning &&
+		(newNode.Status.Phase == corev1.NodeTerminated || newNode.Status.Phase == corev1.NodePending) {
+		for resourceName, quantity := range oldNodeResources {
+			value := currentResources[resourceName]
+			if !quantity.IsZero() {
+				value.Sub(quantity)
+			}
+			currentResources[resourceName] = value
+		}
+	}
+	if err := b.writeResources(currentResources); err != nil {
+		klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err)
+	}
+}
+
+// react to a Node Delete.
+func (b *Broadcaster) onDelete(obj interface{}) {
+	node := obj.(*corev1.Node)
+	toDelete := &node.Status.Allocatable
+	currentResources := b.allocatable.DeepCopy()
+	if node.Status.Phase == corev1.NodeRunning {
+		for resourceName, quantity := range *toDelete {
+			value := currentResources[resourceName]
+			if !quantity.IsZero() {
+				value.Sub(quantity)
+			}
+			currentResources[resourceName] = value
+		}
+
+		if err := b.writeResources(currentResources); err != nil {
+			klog.Errorf("OnDelete error: unable to write allocatable of Node %s: %s", node.Name, err)
+		}
+	}
+}
+
+// write cluster resources in a thread-safe way.
+func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error {
+	b.offerMutex.Lock()
+	defer b.offerMutex.Unlock()
+	if newResources != nil {
+		b.allocatable = newResources
+		return nil
+	}
+
+	return fmt.Errorf("error reading cluster resources: attempting to write nil resources")
+}
+
+// ReadResources returns a scaled copy of the cluster resources in a thread-safe way.
+func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
+	b.offerMutex.RLock()
+	defer b.offerMutex.RUnlock()
+	if b.allocatable == nil {
+		return nil, fmt.Errorf("error getting cluster resources")
+	}
+	toRead := b.allocatable.DeepCopy()
+	for resourceName, quantity := range toRead {
+		scaled := quantity
+		b.scaleResources(&scaled)
+		toRead[resourceName] = scaled
+	}
+	return toRead, nil
+}
+
+func (b *Broadcaster) scaleResources(quantity *resource.Quantity) {
+	b.configMutex.RLock()
+	percentage := int64(b.clusterConfig.Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
+	b.configMutex.RUnlock()
+	if percentage == 0 {
+		return
+	}
+
+	quantity.Set(quantity.Value() * percentage / 100)
+}
diff --git a/internal/resource-request-operator/resourceRequest_controller.go b/internal/resource-request-operator/resourceRequest_controller.go
index 1cb48490de..1037bd9c29 100644
--- a/internal/resource-request-operator/resourceRequest_controller.go
+++ b/internal/resource-request-operator/resourceRequest_controller.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"time"
 
-	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -16,18 +15,11 @@ import (
 // ResourceRequestReconciler reconciles a ResourceRequest object.
 type ResourceRequestReconciler struct {
 	client.Client
-	Scheme    *runtime.Scheme
-	ClusterID string
+	Scheme         *runtime.Scheme
+	ClusterID      string
+	NewBroadcaster *Broadcaster
 }
 
-// ResourceToOffer is a custom struct to encapsulate cluster's ResourceList.
-type ResourceToOffer struct {
-	Offers corev1.ResourceList
-}
-
-// ResourceToOffer is a placeholder var with fake cluster resources.
-var resources ResourceToOffer
-
 const (
 	offerPrefix = "resourceoffer-"
 	timeToLive  = 30 * time.Minute
diff --git a/internal/resource-request-operator/resourceRequest_controller_test.go b/internal/resource-request-operator/resourceRequest_controller_test.go
index 47348b5634..95fec8c0ab 100644
--- a/internal/resource-request-operator/resourceRequest_controller_test.go
+++ b/internal/resource-request-operator/resourceRequest_controller_test.go
@@ -4,6 +4,9 @@ import (
 	"context"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	. "github.com/onsi/gomega/gstruct"
"github.com/onsi/gomega/gstruct" @@ -27,8 +30,39 @@ var _ = Describe("ResourceRequest controller", func() { Context("Testing ResourceRequest Controller when creating a new ResourceRequest", func() { It("Should create new ResourceRequest and related ResourceOffer ", func() { - By("By creating a new ResourceRequest") ctx := context.Background() + By("Creating mock nodes") + resources := corev1.ResourceList{} + resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI) + resources[corev1.ResourceMemory] = *resource.NewQuantity(1, resource.BinarySI) + resources[corev1.ResourceLimitsCPU] = *resource.NewQuantity(3, resource.DecimalSI) + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node1", + Namespace: "default", + }, + Status: corev1.NodeStatus{ + Capacity: resources, + Allocatable: resources, + Phase: corev1.NodeRunning, + }, + } + _, err := clientset.CoreV1().Nodes().Create(ctx, node1, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node2", + Namespace: "default", + }, + Status: corev1.NodeStatus{ + Capacity: resources, + Allocatable: resources, + Phase: corev1.NodeRunning, + }, + } + _, err = clientset.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + By("By creating a new ResourceRequest") resourceRequest := &discoveryv1alpha1.ResourceRequest{ ObjectMeta: metav1.ObjectMeta{ Name: ResourceRequestName, @@ -70,6 +104,87 @@ var _ = Describe("ResourceRequest controller", func() { Expect(createdResourceOffer.GetOwnerReferences()).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ "Name": Equal(createdResourceRequest.Name), }))) + By("Checking resources at creation") + offerResources := createdResourceOffer.Spec.ResourceQuota.Hard + for resourceName, quantity := range offerResources { + testValue := node1.Status.Allocatable[resourceName] + testValue.Add(node2.Status.Allocatable[resourceName]) + testValue.Set(testValue.Value()) + Expect(quantity.Cmp(testValue)).Should(BeZero()) + } + By("Checking update node phase") + node1.Status.Phase = corev1.NodePending + _, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err != nil { + return false + } + for resourceName, quantity := range resourcesRead { + toCheck := node2.Status.Allocatable[resourceName] + if quantity.Cmp(toCheck) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + node1.Status.Phase = corev1.NodeRunning + _, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err != nil { + return false + } + for resourceName, quantity := range resourcesRead { + toCheck := node2.Status.Allocatable[resourceName] + toCheck.Add(node1.Status.Allocatable[resourceName]) + if quantity.Cmp(toCheck) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + By("Checking update resources") + toUpdate := node1.Status.Allocatable + for _, quantity := range toUpdate { + quantity.Sub(*resource.NewQuantity(1, quantity.Format)) + } + node1.Status.Allocatable = toUpdate + _, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err 
+					return false
+				}
+				for resourceName, quantity := range resourcesRead {
+					toCheck := node2.Status.Allocatable[resourceName]
+					toCheck.Add(node1.Status.Allocatable[resourceName])
+					if quantity.Cmp(toCheck) != 0 {
+						return false
+					}
+				}
+				return true
+			}, timeout, interval).Should(BeTrue())
+			By("Checking Node Delete")
+			err = clientset.CoreV1().Nodes().Delete(ctx, node1.Name, metav1.DeleteOptions{})
+			Expect(err).To(BeNil())
+			Eventually(func() bool {
+				resourcesRead, err := newBroadcaster.ReadResources()
+				if err != nil {
+					return false
+				}
+				for resourceName, quantity := range resourcesRead {
+					toCheck := node2.Status.Allocatable[resourceName]
+					if quantity.Cmp(toCheck) != 0 {
+						return false
+					}
+				}
+				return true
+			}, timeout, interval).Should(BeTrue())
+
 		})
 	})
diff --git a/internal/resource-request-operator/suite_test.go b/internal/resource-request-operator/suite_test.go
index 809a3c2c73..be92abef33 100644
--- a/internal/resource-request-operator/suite_test.go
+++ b/internal/resource-request-operator/suite_test.go
@@ -4,6 +4,10 @@ import (
 	"path/filepath"
 	"testing"
 
+	configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
+
+	"k8s.io/client-go/kubernetes"
+
 	"github.com/google/uuid"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -27,7 +31,10 @@ var cfg *rest.Config
 var k8sClient client.Client
 var clusterId string
+var clientset *kubernetes.Clientset
 var testEnv *envtest.Environment
+var newBroadcaster Broadcaster
+var broadcasterStopper chan struct{}
 
 func TestAPIs(t *testing.T) {
 	RegisterFailHandler(Fail)
@@ -39,6 +46,7 @@ func TestAPIs(t *testing.T) {
 
 var _ = BeforeSuite(func(done Done) {
 	By("bootstrapping test environment")
+	broadcasterStopper = make(chan struct{})
 	testEnv = &envtest.Environment{
 		CRDDirectoryPaths: []string{filepath.Join("..", "..", "deployments", "liqo", "crds")},
 	}
@@ -52,6 +60,8 @@
 	Expect(err).NotTo(HaveOccurred())
 	err = sharingv1alpha1.AddToScheme(scheme.Scheme)
 	Expect(err).NotTo(HaveOccurred())
+	err = configv1alpha1.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
 	// +kubebuilder:scaffold:scheme
 
 	k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
@@ -59,12 +69,17 @@
 		MetricsBindAddress: "0", // this avoids port binding collision
 	})
 	Expect(err).ToNot(HaveOccurred())
+	clientset, err = kubernetes.NewForConfig(k8sManager.GetConfig())
+	Expect(err).ToNot(HaveOccurred())
+	err = newBroadcaster.SetupBroadcaster(clientset, k8sManager.GetConfig(), broadcasterStopper)
+	Expect(err).ToNot(HaveOccurred())
 	id, _ := uuid.NewUUID()
 	clusterId = id.String()
 	err = (&ResourceRequestReconciler{
-		Client:    k8sManager.GetClient(),
-		Scheme:    k8sManager.GetScheme(),
-		ClusterID: clusterId,
+		Client:         k8sManager.GetClient(),
+		Scheme:         k8sManager.GetScheme(),
+		ClusterID:      clusterId,
+		NewBroadcaster: &newBroadcaster,
 	}).SetupWithManager(k8sManager)
 	Expect(err).ToNot(HaveOccurred())
@@ -83,4 +98,6 @@
 	By("tearing down the test environment")
 	err := testEnv.Stop()
 	Expect(err).ToNot(HaveOccurred())
+	close(broadcasterStopper)
+
 })
diff --git a/internal/resource-request-operator/utils.go b/internal/resource-request-operator/utils.go
index fcc3ff70a8..0122e79d67 100644
--- a/internal/resource-request-operator/utils.go
+++ b/internal/resource-request-operator/utils.go
@@ -5,7 +5,6 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -17,7 +16,7 @@ import ( // this function generate an empty offer. func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1alpha1.ResourceRequest) error { - err := r.computeResources() + resources, err := r.NewBroadcaster.ReadResources() if err != nil { return err } @@ -38,7 +37,7 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al ClusterId: r.ClusterID, Images: []corev1.ContainerImage{}, ResourceQuota: corev1.ResourceQuotaSpec{ - Hard: resources.Offers, + Hard: resources, }, Timestamp: creationTime, TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)), @@ -53,13 +52,3 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al klog.Infof("%s -> %s Offer: %s", r.ClusterID, op, offer.ObjectMeta.Name) return nil } - -// this function returns all resource available that will be offered to remote cluster. -func (r *ResourceRequestReconciler) computeResources() error { - // placeholder for future logic - limits := corev1.ResourceList{} - limits[corev1.ResourceCPU] = *resource.NewQuantity(2, "2") - limits[corev1.ResourceMemory] = *resource.NewQuantity(1, "2m") - resources.Offers = limits - return nil -}