Skip to content

Commit

Permalink
Created new Broadcaster which starts an informer to watch Nodes for Res…
Browse files Browse the repository at this point in the history
…ourceOffer generation/update

Add logic reacting after node add, update and delete

Add ReadResources() function to read safely the cluster resources

Added clusterconfig watcher to get scaling percentage.
Refactor of resource writing to manage all kind of resources.

Added new tests and fixed linting
  • Loading branch information
giuse2596 committed May 20, 2021
1 parent cc05ac0 commit f7e4fba
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 31 deletions.
17 changes: 14 additions & 3 deletions cmd/advertisement-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,20 @@ func main() {
os.Exit(1)
}

newBroadcaster := &resourceRequestOperator.Broadcaster{}

broadcasterStopper := make(chan struct{})

if err := newBroadcaster.SetupBroadcaster(clientset, config, broadcasterStopper); err != nil {
klog.Error(err)
os.Exit(1)
}

resourceRequestReconciler := &resourceRequestOperator.ResourceRequestReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
NewBroadcaster: newBroadcaster,
}

if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -192,5 +202,6 @@ func main() {
}
close(c)
close(client.Stop)
close(broadcasterStopper)
wg.Wait()
}
158 changes: 158 additions & 0 deletions internal/resource-request-operator/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package resourcerequestoperator

import (
"fmt"
"sync"

crdclient "github.com/liqotech/liqo/pkg/crdClient"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/pkg/utils"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

// Broadcaster is an object which is used to get resources of the cluster.
type Broadcaster struct {
	// allocatable holds the running total of allocatable resources across all watched Nodes.
	allocatable corev1.ResourceList
	// clusterConfig is the latest observed ClusterConfig (source of the sharing percentage).
	clusterConfig configv1alpha1.ClusterConfig
	// offerMutex guards allocatable (written by informer handlers, read by ReadResources).
	offerMutex sync.RWMutex
	// configMutex guards clusterConfig (written by the config watcher goroutine).
	configMutex sync.RWMutex
}

// SetupBroadcaster creates a Node informer and runs it so that node changes
// keep the advertised resources up to date, and starts a ClusterConfig watcher
// to pick up the resource-sharing percentage.
//
// The informer and the watcher run in their own goroutines; the informer is
// stopped by closing stopper. Returns an error only if the CRD client cannot
// be created.
func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, config *rest.Config, stopper chan struct{}) error {
	b.allocatable = corev1.ResourceList{}
	factory := informers.NewSharedInformerFactory(clientset, 0)
	informer := factory.Core().V1().Nodes().Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    b.onAdd,
		UpdateFunc: b.onUpdate,
		DeleteFunc: b.onDelete,
	})

	// Work on a copy: the incoming rest.Config may be shared with other
	// components, and mutating it here would be a hidden side effect.
	crdConfig := rest.CopyConfig(config)
	crdConfig.ContentConfig.GroupVersion = &configv1alpha1.GroupVersion
	crdConfig.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
	crdConfig.UserAgent = rest.DefaultKubernetesUserAgent()
	crdClient, err := crdclient.NewFromConfig(crdConfig)
	if err != nil {
		return err
	}
	go utils.WatchConfiguration(b.setConfig, crdClient, "")
	go informer.Run(stopper)
	return nil
}

// setConfig stores the most recent ClusterConfig in a thread-safe way; it is
// invoked by the configuration watcher on every ClusterConfig change.
func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) {
	b.configMutex.Lock()
	b.clusterConfig = *configuration
	b.configMutex.Unlock()
}

// onAdd reacts to a Node creation (and to the informer's initial sync) by
// adding the node's allocatable resources to the cluster total.
func (b *Broadcaster) onAdd(obj interface{}) {
	node := obj.(*corev1.Node)
	updated := b.allocatable.DeepCopy()

	for name, qty := range node.Status.Allocatable {
		total := updated[name]
		if !qty.IsZero() {
			total.Add(qty)
		}
		// Store the entry even when the quantity is zero, so the resource
		// name is tracked in the cluster total.
		updated[name] = total
	}

	if err := b.writeResources(updated); err != nil {
		klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
	}
}

// onUpdate reacts to a Node update by applying the allocatable delta
// (new - old) of the updated node to the cluster total.
func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) {
	oldNode := oldObj.(*corev1.Node)
	newNode := newObj.(*corev1.Node)
	oldNodeResources := oldNode.Status.Allocatable
	newNodeResources := newNode.Status.Allocatable
	currentResources := b.allocatable.DeepCopy()

	for resourceName, newQuantity := range newNodeResources {
		oldQuantity := oldNodeResources[resourceName]
		value := currentResources[resourceName]
		// Apply the delta with Quantity arithmetic: computing the difference
		// through Value() would truncate sub-unit precision (e.g. milliCPU),
		// silently drifting the total on fractional updates.
		value.Add(newQuantity)
		value.Sub(oldQuantity)
		currentResources[resourceName] = value
	}

	if err := b.writeResources(currentResources); err != nil {
		klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err)
	}
}

// onDelete reacts to a Node deletion by subtracting the node's allocatable
// resources from the cluster total.
func (b *Broadcaster) onDelete(obj interface{}) {
	node := obj.(*corev1.Node)
	toDelete := &node.Status.Allocatable
	currentResources := b.allocatable.DeepCopy()

	for resourceName, quantity := range *toDelete {
		value := currentResources[resourceName]
		if !quantity.IsZero() {
			value.Sub(quantity)
		}
		currentResources[resourceName] = value
	}

	if err := b.writeResources(currentResources); err != nil {
		// Fixed: this handler previously logged "OnAdd error", masking which
		// event actually failed.
		klog.Errorf("OnDelete error: unable to write allocatable of Node %s: %s", node.Name, err)
	}
}

// writeResources replaces the cluster resource total in a thread-safe way.
// It refuses a nil list so a buggy caller cannot clobber the current state.
func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error {
	if newResources == nil {
		// Go convention: error strings are lowercase, no trailing period.
		return fmt.Errorf("attempting to write nil cluster resources")
	}
	b.offerMutex.Lock()
	defer b.offerMutex.Unlock()
	b.allocatable = newResources
	return nil
}

// ReadResources returns, in a thread-safe way, a copy of the cluster
// resources with the configured sharing percentage applied to each quantity.
func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
	b.offerMutex.RLock()
	defer b.offerMutex.RUnlock()
	if b.allocatable == nil {
		return nil, fmt.Errorf("error getting cluster resources")
	}
	// Scale a deep copy so the internal totals are never mutated.
	scaledList := b.allocatable.DeepCopy()
	for name := range scaledList {
		qty := scaledList[name]
		b.scaleResources(&qty)
		scaledList[name] = qty
	}
	return scaledList, nil
}

// scaleResources multiplies quantity in place by the configured
// ResourceSharingPercentage (integer percent). A percentage of 0 leaves the
// quantity untouched — presumably "config not yet received"; verify against
// the ClusterConfig defaults.
func (b *Broadcaster) scaleResources(quantity *resource.Quantity) {
	var percentage int64
	func() {
		b.configMutex.RLock()
		defer b.configMutex.RUnlock()
		percentage = int64(b.clusterConfig.Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
	}()

	if percentage != 0 {
		// NOTE(review): integer arithmetic on Value() drops sub-unit
		// (milli) precision, matching the original behavior.
		quantity.Set(quantity.Value() * percentage / 100)
	}
}
14 changes: 3 additions & 11 deletions internal/resource-request-operator/resourceRequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -16,18 +15,11 @@ import (
// ResourceRequestReconciler reconciles a ResourceRequest object.
type ResourceRequestReconciler struct {
client.Client
Scheme *runtime.Scheme
ClusterID string
Scheme *runtime.Scheme
ClusterID string
NewBroadcaster *Broadcaster
}

// ResourceToOffer is a custom struct to encapsulate cluster's ResourceList.
type ResourceToOffer struct {
Offers corev1.ResourceList
}

// ResourceToOffer is a placeholder var with fake cluster resources.
var resources ResourceToOffer

const (
offerPrefix = "resourceoffer-"
timeToLive = 30 * time.Minute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
Expand All @@ -27,8 +30,37 @@ var _ = Describe("ResourceRequest controller", func() {

Context("Testing ResourceRequest Controller when creating a new ResourceRequest", func() {
It("Should create new ResourceRequest and related ResourceOffer ", func() {
By("By creating a new ResourceRequest")
ctx := context.Background()
By("Creating mock nodes")
resources := corev1.ResourceList{}
resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI)
resources[corev1.ResourceMemory] = *resource.NewQuantity(1, resource.BinarySI)
resources[corev1.ResourceLimitsCPU] = *resource.NewQuantity(3, resource.DecimalSI)
node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node1",
Namespace: "default",
},
Status: corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
},
}
_, err := clientset.CoreV1().Nodes().Create(ctx, node1, metav1.CreateOptions{})
Expect(err).To(BeNil())
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node2",
Namespace: "default",
},
Status: corev1.NodeStatus{
Capacity: resources,
Allocatable: resources,
},
}
_, err = clientset.CoreV1().Nodes().Create(ctx, node2, metav1.CreateOptions{})
Expect(err).To(BeNil())
By("By creating a new ResourceRequest")
resourceRequest := &discoveryv1alpha1.ResourceRequest{
ObjectMeta: metav1.ObjectMeta{
Name: ResourceRequestName,
Expand Down Expand Up @@ -70,6 +102,15 @@ var _ = Describe("ResourceRequest controller", func() {
Expect(createdResourceOffer.GetOwnerReferences()).Should(ContainElement(MatchFields(IgnoreExtras, Fields{
"Name": Equal(createdResourceRequest.Name),
})))
By("Checking resources at creation")
offerResources := createdResourceOffer.Spec.ResourceQuota.Hard
for resourceName, quantity := range offerResources {
testValue := node1.Status.Allocatable[resourceName]
testValue.Add(node2.Status.Allocatable[resourceName])
testValue.Set(testValue.Value())
Expect(quantity.Cmp(testValue)).Should(BeZero())
}

})

})
Expand Down
24 changes: 21 additions & 3 deletions internal/resource-request-operator/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"path/filepath"
"testing"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"

"k8s.io/client-go/kubernetes"

"github.com/google/uuid"
ctrl "sigs.k8s.io/controller-runtime"

Expand All @@ -27,7 +31,9 @@ import (
var cfg *rest.Config
var k8sClient client.Client
var clusterId string
var clientset *kubernetes.Clientset
var testEnv *envtest.Environment
var broadcasterStopper chan struct{}

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)
Expand All @@ -39,6 +45,7 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func(done Done) {
By("bootstrapping test environment")
broadcasterStopper = make(chan struct{})
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "deployments", "liqo", "crds")},
}
Expand All @@ -52,19 +59,28 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).NotTo(HaveOccurred())
err = sharingv1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
err = configv1alpha1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
// +kubebuilder:scaffold:scheme

k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
MetricsBindAddress: "0", // this avoids port binding collision
})
Expect(err).ToNot(HaveOccurred())
clientset, err = kubernetes.NewForConfig(k8sManager.GetConfig())
Expect(err).ToNot(HaveOccurred())

newBroadcaster := &Broadcaster{}
err = newBroadcaster.SetupBroadcaster(clientset, k8sManager.GetConfig(), broadcasterStopper)
Expect(err).ToNot(HaveOccurred())
id, _ := uuid.NewUUID()
clusterId = id.String()
err = (&ResourceRequestReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ClusterID: clusterId,
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
ClusterID: clusterId,
NewBroadcaster: newBroadcaster,
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())

Expand All @@ -83,4 +99,6 @@ var _ = AfterSuite(func() {
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).ToNot(HaveOccurred())
close(broadcasterStopper)

})
15 changes: 2 additions & 13 deletions internal/resource-request-operator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -17,7 +16,7 @@ import (

// this function generate an empty offer.
func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1alpha1.ResourceRequest) error {
err := r.computeResources()
resources, err := r.NewBroadcaster.ReadResources()
if err != nil {
return err
}
Expand All @@ -38,7 +37,7 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al
ClusterId: r.ClusterID,
Images: []corev1.ContainerImage{},
ResourceQuota: corev1.ResourceQuotaSpec{
Hard: resources.Offers,
Hard: resources,
},
Timestamp: creationTime,
TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)),
Expand All @@ -53,13 +52,3 @@ func (r *ResourceRequestReconciler) generateResourceOffer(request *discoveryv1al
klog.Infof("%s -> %s Offer: %s", r.ClusterID, op, offer.ObjectMeta.Name)
return nil
}

// this function returns all resource available that will be offered to remote cluster.
func (r *ResourceRequestReconciler) computeResources() error {
// placeholder for future logic
limits := corev1.ResourceList{}
limits[corev1.ResourceCPU] = *resource.NewQuantity(2, "2")
limits[corev1.ResourceMemory] = *resource.NewQuantity(1, "2m")
resources.Offers = limits
return nil
}

0 comments on commit f7e4fba

Please sign in to comment.