From 64e52a4e19fc9a729ffb8ab83ab975fad024a4fb Mon Sep 17 00:00:00 2001
From: giuseppe_alicino
Date: Wed, 5 May 2021 19:33:41 +0200
Subject: [PATCH] Created new Broadcaster which starts an informer to watch
 Nodes for ResourceOffer generation/update

Add logic reacting to node add, update and delete events
Add ReadResources() function to safely read the cluster resources
Add ClusterConfig watcher to get the resource sharing percentage
Refactor resource writing to manage all kinds of resources
Add new tests and fix linting
Add Node Phase check
Write Update and Delete Node tests
Fix resource value checks in add/sub/update functions
Fix the existence checks
Add test for the scaling percentage
Add new methods StartBroadcaster() and WatchConfiguration()
---
 cmd/advertisement-operator/main.go            |  25 ++-
 .../resource-request-operator/broadcaster.go  | 197 ++++++++++++++++++
 .../resourceRequest_controller.go             |  14 +-
 .../resourceRequest_controller_test.go        | 118 ++++++++++-
 .../resource-request-operator/suite_test.go   |  45 +++-
 internal/resource-request-operator/utils.go   |  17 +-
 6 files changed, 377 insertions(+), 39 deletions(-)
 create mode 100644 internal/resource-request-operator/broadcaster.go

diff --git a/cmd/advertisement-operator/main.go b/cmd/advertisement-operator/main.go
index bdca4528b5..5bbc5ae1dd 100644
--- a/cmd/advertisement-operator/main.go
+++ b/cmd/advertisement-operator/main.go
@@ -13,6 +13,7 @@ limitations under the License.
 package main
 
 import (
+	"context"
 	"flag"
 	"os"
 	"sync"
@@ -172,10 +173,19 @@ func main() {
 		os.Exit(1)
 	}
 
+	newBroadcaster := &resourceRequestOperator.Broadcaster{}
+	componentStopper := make(chan struct{})
+
+	if err := newBroadcaster.SetupBroadcaster(clientset, time.Duration(resyncPeriod)); err != nil {
+		klog.Error(err)
+		os.Exit(1)
+	}
+
 	resourceRequestReconciler := &resourceRequestOperator.ResourceRequestReconciler{
-		Client:    mgr.GetClient(),
-		Scheme:    mgr.GetScheme(),
-		ClusterID: clusterId,
+		Client:         mgr.GetClient(),
+		Scheme:         mgr.GetScheme(),
+		ClusterID:      clusterId,
+		NewBroadcaster: newBroadcaster,
 	}
 
 	if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil {
@@ -204,10 +214,14 @@ func main() {
 	if err != nil {
 		os.Exit(1)
 	}
-	wg.Add(3)
-	go advertisementReconciler.CleanOldAdvertisements(c, wg)
+	wg.Add(4)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go advertisementReconciler.CleanOldAdvertisements(componentStopper, wg)
 	// TODO: this configuration watcher will be refactored before the release 0.3
 	go advertisementReconciler.WatchConfiguration(localKubeconfig, client, wg)
+	go newBroadcaster.WatchConfiguration(localKubeconfig, client, wg)
+	go newBroadcaster.StartBroadcaster(ctx, wg)
 	go resourceOfferReconciler.WatchConfiguration(localKubeconfig, client, wg)
 
 	klog.Info("starting manager as advertisementoperator")
@@ -217,5 +231,6 @@ func main() {
 	}
 	close(c)
 	close(client.Stop)
+	close(componentStopper)
 	wg.Wait()
 }
diff --git a/internal/resource-request-operator/broadcaster.go b/internal/resource-request-operator/broadcaster.go
new file mode 100644
index 0000000000..64af5048d9
--- /dev/null
+++ b/internal/resource-request-operator/broadcaster.go
@@ -0,0 +1,197 @@
+package resourcerequestoperator
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	crdclient "github.com/liqotech/liqo/pkg/crdClient"
+
+	configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
+	"github.com/liqotech/liqo/pkg/utils"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +// Broadcaster is an object which is used to get resources of the cluster. +type Broadcaster struct { + allocatable corev1.ResourceList + clusterConfig configv1alpha1.ClusterConfig + offerMutex sync.RWMutex + configMutex sync.RWMutex + informer cache.SharedInformer +} + +// SetupBroadcaster create the informer e run it to signal node changes updating Offers. +func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, resyncPeriod time.Duration) error { + b.allocatable = corev1.ResourceList{} + factory := informers.NewSharedInformerFactory(clientset, resyncPeriod) + b.informer = factory.Core().V1().Nodes().Informer() + if b.informer == nil { + return fmt.Errorf("SetupBroadcaster -> Error creating Node informer") + } + b.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: b.onAdd, + UpdateFunc: b.onUpdate, + DeleteFunc: b.onDelete, + }) + + return nil +} + +// StartBroadcaster start a new sharedInformer to watch nodes resources. +func (b *Broadcaster) StartBroadcaster(ctx context.Context, group *sync.WaitGroup) { + defer group.Done() + b.informer.Run(ctx.Done()) +} + +// WatchConfiguration start a new watcher to get clusterConfig. +func (b *Broadcaster) WatchConfiguration(localKubeconfig string, crdClient *crdclient.CRDClient, wg *sync.WaitGroup) { + defer wg.Done() + utils.WatchConfiguration(b.setConfig, crdClient, localKubeconfig) +} + +func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) { + b.configMutex.Lock() + defer b.configMutex.Unlock() + b.clusterConfig = *configuration +} + +func (b *Broadcaster) getConfig() *configv1alpha1.ClusterConfig { + b.configMutex.RLock() + defer b.configMutex.RUnlock() + configCopy := b.clusterConfig.DeepCopy() + return configCopy +} + +// react to a Node Creation/First informer run. +func (b *Broadcaster) onAdd(obj interface{}) { + node := obj.(*corev1.Node) + if node.Status.Phase == corev1.NodeRunning { + toAdd := &node.Status.Allocatable + currentResources := b.allocatable.DeepCopy() + addResources(¤tResources, toAdd) + + if err := b.writeResources(currentResources); err != nil { + klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err) + } + } +} + +// react to a Node Update. +func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) { + oldNode := oldObj.(*corev1.Node) + newNode := newObj.(*corev1.Node) + oldNodeResources := oldNode.Status.Allocatable + newNodeResources := newNode.Status.Allocatable + currentResources := b.allocatable.DeepCopy() + if newNode.Status.Phase == corev1.NodeRunning { + // node was already Running, update with possible new resources. + if oldNode.Status.Phase == corev1.NodeRunning { + updateResources(¤tResources, &oldNodeResources, &newNodeResources) + // node is starting, add all its resources. + } else if oldNode.Status.Phase == corev1.NodePending || oldNode.Status.Phase == corev1.NodeTerminated { + addResources(¤tResources, &newNodeResources) + } + // node is terminating or stopping, delete all its resources. + } else if oldNode.Status.Phase == corev1.NodeRunning && + (newNode.Status.Phase == corev1.NodeTerminated || newNode.Status.Phase == corev1.NodePending) { + subResources(¤tResources, &oldNodeResources) + } + if err := b.writeResources(currentResources); err != nil { + klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err) + } +} + +// react to a Node Delete. 
+func (b *Broadcaster) onDelete(obj interface{}) {
+	node := obj.(*corev1.Node)
+	toDelete := &node.Status.Allocatable
+	currentResources := b.allocatable.DeepCopy()
+	if node.Status.Phase == corev1.NodeRunning {
+		subResources(&currentResources, toDelete)
+		if err := b.writeResources(currentResources); err != nil {
+			klog.Errorf("OnDelete error: unable to write allocatable of Node %s: %s", node.Name, err)
+		}
+	}
+}
+
+// writeResources writes the cluster resources in a thread-safe way.
+func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error {
+	b.offerMutex.Lock()
+	defer b.offerMutex.Unlock()
+	if newResources != nil {
+		b.allocatable = newResources.DeepCopy()
+		return nil
+	}
+
+	return fmt.Errorf("attempting to write nil resources")
+}
+
+// ReadResources returns, in a thread-safe way, a copy of the cluster resources scaled by the sharing percentage.
+func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
+	b.offerMutex.RLock()
+	defer b.offerMutex.RUnlock()
+	if b.allocatable == nil {
+		return nil, fmt.Errorf("error getting cluster resources")
+	}
+	toRead := b.allocatable.DeepCopy()
+	for resourceName, quantity := range toRead {
+		scaled := quantity
+		b.scaleResources(&scaled)
+		toRead[resourceName] = scaled
+	}
+	return toRead, nil
+}
+
+// scaleResources applies the ResourceSharingPercentage from the ClusterConfig to the given quantity.
+func (b *Broadcaster) scaleResources(quantity *resource.Quantity) {
+	percentage := int64(b.getConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
+	if percentage == 0 {
+		return
+	}
+
+	quantity.Set(quantity.Value() * percentage / 100)
+}
+
+// addResources is a utility function to add resources.
+func addResources(currentResources, toAdd *corev1.ResourceList) {
+	for resourceName, quantity := range *toAdd {
+		if value, exists := (*currentResources)[resourceName]; exists {
+			value.Add(quantity)
+			(*currentResources)[resourceName] = value
+		} else {
+			(*currentResources)[resourceName] = quantity
+		}
+	}
+}
+
+// subResources is a utility function to subtract resources.
+func subResources(currentResources, toSub *corev1.ResourceList) {
+	for resourceName, quantity := range *toSub {
+		if value, exists := (*currentResources)[resourceName]; exists {
+			value.Sub(quantity)
+			(*currentResources)[resourceName] = value
+		}
+	}
+}
+
+// updateResources is a utility function to update resources.
+func updateResources(currentResources, oldResources, newResources *corev1.ResourceList) {
+	for resourceName, quantity := range *newResources {
+		if oldQuantity, exists := (*oldResources)[resourceName]; exists {
+			value := (*currentResources)[resourceName]
+			quantityToUpdate := resource.NewQuantity(quantity.Value()-oldQuantity.Value(),
+				quantity.Format)
+			value.Add(*quantityToUpdate)
+			(*currentResources)[resourceName] = value
+		} else {
+			(*currentResources)[resourceName] = quantity
+		}
+	}
+}
diff --git a/internal/resource-request-operator/resourceRequest_controller.go b/internal/resource-request-operator/resourceRequest_controller.go
index 1cb48490de..1037bd9c29 100644
--- a/internal/resource-request-operator/resourceRequest_controller.go
+++ b/internal/resource-request-operator/resourceRequest_controller.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"time"
 
-	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -16,18 +15,11 @@ import (
 
 // ResourceRequestReconciler reconciles a ResourceRequest object.
type ResourceRequestReconciler struct { client.Client - Scheme *runtime.Scheme - ClusterID string + Scheme *runtime.Scheme + ClusterID string + NewBroadcaster *Broadcaster } -// ResourceToOffer is a custom struct to encapsulate cluster's ResourceList. -type ResourceToOffer struct { - Offers corev1.ResourceList -} - -// ResourceToOffer is a placeholder var with fake cluster resources. -var resources ResourceToOffer - const ( offerPrefix = "resourceoffer-" timeToLive = 30 * time.Minute diff --git a/internal/resource-request-operator/resourceRequest_controller_test.go b/internal/resource-request-operator/resourceRequest_controller_test.go index 47348b5634..0dc74718f2 100644 --- a/internal/resource-request-operator/resourceRequest_controller_test.go +++ b/internal/resource-request-operator/resourceRequest_controller_test.go @@ -7,6 +7,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -27,8 +29,37 @@ var _ = Describe("ResourceRequest controller", func() { Context("Testing ResourceRequest Controller when creating a new ResourceRequest", func() { It("Should create new ResourceRequest and related ResourceOffer ", func() { - By("By creating a new ResourceRequest") ctx := context.Background() + By("Creating mock nodes") + resources := corev1.ResourceList{} + resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI) + resources[corev1.ResourceMemory] = *resource.NewQuantity(1, resource.BinarySI) + resources[corev1.ResourceLimitsCPU] = *resource.NewQuantity(3, resource.DecimalSI) + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node1", + }, + Status: corev1.NodeStatus{ + Capacity: resources, + Allocatable: resources, + Phase: corev1.NodeRunning, + }, + } + _, err := clientset.CoreV1().Nodes().Create(ctx, node1, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node2", + }, + Status: corev1.NodeStatus{ + Capacity: resources, + Allocatable: resources, + Phase: corev1.NodeRunning, + }, + } + _, err = clientset.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + By("By creating a new ResourceRequest") resourceRequest := &discoveryv1alpha1.ResourceRequest{ ObjectMeta: metav1.ObjectMeta{ Name: ResourceRequestName, @@ -70,6 +101,91 @@ var _ = Describe("ResourceRequest controller", func() { Expect(createdResourceOffer.GetOwnerReferences()).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ "Name": Equal(createdResourceRequest.Name), }))) + By("Checking resources at creation") + offerResources := createdResourceOffer.Spec.ResourceQuota.Hard + for resourceName, quantity := range offerResources { + testValue := node1.Status.Allocatable[resourceName] + testValue.Add(node2.Status.Allocatable[resourceName]) + testValue.Set(testValue.Value() * 50 / 100) + Expect(quantity.Cmp(testValue)).Should(BeZero()) + } + By("Checking update node phase") + node1.Status.Phase = corev1.NodePending + _, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err != nil { + return false + } + for resourceName, quantity := range resourcesRead { + toCheck := node2.Status.Allocatable[resourceName] + toCheck.Set(toCheck.Value() * 50 / 100) + if 
quantity.Cmp(toCheck) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + node1.Status.Phase = corev1.NodeRunning + _, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err != nil { + return false + } + for resourceName, quantity := range resourcesRead { + toCheck := node2.Status.Allocatable[resourceName] + toCheck.Add(node1.Status.Allocatable[resourceName]) + toCheck.Set(toCheck.Value() * 50 / 100) + if quantity.Cmp(toCheck) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + By("Checking update resources") + toUpdate := node1.Status.Allocatable + for _, quantity := range toUpdate { + quantity.Sub(*resource.NewQuantity(1, quantity.Format)) + } + node1.Status.Allocatable = toUpdate + _, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err != nil { + return false + } + for resourceName, quantity := range resourcesRead { + toCheck := node2.Status.Allocatable[resourceName] + toCheck.Add(node1.Status.Allocatable[resourceName]) + toCheck.Set(toCheck.Value() * 50 / 100) + if quantity.Cmp(toCheck) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + By("Checking Node Delete") + err = clientset.CoreV1().Nodes().Delete(ctx, node1.Name, metav1.DeleteOptions{}) + Expect(err).To(BeNil()) + Eventually(func() bool { + resourcesRead, err := newBroadcaster.ReadResources() + if err != nil { + return false + } + for resourceName, quantity := range resourcesRead { + toCheck := node2.Status.Allocatable[resourceName] + toCheck.Set(toCheck.Value() * 50 / 100) + if quantity.Cmp(toCheck) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) + }) }) diff --git a/internal/resource-request-operator/suite_test.go b/internal/resource-request-operator/suite_test.go index 809a3c2c73..a3e6a4f76a 100644 --- a/internal/resource-request-operator/suite_test.go +++ b/internal/resource-request-operator/suite_test.go @@ -1,23 +1,26 @@ package resourcerequestoperator import ( + "context" "path/filepath" + "sync" "testing" + "time" "github.com/google/uuid" - ctrl "sigs.k8s.io/controller-runtime" - - sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" - . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" "sigs.k8s.io/controller-runtime/pkg/envtest/printer" + configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" + sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" // +kubebuilder:scaffold:imports ) @@ -27,7 +30,12 @@ import ( var cfg *rest.Config var k8sClient client.Client var clusterId string +var clientset *kubernetes.Clientset var testEnv *envtest.Environment +var newBroadcaster Broadcaster +var ctx context.Context +var cancel context.CancelFunc +var group sync.WaitGroup func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -39,6 +47,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func(done Done) { By("bootstrapping test environment") + ctx, cancel = context.WithCancel(context.Background()) testEnv = &envtest.Environment{ CRDDirectoryPaths: []string{filepath.Join("..", "..", "deployments", "liqo", "crds")}, } @@ -52,6 +61,8 @@ var _ = BeforeSuite(func(done Done) { Expect(err).NotTo(HaveOccurred()) err = sharingv1alpha1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) + err = configv1alpha1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) // +kubebuilder:scaffold:scheme k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ @@ -59,12 +70,29 @@ var _ = BeforeSuite(func(done Done) { MetricsBindAddress: "0", // this avoids port binding collision }) Expect(err).ToNot(HaveOccurred()) + clientset, err = kubernetes.NewForConfig(k8sManager.GetConfig()) + Expect(err).ToNot(HaveOccurred()) + err = newBroadcaster.SetupBroadcaster(clientset, 10*time.Second) + Expect(err).ToNot(HaveOccurred()) + group.Add(1) + go newBroadcaster.StartBroadcaster(ctx, &group) + testClusterConf := &configv1alpha1.ClusterConfig{ + Spec: configv1alpha1.ClusterConfigSpec{ + AdvertisementConfig: configv1alpha1.AdvertisementConfig{ + OutgoingConfig: configv1alpha1.BroadcasterConfig{ + ResourceSharingPercentage: 50, + }, + }, + }, + } + newBroadcaster.setConfig(testClusterConf) id, _ := uuid.NewUUID() clusterId = id.String() err = (&ResourceRequestReconciler{ - Client: k8sManager.GetClient(), - Scheme: k8sManager.GetScheme(), - ClusterID: clusterId, + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + ClusterID: clusterId, + NewBroadcaster: &newBroadcaster, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) @@ -75,7 +103,6 @@ var _ = BeforeSuite(func(done Done) { k8sClient = k8sManager.GetClient() Expect(k8sClient).ToNot(BeNil()) - close(done) }, 60) @@ -83,4 +110,6 @@ var _ = AfterSuite(func() { By("tearing down the test environment") err := testEnv.Stop() Expect(err).ToNot(HaveOccurred()) + cancel() + group.Wait() }) diff --git a/internal/resource-request-operator/utils.go b/internal/resource-request-operator/utils.go index fcc3ff70a8..bd67bf9839 100644 --- a/internal/resource-request-operator/utils.go +++ b/internal/resource-request-operator/utils.go @@ -5,7 +5,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -15,9 +14,9 @@ import ( "github.com/liqotech/liqo/pkg/discovery" ) -// this function generate an empty offer. 
+// generateResourceOffer generates a new local ResourceOffer.
 func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1alpha1.ResourceRequest) error {
-	err := r.computeResources()
+	resources, err := r.NewBroadcaster.ReadResources()
 	if err != nil {
 		return err
 	}
@@ -38,7 +37,7 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al
 			ClusterId: r.ClusterID,
 			Images:    []corev1.ContainerImage{},
 			ResourceQuota: corev1.ResourceQuotaSpec{
-				Hard: resources.Offers,
+				Hard: resources,
 			},
 			Timestamp:  creationTime,
 			TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)),
@@ -53,13 +52,3 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al
 	klog.Infof("%s -> %s Offer: %s", r.ClusterID, op, offer.ObjectMeta.Name)
 	return nil
 }
-
-// this function returns all resource available that will be offered to remote cluster.
-func (r *ResourceRequestReconciler) computeResources() error {
-	// placeholder for future logic
-	limits := corev1.ResourceList{}
-	limits[corev1.ResourceCPU] = *resource.NewQuantity(2, "2")
-	limits[corev1.ResourceMemory] = *resource.NewQuantity(1, "2m")
-	resources.Offers = limits
-	return nil
-}
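
Note for reviewers: below is a minimal, illustrative sketch of how the new Broadcaster API introduced by this patch is meant to be consumed, mirroring the wiring added to cmd/advertisement-operator/main.go. It is not part of the patch. The kubeconfig path and the resync period are placeholder assumptions; in the real operator the ClusterConfig watcher (WatchConfiguration) supplies the sharing percentage and ReadResources() is called by the ResourceRequest reconciler while generating the ResourceOffer. Since it imports an internal package, such a program could only live inside the liqo module.

package main

import (
	"context"
	"sync"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog"

	resourceRequestOperator "github.com/liqotech/liqo/internal/resource-request-operator"
)

func main() {
	// Placeholder kubeconfig handling: the operator builds its rest.Config elsewhere.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		klog.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	// Create the Broadcaster and register the Node informer (the resync period is arbitrary here).
	broadcaster := &resourceRequestOperator.Broadcaster{}
	if err := broadcaster.SetupBroadcaster(clientset, 10*time.Hour); err != nil {
		klog.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// StartBroadcaster blocks on the informer until the context is cancelled,
	// so it runs on its own goroutine tracked by the WaitGroup.
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go broadcaster.StartBroadcaster(ctx, wg)

	// Later (e.g. while reconciling a ResourceRequest) read the scaled resources.
	// Without a ClusterConfig watcher the sharing percentage is 0 and the values are returned unscaled.
	if resources, err := broadcaster.ReadResources(); err != nil {
		klog.Error(err)
	} else {
		klog.Infof("resources available for offers: %v", resources)
	}

	cancel()
	wg.Wait()
}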