Skip to content

Commit

Permalink
Created new Broadcaster which starts an informer to watch Nodes for Res…
Browse files Browse the repository at this point in the history
…ourceOffer generation/update

Add logic reacting to node add, update and delete events

Add ReadResources() function to safely read the cluster resources

Added clusterconfig watcher to get scaling percentage.
Refactored resource writing to manage all kinds of resources.

Added new tests and fixed linting

Added Node Phase check.
Write Update and Delete Node tests.
  • Loading branch information
giuse2596 committed May 21, 2021
1 parent 8476758 commit 5a1e306
Show file tree
Hide file tree
Showing 6 changed files with 338 additions and 31 deletions.
17 changes: 14 additions & 3 deletions cmd/advertisement-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,20 @@ func main() {
os.Exit(1)
}

newBroadcaster := &resourceRequestOperator.Broadcaster{}

broadcasterStopper := make(chan struct{})

if err := newBroadcaster.SetupBroadcaster(clientset, config, broadcasterStopper); err != nil {
klog.Error(err)
os.Exit(1)
}

resourceRequestReconciler := &resourceRequestOperator.ResourceRequestReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
NewBroadcaster: newBroadcaster,
}

if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -192,5 +202,6 @@ func main() {
}
close(c)
close(client.Stop)
close(broadcasterStopper)
wg.Wait()
}
183 changes: 183 additions & 0 deletions internal/resource-request-operator/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package resourcerequestoperator

import (
"fmt"
"sync"

crdclient "github.com/liqotech/liqo/pkg/crdClient"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/pkg/utils"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

// Broadcaster is an object which is used to get resources of the cluster.
// It keeps a running total of the allocatable resources of the cluster's
// Running nodes, fed by a Node informer, together with the latest
// ClusterConfig received from the configuration watcher.
type Broadcaster struct {
	// allocatable is the cached sum of the allocatable resources of the
	// cluster's Running nodes; guarded by offerMutex.
	allocatable corev1.ResourceList
	// clusterConfig is the latest configuration received via
	// WatchConfiguration; guarded by configMutex.
	clusterConfig configv1alpha1.ClusterConfig
	// offerMutex guards allocatable.
	offerMutex sync.RWMutex
	// configMutex guards clusterConfig.
	configMutex sync.RWMutex
}

// SetupBroadcaster wires up a shared informer watching Nodes, so that offers
// are kept in sync with the cluster resources, plus a ClusterConfig watcher,
// and starts both in background goroutines. The informer stops when stopper
// is closed. It returns an error only if the CRD client cannot be built.
func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, config *rest.Config, stopper chan struct{}) error {
	b.allocatable = corev1.ResourceList{}

	nodeInformer := informers.NewSharedInformerFactory(clientset, 0).Core().V1().Nodes().Informer()
	nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    b.onAdd,
		UpdateFunc: b.onUpdate,
		DeleteFunc: b.onDelete,
	})

	// NOTE(review): the caller's rest.Config is mutated in place here —
	// confirm no other component reuses it after this call.
	config.ContentConfig.GroupVersion = &configv1alpha1.GroupVersion
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
	config.UserAgent = rest.DefaultKubernetesUserAgent()
	crdClient, err := crdclient.NewFromConfig(config)
	if err != nil {
		return err
	}

	go utils.WatchConfiguration(b.setConfig, crdClient, "")
	go nodeInformer.Run(stopper)
	return nil
}

// setConfig stores the latest received ClusterConfig under the config lock;
// it is the callback handed to utils.WatchConfiguration.
func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) {
	b.configMutex.Lock()
	b.clusterConfig = *configuration
	b.configMutex.Unlock()
}

// onAdd handles a Node creation (and each entry of the informer's initial
// listing): when the node reports phase Running, its allocatable resources
// are added to the cached cluster totals.
func (b *Broadcaster) onAdd(obj interface{}) {
	node := obj.(*corev1.Node)
	if node.Status.Phase != corev1.NodeRunning {
		return
	}

	updated := b.allocatable.DeepCopy()
	for resourceName, quantity := range node.Status.Allocatable {
		total := updated[resourceName]
		if !quantity.IsZero() {
			total.Add(quantity)
		}
		updated[resourceName] = total
	}

	if err := b.writeResources(updated); err != nil {
		klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
	}
}

// onUpdate reacts to a Node update, reconciling the cached cluster totals
// with the node's phase transition:
//   - Running -> Running: apply the per-resource (possibly negative) delta.
//   - Pending/Terminated -> Running: add the whole node allocatable.
//   - Running -> Pending/Terminated: subtract the whole node allocatable.
//
// Any other transition leaves the totals numerically unchanged, but the
// (equal) resource list is still written back.
func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) {
	oldNode := oldObj.(*corev1.Node)
	newNode := newObj.(*corev1.Node)
	oldNodeResources := oldNode.Status.Allocatable
	newNodeResources := newNode.Status.Allocatable
	currentResources := b.allocatable.DeepCopy()
	if newNode.Status.Phase == corev1.NodeRunning {
		switch oldNode.Status.Phase {
		case corev1.NodeRunning:
			// Node was already Running: update with the resource delta.
			for resourceName, quantity := range newNodeResources {
				oldQuantity := oldNodeResources[resourceName]
				// Compute the difference at full precision with Sub:
				// Quantity.Value() rounds sub-unit quantities (e.g. "500m"
				// CPU) up to the next integer, so diffing Value()s would
				// miss or mis-compute milli-scale changes.
				delta := quantity.DeepCopy()
				delta.Sub(oldQuantity)
				value := currentResources[resourceName]
				if !delta.IsZero() {
					value.Add(delta)
				}
				currentResources[resourceName] = value
			}
		case corev1.NodePending, corev1.NodeTerminated:
			// Node is starting: add all its resources.
			for resourceName, quantity := range newNodeResources {
				value := currentResources[resourceName]
				if !quantity.IsZero() {
					value.Add(quantity)
				}
				currentResources[resourceName] = value
			}
		}
	} else if oldNode.Status.Phase == corev1.NodeRunning &&
		(newNode.Status.Phase == corev1.NodeTerminated || newNode.Status.Phase == corev1.NodePending) {
		// Node is terminating or stopping: delete all its resources.
		for resourceName, quantity := range oldNodeResources {
			value := currentResources[resourceName]
			if !quantity.IsZero() {
				value.Sub(quantity)
			}
			currentResources[resourceName] = value
		}
	}
	if err := b.writeResources(currentResources); err != nil {
		klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err)
	}
}

// onDelete reacts to a Node deletion: if the node was Running, its whole
// allocatable is subtracted from the cached cluster totals.
func (b *Broadcaster) onDelete(obj interface{}) {
	node := obj.(*corev1.Node)
	if node.Status.Phase != corev1.NodeRunning {
		return
	}

	currentResources := b.allocatable.DeepCopy()
	for resourceName, quantity := range node.Status.Allocatable {
		value := currentResources[resourceName]
		if !quantity.IsZero() {
			value.Sub(quantity)
		}
		currentResources[resourceName] = value
	}

	// Bug fix: the original log message said "OnAdd error" — a copy-paste
	// from the add handler — which made failures here look like add failures.
	if err := b.writeResources(currentResources); err != nil {
		klog.Errorf("OnDelete error: unable to write allocatable of Node %s: %s", node.Name, err)
	}
}

// writeResources replaces the cached cluster resource list in a thread-safe
// way. A nil list is rejected with an error so a failed computation upstream
// cannot wipe the cached totals.
func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error {
	// Validate before taking the lock: no need to hold it for the error path.
	if newResources == nil {
		return fmt.Errorf("attempted to overwrite cluster resources with a nil list")
	}

	b.offerMutex.Lock()
	defer b.offerMutex.Unlock()
	b.allocatable = newResources
	return nil
}

// ReadResources returns, in a thread-safe way, a deep copy of the cached
// cluster resources with every quantity scaled by the configured sharing
// percentage. It errors if no resources have been cached yet.
func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
	b.offerMutex.RLock()
	defer b.offerMutex.RUnlock()
	if b.allocatable == nil {
		return nil, fmt.Errorf("error getting cluster resources")
	}

	snapshot := b.allocatable.DeepCopy()
	for resourceName := range snapshot {
		quantity := snapshot[resourceName]
		b.scaleResources(&quantity)
		snapshot[resourceName] = quantity
	}
	return snapshot, nil
}

// scaleResources rescales quantity in place to the configured
// ResourceSharingPercentage of its value. A percentage of 0 (e.g. before any
// ClusterConfig has been received) leaves the quantity untouched.
func (b *Broadcaster) scaleResources(quantity *resource.Quantity) {
	b.configMutex.RLock()
	percentage := int64(b.clusterConfig.Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
	b.configMutex.RUnlock()
	if percentage == 0 {
		return
	}

	// Scale on the milli-value: Quantity.Value() rounds sub-unit quantities
	// (e.g. "500m" CPU -> 1) up to the next integer, so scaling on Value()
	// destroys milli-scale precision (500m at 50% would become 0).
	// NOTE(review): assumes MilliValue()*percentage fits in int64, which
	// holds for realistic per-node allocatable values.
	quantity.SetMilli(quantity.MilliValue() * percentage / 100)
}
14 changes: 3 additions & 11 deletions internal/resource-request-operator/resourceRequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -16,18 +15,11 @@ import (
// ResourceRequestReconciler reconciles a ResourceRequest object.
type ResourceRequestReconciler struct {
client.Client
Scheme *runtime.Scheme
ClusterID string
Scheme *runtime.Scheme
ClusterID string
NewBroadcaster *Broadcaster
}

// ResourceToOffer is a custom struct to encapsulate cluster's ResourceList.
type ResourceToOffer struct {
Offers corev1.ResourceList
}

// ResourceToOffer is a placeholder var with fake cluster resources.
var resources ResourceToOffer

const (
offerPrefix = "resourceoffer-"
timeToLive = 30 * time.Minute
Expand Down
117 changes: 116 additions & 1 deletion internal/resource-request-operator/resourceRequest_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
Expand All @@ -27,8 +30,39 @@ var _ = Describe("ResourceRequest controller", func() {

Context("Testing ResourceRequest Controller when creating a new ResourceRequest", func() {
It("Should create new ResourceRequest and related ResourceOffer ", func() {
By("By creating a new ResourceRequest")
ctx := context.Background()
By("Creating mock nodes")
resources := corev1.ResourceList{}
resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI)
resources[corev1.ResourceMemory] = *resource.NewQuantity(1, resource.BinarySI)
resources[corev1.ResourceLimitsCPU] = *resource.NewQuantity(3, resource.DecimalSI)
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node1",
Namespace: "default",
},
Status: corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
Phase: corev1.NodeRunning,
},
}
_, err := clientset.CoreV1().Nodes().Create(ctx, node1, metav1.CreateOptions{})
Expect(err).To(BeNil())
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node2",
Namespace: "default",
},
Status: corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
Phase: corev1.NodeRunning,
},
}
_, err = clientset.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{})
Expect(err).To(BeNil())
By("By creating a new ResourceRequest")
resourceRequest := &discoveryv1alpha1.ResourceRequest{
ObjectMeta: metav1.ObjectMeta{
Name: ResourceRequestName,
Expand Down Expand Up @@ -70,6 +104,87 @@ var _ = Describe("ResourceRequest controller", func() {
Expect(createdResourceOffer.GetOwnerReferences()).Should(ContainElement(MatchFields(IgnoreExtras, Fields{
"Name": Equal(createdResourceRequest.Name),
})))
By("Checking resources at creation")
offerResources := createdResourceOffer.Spec.ResourceQuota.Hard
for resourceName, quantity := range offerResources {
testValue := node1.Status.Allocatable[resourceName]
testValue.Add(node2.Status.Allocatable[resourceName])
testValue.Set(testValue.Value())
Expect(quantity.Cmp(testValue)).Should(BeZero())
}
By("Checking update node phase")
node1.Status.Phase = corev1.NodePending
_, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
node1.Status.Phase = corev1.NodeRunning
_, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
toCheck.Add(node1.Status.Allocatable[resourceName])
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
By("Checking update resources")
toUpdate := node1.Status.Allocatable
for _, quantity := range toUpdate {
quantity.Sub(*resource.NewQuantity(1, quantity.Format))
}
node1.Status.Allocatable = toUpdate
_, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
toCheck.Add(node1.Status.Allocatable[resourceName])
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())
By("Checking Node Delete")
err = clientset.CoreV1().Nodes().Delete(ctx, node1.Name, metav1.DeleteOptions{})
Expect(err).To(BeNil())
Eventually(func() bool {
resourcesRead, err := newBroadcaster.ReadResources()
if err != nil {
return false
}
for resourceName, quantity := range resourcesRead {
toCheck := node2.Status.Allocatable[resourceName]
if quantity.Cmp(toCheck) != 0 {
return false
}
}
return true
}, timeout, interval).Should(BeTrue())

})

})
Expand Down
Loading

0 comments on commit 5a1e306

Please sign in to comment.