diff --git a/cmd/advertisement-operator/main.go b/cmd/advertisement-operator/main.go index fb0510c7b0..5a7480abbd 100644 --- a/cmd/advertisement-operator/main.go +++ b/cmd/advertisement-operator/main.go @@ -163,10 +163,20 @@ func main() { os.Exit(1) } + newBroadcaster := &resourceRequestOperator.Broadcaster{} + + broadcasterStopper := make(chan struct{}) + + if err := newBroadcaster.SetupBroadcaster(clientset, config, broadcasterStopper); err != nil { + klog.Error(err) + os.Exit(1) + } + resourceRequestReconciler := &resourceRequestOperator.ResourceRequestReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ClusterID: clusterId, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ClusterID: clusterId, + NewBroadcaster: newBroadcaster, } if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil { @@ -192,5 +202,6 @@ func main() { } close(c) close(client.Stop) + close(broadcasterStopper) wg.Wait() } diff --git a/internal/resource-request-operator/broadcaster.go b/internal/resource-request-operator/broadcaster.go new file mode 100644 index 0000000000..1041bf3b12 --- /dev/null +++ b/internal/resource-request-operator/broadcaster.go @@ -0,0 +1,158 @@ +package resourcerequestoperator + +import ( + "fmt" + "sync" + + crdclient "github.com/liqotech/liqo/pkg/crdClient" + + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + + configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" + "github.com/liqotech/liqo/pkg/utils" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +// Broadcaster is an object which is used to get resources of the cluster. +type Broadcaster struct { + allocatable corev1.ResourceList + clusterConfig configv1alpha1.ClusterConfig + offerMutex sync.RWMutex + configMutex sync.RWMutex +} + +// SetupBroadcaster create the informer e run it to signal node changes updating Offers. +func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, config *rest.Config, stopper chan struct{}) error { + b.allocatable = corev1.ResourceList{} + factory := informers.NewSharedInformerFactory(clientset, 0) + informer := factory.Core().V1().Nodes().Informer() + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: b.onAdd, + UpdateFunc: b.onUpdate, + DeleteFunc: b.onDelete, + }) + + config.ContentConfig.GroupVersion = &configv1alpha1.GroupVersion + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + config.UserAgent = rest.DefaultKubernetesUserAgent() + CRDclient, err := crdclient.NewFromConfig(config) + if err != nil { + return err + } + go utils.WatchConfiguration(b.setConfig, CRDclient, "") + go informer.Run(stopper) + return nil +} + +func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) { + b.configMutex.Lock() + defer b.configMutex.Unlock() + b.clusterConfig = *configuration +} + +// react to a Node Creation/First informer run. +func (b *Broadcaster) onAdd(obj interface{}) { + node := obj.(*corev1.Node) + toAdd := &node.Status.Allocatable + currentResources := b.allocatable.DeepCopy() + + for resourceName, quantity := range *toAdd { + value := currentResources[resourceName] + if !quantity.IsZero() { + value.Add(quantity) + } + currentResources[resourceName] = value + } + + if err := b.writeResources(currentResources); err != nil { + klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err) + } +} + +// react to a Node Update. +func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) { + oldNode := oldObj.(*corev1.Node) + newNode := newObj.(*corev1.Node) + oldNodeResources := oldNode.Status.Allocatable + newNodeResources := newNode.Status.Allocatable + currentResources := b.allocatable.DeepCopy() + + for resourceName, quantity := range newNodeResources { + oldQuantity := oldNodeResources[resourceName] + value := currentResources[resourceName] + if quantityToUpdate := resource.NewQuantity(quantity.Value()-oldQuantity.Value(), + quantity.Format); !quantityToUpdate.IsZero() { + value.Add(*quantityToUpdate) + } + currentResources[resourceName] = value + } + + if err := b.writeResources(currentResources); err != nil { + klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err) + } +} + +// react to a Node Delete. +func (b *Broadcaster) onDelete(obj interface{}) { + node := obj.(*corev1.Node) + toDelete := &node.Status.Allocatable + currentResources := b.allocatable.DeepCopy() + + for resourceName, quantity := range *toDelete { + value := currentResources[resourceName] + if !quantity.IsZero() { + value.Sub(quantity) + } + currentResources[resourceName] = value + } + + if err := b.writeResources(currentResources); err != nil { + klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err) + } +} + +// write cluster resources in thread safe mode. +func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error { + b.offerMutex.Lock() + defer b.offerMutex.Unlock() + if newResources != nil { + b.allocatable = newResources + return nil + } + + return fmt.Errorf("some error occurred during cluster resources read. Attempting writing nil resources") +} + +// ReadResources return in thread safe mode a scaled value of the resources. +func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) { + b.offerMutex.RLock() + defer b.offerMutex.RUnlock() + if b.allocatable == nil { + return nil, fmt.Errorf("error getting cluster resources") + } + toRead := b.allocatable.DeepCopy() + for resourceName, quantity := range toRead { + scaled := quantity + b.scaleResources(&scaled) + toRead[resourceName] = scaled + } + return toRead, nil +} + +func (b *Broadcaster) scaleResources(quantity *resource.Quantity) { + b.configMutex.RLock() + percentage := int64(b.clusterConfig.Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage) + b.configMutex.RUnlock() + if percentage == 0 { + return + } + + quantity.Set(quantity.Value() * percentage / 100) +} diff --git a/internal/resource-request-operator/resourceRequest_controller.go b/internal/resource-request-operator/resourceRequest_controller.go index 1cb48490de..1037bd9c29 100644 --- a/internal/resource-request-operator/resourceRequest_controller.go +++ b/internal/resource-request-operator/resourceRequest_controller.go @@ -4,7 +4,6 @@ import ( "context" "time" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -16,18 +15,11 @@ import ( // ResourceRequestReconciler reconciles a ResourceRequest object. type ResourceRequestReconciler struct { client.Client - Scheme *runtime.Scheme - ClusterID string + Scheme *runtime.Scheme + ClusterID string + NewBroadcaster *Broadcaster } -// ResourceToOffer is a custom struct to encapsulate cluster's ResourceList. -type ResourceToOffer struct { - Offers corev1.ResourceList -} - -// ResourceToOffer is a placeholder var with fake cluster resources. -var resources ResourceToOffer - const ( offerPrefix = "resourceoffer-" timeToLive = 30 * time.Minute diff --git a/internal/resource-request-operator/resourceRequest_controller_test.go b/internal/resource-request-operator/resourceRequest_controller_test.go index 47348b5634..3878e18264 100644 --- a/internal/resource-request-operator/resourceRequest_controller_test.go +++ b/internal/resource-request-operator/resourceRequest_controller_test.go @@ -4,6 +4,9 @@ import ( "context" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" @@ -27,8 +30,37 @@ var _ = Describe("ResourceRequest controller", func() { Context("Testing ResourceRequest Controller when creating a new ResourceRequest", func() { It("Should create new ResourceRequest and related ResourceOffer ", func() { - By("By creating a new ResourceRequest") ctx := context.Background() + By("Creating mock nodes") + resources := corev1.ResourceList{} + resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI) + resources[corev1.ResourceMemory] = *resource.NewQuantity(1, resource.BinarySI) + resources[corev1.ResourceLimitsCPU] = *resource.NewQuantity(3, resource.DecimalSI) + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node1", + Namespace: "default", + }, + Status: corev1.NodeStatus{ + Capacity: resources, + Allocatable: resources, + }, + } + _, err := clientset.CoreV1().Nodes().Create(ctx, node1, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node2", + Namespace: "default", + }, + Status: corev1.NodeStatus{ + Capacity: resources, + Allocatable: resources, + }, + } + _, err = clientset.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{}) + Expect(err).To(BeNil()) + By("By creating a new ResourceRequest") resourceRequest := &discoveryv1alpha1.ResourceRequest{ ObjectMeta: metav1.ObjectMeta{ Name: ResourceRequestName, @@ -70,6 +102,15 @@ var _ = Describe("ResourceRequest controller", func() { Expect(createdResourceOffer.GetOwnerReferences()).Should(ContainElement(MatchFields(IgnoreExtras, Fields{ "Name": Equal(createdResourceRequest.Name), }))) + By("Checking resources at creation") + offerResources := createdResourceOffer.Spec.ResourceQuota.Hard + for resourceName, quantity := range offerResources { + testValue := node1.Status.Allocatable[resourceName] + testValue.Add(node2.Status.Allocatable[resourceName]) + testValue.Set(testValue.Value()) + Expect(quantity.Cmp(testValue)).Should(BeZero()) + } + }) }) diff --git a/internal/resource-request-operator/suite_test.go b/internal/resource-request-operator/suite_test.go index 809a3c2c73..69b3f27e38 100644 --- a/internal/resource-request-operator/suite_test.go +++ b/internal/resource-request-operator/suite_test.go @@ -4,6 +4,10 @@ import ( "path/filepath" "testing" + configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" + + "k8s.io/client-go/kubernetes" + "github.com/google/uuid" ctrl "sigs.k8s.io/controller-runtime" @@ -27,7 +31,9 @@ import ( var cfg *rest.Config var k8sClient client.Client var clusterId string +var clientset *kubernetes.Clientset var testEnv *envtest.Environment +var broadcasterStopper chan struct{} func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -39,6 +45,7 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func(done Done) { By("bootstrapping test environment") + broadcasterStopper = make(chan struct{}) testEnv = &envtest.Environment{ CRDDirectoryPaths: []string{filepath.Join("..", "..", "deployments", "liqo", "crds")}, } @@ -52,6 +59,8 @@ var _ = BeforeSuite(func(done Done) { Expect(err).NotTo(HaveOccurred()) err = sharingv1alpha1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) + err = configv1alpha1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) // +kubebuilder:scaffold:scheme k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{ @@ -59,12 +68,19 @@ var _ = BeforeSuite(func(done Done) { MetricsBindAddress: "0", // this avoids port binding collision }) Expect(err).ToNot(HaveOccurred()) + clientset, err = kubernetes.NewForConfig(k8sManager.GetConfig()) + Expect(err).ToNot(HaveOccurred()) + + newBroadcaster := &Broadcaster{} + err = newBroadcaster.SetupBroadcaster(clientset, k8sManager.GetConfig(), broadcasterStopper) + Expect(err).ToNot(HaveOccurred()) id, _ := uuid.NewUUID() clusterId = id.String() err = (&ResourceRequestReconciler{ - Client: k8sManager.GetClient(), - Scheme: k8sManager.GetScheme(), - ClusterID: clusterId, + Client: k8sManager.GetClient(), + Scheme: k8sManager.GetScheme(), + ClusterID: clusterId, + NewBroadcaster: newBroadcaster, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) @@ -83,4 +99,6 @@ var _ = AfterSuite(func() { By("tearing down the test environment") err := testEnv.Stop() Expect(err).ToNot(HaveOccurred()) + close(broadcasterStopper) + }) diff --git a/internal/resource-request-operator/utils.go b/internal/resource-request-operator/utils.go index fcc3ff70a8..0122e79d67 100644 --- a/internal/resource-request-operator/utils.go +++ b/internal/resource-request-operator/utils.go @@ -5,7 +5,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -17,7 +16,7 @@ import ( // this function generate an empty offer. func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1alpha1.ResourceRequest) error { - err := r.computeResources() + resources, err := r.NewBroadcaster.ReadResources() if err != nil { return err } @@ -38,7 +37,7 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al ClusterId: r.ClusterID, Images: []corev1.ContainerImage{}, ResourceQuota: corev1.ResourceQuotaSpec{ - Hard: resources.Offers, + Hard: resources, }, Timestamp: creationTime, TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)), @@ -53,13 +52,3 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al klog.Infof("%s -> %s Offer: %s", r.ClusterID, op, offer.ObjectMeta.Name) return nil } - -// this function returns all resource available that will be offered to remote cluster. -func (r *ResourceRequestReconciler) computeResources() error { - // placeholder for future logic - limits := corev1.ResourceList{} - limits[corev1.ResourceCPU] = *resource.NewQuantity(2, "2") - limits[corev1.ResourceMemory] = *resource.NewQuantity(1, "2m") - resources.Offers = limits - return nil -}