Created new Broadcaster which starts an informer to watch Nodes for ResourceOffer generation/update

Add logic reacting to node add, update, and delete events.

Add ReadResources() function to safely read the cluster resources.

Added ClusterConfig watcher to get the resource sharing percentage.
Refactored resource writing to manage all kinds of resources.

Added new tests and fixed linting issues.

Added Node phase check.
Added Node update and delete tests.
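
In short, the Broadcaster maintains a thread-safe, scaled snapshot of the cluster's allocatable resources that consumers can poll. A hypothetical usage sketch (illustrative only, not part of this diff; it assumes a reconciler r holding the NewBroadcaster field added below and an offer variable exposing the Spec.ResourceQuota.Hard field used in the tests):

    resources, err := r.NewBroadcaster.ReadResources()
    if err != nil {
        // no resources have been read yet: surface the error to the caller
        return err
    }
    // copy the scaled snapshot into the ResourceOffer to be created/updated
    offer.Spec.ResourceQuota.Hard = resources.DeepCopy()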
giuse2596 committed May 21, 2021
1 parent 8476758 commit f2c87c7
Showing 6 changed files with 337 additions and 34 deletions.
20 changes: 14 additions & 6 deletions cmd/advertisement-operator/main.go
@@ -163,10 +163,19 @@ func main() {
os.Exit(1)
}

newBroadcaster := &resourceRequestOperator.Broadcaster{}
componentStopper := make(chan struct{})

if err := newBroadcaster.SetupBroadcaster(clientset, config, componentStopper); err != nil {
klog.Error(err)
os.Exit(1)
}

resourceRequestReconciler := &resourceRequestOperator.ResourceRequestReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
NewBroadcaster: newBroadcaster,
}

if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil {
@@ -175,22 +184,21 @@ func main() {

// +kubebuilder:scaffold:builder

c := make(chan struct{})
var wg = &sync.WaitGroup{}
client, err := advertisementReconciler.InitCRDClient(localKubeconfig)
if err != nil {
os.Exit(1)
}
wg.Add(2)
go advertisementReconciler.CleanOldAdvertisements(c, wg)
go advertisementReconciler.CleanOldAdvertisements(componentStopper, wg)
go advertisementReconciler.WatchConfiguration(localKubeconfig, client, wg)

klog.Info("starting manager as advertisementoperator")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.Error(err)
os.Exit(1)
}
close(c)
close(client.Stop)
close(componentStopper)
wg.Wait()
}
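
The separate stop channel c is replaced by a single componentStopper channel shared by the Node informer and CleanOldAdvertisements: closing it once after mgr.Start returns shuts both down. A minimal sketch of the pattern (an illustration under the assumption that each long-running component selects on the same channel):

    stopper := make(chan struct{})
    go func() {
        ticker := time.NewTicker(time.Minute)
        defer ticker.Stop()
        for {
            select {
            case <-stopper:
                return // unblocks as soon as the channel is closed
            case <-ticker.C:
                // ... periodic work ...
            }
        }
    }()
    // later, on shutdown:
    close(stopper) // every receiver observes the close at once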
184 changes: 184 additions & 0 deletions internal/resource-request-operator/broadcaster.go
@@ -0,0 +1,184 @@
package resourcerequestoperator

import (
"fmt"
"sync"

crdclient "github.com/liqotech/liqo/pkg/crdClient"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/pkg/utils"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

// Broadcaster is an object used to retrieve the aggregated resources of the cluster.
type Broadcaster struct {
allocatable corev1.ResourceList // aggregated allocatable resources, guarded by offerMutex
clusterConfig configv1alpha1.ClusterConfig // guarded by configMutex
offerMutex sync.RWMutex
configMutex sync.RWMutex
}

// SetupBroadcaster creates the informer and runs it to signal node changes, updating the Offers.
func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, config *rest.Config, stopper chan struct{}) error {
b.allocatable = corev1.ResourceList{}
factory := informers.NewSharedInformerFactory(clientset, 0)
informer := factory.Core().V1().Nodes().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: b.onAdd,
UpdateFunc: b.onUpdate,
DeleteFunc: b.onDelete,
})

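// Retarget the rest.Config to the ClusterConfig API group so that the CRD
// client below can watch configuration changes (e.g. the sharing percentage).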
config.ContentConfig.GroupVersion = &configv1alpha1.GroupVersion
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
config.UserAgent = rest.DefaultKubernetesUserAgent()
CRDclient, err := crdclient.NewFromConfig(config)
if err != nil {
return err
}
go utils.WatchConfiguration(b.setConfig, CRDclient, "")
go informer.Run(stopper)
return nil
}

func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) {
b.configMutex.Lock()
defer b.configMutex.Unlock()
b.clusterConfig = *configuration
}

// react to a Node Creation/First informer run.
func (b *Broadcaster) onAdd(obj interface{}) {
node := obj.(*corev1.Node)
if node.Status.Phase == corev1.NodeRunning {
toAdd := &node.Status.Allocatable
currentResources := b.allocatable.DeepCopy()
addResources(&currentResources, toAdd)

if err := b.writeResources(currentResources); err != nil {
klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
}
}
}

// react to a Node Update.
func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
oldNodeResources := oldNode.Status.Allocatable
newNodeResources := newNode.Status.Allocatable
currentResources := b.allocatable.DeepCopy()
if newNode.Status.Phase == corev1.NodeRunning {
// node was already Running, update with possible new resources.
if oldNode.Status.Phase == corev1.NodeRunning {
updateResources(&currentResources, &oldNodeResources, &newNodeResources)
// node is starting, add all its resources.
} else if oldNode.Status.Phase == corev1.NodePending || oldNode.Status.Phase == corev1.NodeTerminated {
addResources(&currentResources, &newNodeResources)
}
// node is terminating or stopping, delete all its resources.
} else if oldNode.Status.Phase == corev1.NodeRunning &&
(newNode.Status.Phase == corev1.NodeTerminated || newNode.Status.Phase == corev1.NodePending) {
subResources(&currentResources, &oldNodeResources)
}
if err := b.writeResources(currentResources); err != nil {
klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err)
}
}

// react to a Node Delete.
func (b *Broadcaster) onDelete(obj interface{}) {
node := obj.(*corev1.Node)
toDelete := &node.Status.Allocatable
currentResources := b.allocatable.DeepCopy()
if node.Status.Phase == corev1.NodeRunning {
subResources(&currentResources, toDelete)
if err := b.writeResources(currentResources); err != nil {
klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
}
}
}

// writeResources writes the cluster resources in a thread-safe way.
func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error {
b.offerMutex.Lock()
defer b.offerMutex.Unlock()
if newResources != nil {
b.allocatable = newResources.DeepCopy()
return nil
}

return fmt.Errorf("some error occurred during cluster resources read. Attempting writing nil resources")
}

// ReadResources returns a thread-safe copy of the cluster resources, scaled by the configured sharing percentage.
func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
b.offerMutex.RLock()
defer b.offerMutex.RUnlock()
if b.allocatable == nil {
return nil, fmt.Errorf("error getting cluster resources")
}
toRead := b.allocatable.DeepCopy()
for resourceName, quantity := range toRead {
scaled := quantity
b.scaleResources(&scaled)
toRead[resourceName] = scaled
}
return toRead, nil
}

func (b *Broadcaster) scaleResources(quantity *resource.Quantity) {
b.configMutex.RLock()
percentage := int64(b.clusterConfig.Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
b.configMutex.RUnlock()
if percentage == 0 {
return
}

quantity.Set(quantity.Value() * percentage / 100)
}

// addResources is a utility function to add resources.
func addResources(currentResources, toAdd *corev1.ResourceList) {
for resourceName, quantity := range *toAdd {
value := (*currentResources)[resourceName]
if !quantity.IsZero() {
value.Add(quantity)
}
(*currentResources)[resourceName] = value
}
}

// subResources is a utility function to subtract resources.
func subResources(currentResources, toSub *corev1.ResourceList) {
for resourceName, quantity := range *toSub {
value := (*currentResources)[resourceName]
if !quantity.IsZero() {
value.Sub(quantity)
}
(*currentResources)[resourceName] = value
}
}

// updateResources is a utility function to update resources.
func updateResources(currentResources, oldResources, newResources *corev1.ResourceList) {
for resourceName, quantity := range *newResources {
oldQuantity := (*oldResources)[resourceName]
value := (*currentResources)[resourceName]
if quantityToUpdate := resource.NewQuantity(quantity.Value()-oldQuantity.Value(),
quantity.Format); !quantityToUpdate.IsZero() {
value.Add(*quantityToUpdate)
}
(*currentResources)[resourceName] = value
}
}
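
A worked example of the helpers above and of the scaling applied through ReadResources (a sketch with made-up quantities, not part of the diff):

    current := corev1.ResourceList{
        corev1.ResourceCPU: *resource.NewQuantity(4, resource.DecimalSI),
    }
    delta := corev1.ResourceList{
        corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
    }
    addResources(&current, &delta) // cpu: 4 + 2 = 6
    subResources(&current, &delta) // cpu: 6 - 2 = 4
    // with ResourceSharingPercentage set to 50, ReadResources would
    // return cpu: 4 * 50 / 100 = 2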
14 changes: 3 additions & 11 deletions internal/resource-request-operator/resourceRequest_controller.go
@@ -4,7 +4,6 @@ import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -16,18 +15,11 @@ import (
// ResourceRequestReconciler reconciles a ResourceRequest object.
type ResourceRequestReconciler struct {
client.Client
Scheme *runtime.Scheme
ClusterID string
Scheme *runtime.Scheme
ClusterID string
NewBroadcaster *Broadcaster
}

// ResourceToOffer is a custom struct to encapsulate cluster's ResourceList.
type ResourceToOffer struct {
Offers corev1.ResourceList
}

// ResourceToOffer is a placeholder var with fake cluster resources.
var resources ResourceToOffer

const (
offerPrefix = "resourceoffer-"
timeToLive = 30 * time.Minute
115 changes: 114 additions & 1 deletion internal/resource-request-operator/resourceRequest_controller_test.go
@@ -4,6 +4,9 @@ import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
@@ -27,8 +30,37 @@ var _ = Describe("ResourceRequest controller", func() {

Context("Testing ResourceRequest Controller when creating a new ResourceRequest", func() {
It("Should create new ResourceRequest and related ResourceOffer ", func() {
By("By creating a new ResourceRequest")
ctx := context.Background()
By("Creating mock nodes")
resources := corev1.ResourceList{}
resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI)
resources[corev1.ResourceMemory] = *resource.NewQuantity(1, resource.BinarySI)
resources[corev1.ResourceLimitsCPU] = *resource.NewQuantity(3, resource.DecimalSI)
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node1",
},
Status: corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
Phase: corev1.NodeRunning,
},
}
_, err := clientset.CoreV1().Nodes().Create(ctx, node1, metav1.CreateOptions{})
Expect(err).To(BeNil())
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node2",
},
Status: corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
Phase: corev1.NodeRunning,
},
}
_, err = clientset.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{})
Expect(err).To(BeNil())
By("By creating a new ResourceRequest")
resourceRequest := &discoveryv1alpha1.ResourceRequest{
ObjectMeta: metav1.ObjectMeta{
Name: ResourceRequestName,
@@ -70,6 +102,87 @@
Expect(createdResourceOffer.GetOwnerReferences()).Should(ContainElement(MatchFields(IgnoreExtras, Fields{
"Name": Equal(createdResourceRequest.Name),
})))
By("Checking resources at creation")
offerResources := createdResourceOffer.Spec.ResourceQuota.Hard
for resourceName, quantity := range offerResources {
testValue := node1.Status.Allocatable[resourceName]
testValue.Add(node2.Status.Allocatable[resourceName])
testValue.Set(testValue.Value())
Expect(quantity.Cmp(testValue)).Should(BeZero())
}
By("Checking update node phase")
node1.Status.Phase = corev1.NodePending
_, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
node1.Status.Phase = corev1.NodeRunning
_, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
toCheck.Add(node1.Status.Allocatable[resourceName])
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
By("Checking update resources")
toUpdate := node1.Status.Allocatable
for _, quantity := range toUpdate {
quantity.Sub(*resource.NewQuantity(1, quantity.Format))
}
node1.Status.Allocatable = toUpdate
_, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
toCheck.Add(node1.Status.Allocatable[resourceName])
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
By("Checking Node Delete")
err = clientset.CoreV1().Nodes().Delete(ctx, node1.Name, metav1.DeleteOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())

})

})
