Skip to content

Commit

Permalink
Added new pod informer to Broadcater
Browse files Browse the repository at this point in the history
Fixed test and StartBroadcaster to work also with pods.

Implemented all handlefunctions of pod informer and set filter to select just offloaded pods.

Test file reordering.
Pod resource extracting.

Fix linting.
Add thread-safe functions to read pod and node resource structures.

Moved and resource Creation in test from suite_test.go to resourceRequest_operator_test.go with BeforeEach() and afterEach()
  • Loading branch information
giuse2596 committed Jun 11, 2021
1 parent 384ad59 commit 5b7eb47
Show file tree
Hide file tree
Showing 6 changed files with 584 additions and 352 deletions.
4 changes: 2 additions & 2 deletions cmd/advertisement-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ func main() {
if err != nil {
os.Exit(1)
}
wg.Add(5)
wg.Add(4)
ctx, cancel := context.WithCancel(context.Background())
go advertisementReconciler.CleanOldAdvertisements(ctx.Done(), wg)
// TODO: this configuration watcher will be refactored before the release 0.3
go advertisementReconciler.WatchConfiguration(localKubeconfig, client, wg)
go newBroadcaster.WatchConfiguration(localKubeconfig, client, wg)
go newBroadcaster.StartBroadcaster(ctx, wg)
go resourceOfferReconciler.WatchConfiguration(localKubeconfig, client, wg)
newBroadcaster.StartBroadcaster(ctx, wg)

klog.Info("starting manager as advertisementoperator")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
198 changes: 162 additions & 36 deletions internal/resource-request-operator/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,76 @@ import (
"sync"
"time"

crdclient "github.com/liqotech/liqo/pkg/crdClient"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/pkg/utils"
"k8s.io/apimachinery/pkg/labels"
resourcehelper "k8s.io/kubectl/pkg/util/resource"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
crdclient "github.com/liqotech/liqo/pkg/crdClient"
"github.com/liqotech/liqo/pkg/utils"
"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
)

// Broadcaster is an object which is used to get resources of the cluster.
type Broadcaster struct {
allocatable corev1.ResourceList
clusterConfig configv1alpha1.ClusterConfig
offerMutex sync.RWMutex
configMutex sync.RWMutex
informer cache.SharedInformer
allocatable corev1.ResourceList
resourcePodMap map[string]corev1.ResourceList
clusterConfig configv1alpha1.ClusterConfig
nodeMutex sync.RWMutex
podMutex sync.RWMutex
configMutex sync.RWMutex
nodeInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
}

// SetupBroadcaster create the informer e run it to signal node changes updating Offers.
func (b *Broadcaster) SetupBroadcaster(clientset *kubernetes.Clientset, resyncPeriod time.Duration) error {
func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPeriod time.Duration) error {
b.allocatable = corev1.ResourceList{}
b.resourcePodMap = map[string]corev1.ResourceList{}
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
b.informer = factory.Core().V1().Nodes().Informer()
b.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: b.onAdd,
UpdateFunc: b.onUpdate,
DeleteFunc: b.onDelete,
b.nodeInformer = factory.Core().V1().Nodes().Informer()
if b.nodeInformer == nil {
return fmt.Errorf("SetupBroadcaster -> Error creating NodeInformer")
}
b.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: b.onNodeAdd,
UpdateFunc: b.onNodeUpdate,
DeleteFunc: b.onNodeDelete,
})

factoryWithOptions := informers.NewSharedInformerFactoryWithOptions(clientset, resyncPeriod, informers.WithTweakListOptions(setLabelSelectors))
b.podInformer = factoryWithOptions.Core().V1().Pods().Informer()
b.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: b.onPodAdd,
UpdateFunc: b.onPodUpdate,
DeleteFunc: b.onPodDelete,
})

return nil
}

// StartBroadcaster starts a new sharedInformer to watch nodes resources.
// StartBroadcaster starts 2 shared Informers, 1 for Nodes and 1 for pods launching 2 separated goroutines.
func (b *Broadcaster) StartBroadcaster(ctx context.Context, group *sync.WaitGroup) {
group.Add(2)
go b.startNodeInformer(ctx, group)
go b.startPodInformer(ctx, group)
}

func (b *Broadcaster) startNodeInformer(ctx context.Context, group *sync.WaitGroup) {
defer group.Done()
b.informer.Run(ctx.Done())
b.nodeInformer.Run(ctx.Done())
}

func (b *Broadcaster) startPodInformer(ctx context.Context, group *sync.WaitGroup) {
defer group.Done()
b.podInformer.Run(ctx.Done())
}

// WatchConfiguration starts a new watcher to get clusterConfig.
Expand All @@ -68,26 +98,26 @@ func (b *Broadcaster) getConfig() *configv1alpha1.ClusterConfig {
}

// react to a Node Creation/First informer run.
func (b *Broadcaster) onAdd(obj interface{}) {
func (b *Broadcaster) onNodeAdd(obj interface{}) {
node := obj.(*corev1.Node)
if node.Status.Phase == corev1.NodeRunning {
toAdd := &node.Status.Allocatable
currentResources := b.allocatable.DeepCopy()
currentResources := b.readNodeResources()
addResources(currentResources, *toAdd)

if err := b.writeResources(currentResources); err != nil {
if err := b.writeResources("", currentResources); err != nil {
klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
}
}
}

// react to a Node Update.
func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) {
func (b *Broadcaster) onNodeUpdate(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
oldNodeResources := oldNode.Status.Allocatable
newNodeResources := newNode.Status.Allocatable
currentResources := b.allocatable.DeepCopy()
currentResources := b.readNodeResources()
if newNode.Status.Phase == corev1.NodeRunning {
// node was already Running, update with possible new resources.
if oldNode.Status.Phase == corev1.NodeRunning {
Expand All @@ -101,44 +131,104 @@ func (b *Broadcaster) onUpdate(oldObj, newObj interface{}) {
(newNode.Status.Phase == corev1.NodeTerminated || newNode.Status.Phase == corev1.NodePending) {
subResources(currentResources, oldNodeResources)
}
if err := b.writeResources(currentResources); err != nil {
if err := b.writeResources("", currentResources); err != nil {
klog.Errorf("OnUpdate error: unable to write allocatable of Node %s: %s", newNode.Name, err)
}
}

// react to a Node Delete.
func (b *Broadcaster) onDelete(obj interface{}) {
func (b *Broadcaster) onNodeDelete(obj interface{}) {
node := obj.(*corev1.Node)
toDelete := &node.Status.Allocatable
currentResources := b.allocatable.DeepCopy()
currentResources := b.readNodeResources()
if node.Status.Phase == corev1.NodeRunning {
subResources(currentResources, *toDelete)
if err := b.writeResources(currentResources); err != nil {
if err := b.writeResources("", currentResources); err != nil {
klog.Errorf("OnAdd error: unable to write allocatable of Node %s: %s", node.Name, err)
}
}
}

func (b *Broadcaster) onPodAdd(obj interface{}) {
pod := obj.(*corev1.Pod)
fmt.Printf("Entrato per pod %s\n", pod.Name)
if clusterID := pod.Labels[forge.LiqoOriginClusterID]; clusterID != "" {
fmt.Printf("Entrato per pod %s cluster id control\n", pod.Name)
toAdd, err := extractPodResources(pod)
currentResources := b.readPodResources(clusterID)
addResources(currentResources, toAdd)
if err != nil {
klog.Errorf("OnAddPod error: unable to extract resources from Pod %s: %s", pod.Name, err)
}
if err = b.writeResources(clusterID, currentResources); err != nil {
klog.Errorf("OnAddPod error: unable to write resources of Pod %s: %s", pod.Name, err)
}
}
}

func (b *Broadcaster) onPodUpdate(oldObj, newObj interface{}) {
oldPod := oldObj.(*corev1.Pod)
newPod := newObj.(*corev1.Pod)

if clusterID := newPod.Labels[forge.LiqoOriginClusterID]; clusterID != "" {
oldResources, err := extractPodResources(oldPod)
if err != nil {
klog.Errorf("OnUpdatePod error: unable to extract resources from old Pod %s: %s", oldPod.Name, err)
}
newResources, err := extractPodResources(newPod)
if err != nil {
klog.Errorf("OnUpdatePod error: unable to extract resources from new Pod %s: %s", newPod.Name, err)
}
currentResources := b.readPodResources(clusterID)
updateResources(currentResources, oldResources, newResources)

if err = b.writeResources(clusterID, currentResources); err != nil {
klog.Errorf("OnUpdatePod error: unable to write resources of Pod %s: %s", newPod.Name, err)
}
}
}

func (b *Broadcaster) onPodDelete(obj interface{}) {
pod := obj.(*corev1.Pod)
if clusterID := pod.Labels[forge.LiqoOriginClusterID]; clusterID != "" {
toAdd, err := extractPodResources(pod)
currentResources := b.readPodResources(clusterID)
subResources(currentResources, toAdd)
if err != nil {
klog.Errorf("OnDeletePod error: unable to extract resources from Pod %s: %s", pod.Name, err)
}
if err = b.writeResources(clusterID, currentResources); err != nil {
klog.Errorf("OnDeletePod error: unable to write resources of Pod %s: %s", pod.Name, err)
}
}
}

// write cluster resources in thread safe mode.
func (b *Broadcaster) writeResources(newResources corev1.ResourceList) error {
b.offerMutex.Lock()
defer b.offerMutex.Unlock()
func (b *Broadcaster) writeResources(clusterID string, newResources corev1.ResourceList) error {
if newResources != nil {
b.allocatable = newResources.DeepCopy()
if clusterID != "" {
b.podMutex.Lock()
b.resourcePodMap[clusterID] = newResources.DeepCopy()
b.podMutex.Unlock()
} else {
b.nodeMutex.Lock()
b.allocatable = newResources.DeepCopy()
b.nodeMutex.Unlock()
}
return nil
}

return fmt.Errorf("some error occurred during cluster resources read. Attempting writing nil resources")
}

// ReadResources return in thread safe mode a scaled value of the resources.
func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
b.offerMutex.RLock()
defer b.offerMutex.RUnlock()
if b.allocatable == nil {
return nil, fmt.Errorf("error getting cluster resources")
func (b *Broadcaster) ReadResources(clusterID string) (corev1.ResourceList, error) {
toRead := b.readNodeResources()
if toRead == nil {
return nil, fmt.Errorf("error getting node resources")
}
toRead := b.allocatable.DeepCopy()
podsResources := b.readPodResources(clusterID)
addResources(toRead, podsResources)
for resourceName, quantity := range toRead {
scaled := quantity
b.scaleResources(&scaled)
Expand All @@ -147,6 +237,22 @@ func (b *Broadcaster) ReadResources() (corev1.ResourceList, error) {
return toRead, nil
}

func (b *Broadcaster) readNodeResources() corev1.ResourceList {
b.nodeMutex.RLock()
defer b.nodeMutex.RUnlock()
toRead := b.allocatable.DeepCopy()
return toRead
}

func (b *Broadcaster) readPodResources(clusterID string) corev1.ResourceList {
b.podMutex.RLock()
defer b.podMutex.RUnlock()
if toRead, exists := b.resourcePodMap[clusterID]; exists {
return toRead.DeepCopy()
}
return corev1.ResourceList{}
}

func (b *Broadcaster) scaleResources(quantity *resource.Quantity) {
percentage := int64(b.getConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
if percentage == 0 {
Expand Down Expand Up @@ -192,3 +298,23 @@ func updateResources(currentResources, oldResources, newResources corev1.Resourc
}
}
}

func setLabelSelectors(options *metav1.ListOptions) {
labelSet := labels.Set{forge.LiqoOutgoingKey: "test"}
if options == nil {
options = &metav1.ListOptions{}
options.LabelSelector = labels.SelectorFromSet(labelSet).String()
return
}
set, err := labels.ConvertSelectorToLabelsMap(options.LabelSelector)
if err != nil {
klog.Errorf("unable to get existing label selector: %v", err)
return
}
options.LabelSelector = labels.Merge(labelSet, set).String()
}

func extractPodResources(pod *corev1.Pod) (corev1.ResourceList, error) {
resourcesToExtract, _ := resourcehelper.PodRequestsAndLimits(pod)
return resourcesToExtract, nil
}
Loading

0 comments on commit 5b7eb47

Please sign in to comment.