Added OfferUpdater component, which implements all ResourceOffer updates.
Added ClusterResourceInterface and UpdaterInterface to avoid cyclic dependencies.
Added lastReadResources field to track the last read resources.

Implemented the update method.
Added the checkThreshold method.
Various other fixes.

Fixed a deadlock.
Added logic to remove a clusterID from both the broadcaster pod map and the updater queue.
Fixed a bug in the tests.
giuse2596 committed Jul 14, 2021
1 parent adbb394 commit 8a148b5
Showing 10 changed files with 375 additions and 31 deletions.
11 changes: 9 additions & 2 deletions cmd/advertisement-operator/main.go
@@ -80,12 +80,17 @@ func main() {
var liqoNamespace, kubeletImage, initKubeletImage string
var resyncPeriod int64
var offloadingStatusControllerRequeueTime int64
var offerUploadThreshold int64
var timeToLive int64

flag.StringVar(&metricsAddr, "metrics-addr", defaultMetricsaddr, "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection,
"enable-leader-election", false,
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
flag.Int64Var(&offerUploadThreshold, "offer-upload-threshold-perc", int64(5),
"Set the threshold percentage of quantity of resources modified which triggers the resourceOffer update.")
flag.Int64Var(&timeToLive, "resourceOffer-time-to-live", int64(30), "Set ResourceOffer time-to-live value")

flag.Int64Var(&resyncPeriod, "resyncPeriod", int64(10*time.Hour), "Period after that operators and informers will requeue events.")
flag.Int64Var(&offloadingStatusControllerRequeueTime, "offloadingStatusControllerRequeueTime", int64(10*time.Second),
@@ -154,8 +159,9 @@ func main() {
}

newBroadcaster := &resourceRequestOperator.Broadcaster{}

if err := newBroadcaster.SetupBroadcaster(clientset, time.Duration(resyncPeriod)); err != nil {
updater := &resourceRequestOperator.OfferUpdater{}
updater.Setup(newBroadcaster, mgr.GetClient())
if err := newBroadcaster.SetupBroadcaster(clientset, updater, time.Duration(resyncPeriod), offerUploadThreshold); err != nil {
klog.Error(err)
os.Exit(1)
}
@@ -165,6 +171,7 @@ func main() {
Scheme: mgr.GetScheme(),
ClusterID: clusterId,
Broadcaster: newBroadcaster,
TimeToLive: time.Duration(timeToLive) * time.Minute,
}

if err = resourceRequestReconciler.SetupWithManager(mgr); err != nil {
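As an aside, here is a minimal standalone sketch (not part of this commit) of how the two new flag values are interpreted downstream: the threshold is a plain percentage, while the time-to-live value is expressed in minutes.

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	var offerUpdateThreshold, timeToLive int64
	flag.Int64Var(&offerUpdateThreshold, "offer-upload-threshold-perc", 5,
		"Threshold percentage of resource variation that triggers a ResourceOffer update.")
	flag.Int64Var(&timeToLive, "resourceOffer-time-to-live", 30, "ResourceOffer time-to-live in minutes.")
	flag.Parse()

	// Same conversion applied to ResourceRequestReconciler.TimeToLive in main.go.
	ttl := time.Duration(timeToLive) * time.Minute
	fmt.Printf("update threshold: %d%%, offer TTL: %s\n", offerUpdateThreshold, ttl)
}
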
64 changes: 55 additions & 9 deletions internal/resource-request-operator/broadcaster.go
@@ -14,27 +14,35 @@ import (
resourcehelper "k8s.io/kubectl/pkg/util/resource"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/internal/resource-request-operator/interfaces"
crdclient "github.com/liqotech/liqo/pkg/crdClient"
"github.com/liqotech/liqo/pkg/utils"
"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
)

// Broadcaster is an object which is used to get resources of the cluster.
type Broadcaster struct {
allocatable corev1.ResourceList
resourcePodMap map[string]corev1.ResourceList
clusterConfig configv1alpha1.ClusterConfig
nodeMutex sync.RWMutex
podMutex sync.RWMutex
configMutex sync.RWMutex
nodeInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
allocatable corev1.ResourceList
resourcePodMap map[string]corev1.ResourceList
lastReadResources map[string]corev1.ResourceList
clusterConfig configv1alpha1.ClusterConfig
nodeMutex sync.RWMutex
podMutex sync.RWMutex
configMutex sync.RWMutex
nodeInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
updater interfaces.UpdaterInterface
updateThresholdPercentage int64
}

// SetupBroadcaster creates the informer and runs it to signal node changes, updating Offers.
func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPeriod time.Duration) error {
func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, updater interfaces.UpdaterInterface,
resyncPeriod time.Duration, offerUpdateThreshold int64) error {
b.allocatable = corev1.ResourceList{}
b.updateThresholdPercentage = offerUpdateThreshold
b.updater = updater
b.resourcePodMap = map[string]corev1.ResourceList{}
b.lastReadResources = map[string]corev1.ResourceList{}
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
b.nodeInformer = factory.Core().V1().Nodes().Informer()
b.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -54,6 +62,7 @@ func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPer

// StartBroadcaster starts the updater and two shared informers, one for nodes and one for pods, each in a separate goroutine.
func (b *Broadcaster) StartBroadcaster(ctx context.Context, group *sync.WaitGroup) {
go b.updater.Start(ctx, group)
go b.startNodeInformer(ctx, group)
go b.startPodInformer(ctx, group)
}
@@ -173,22 +182,33 @@ func (b *Broadcaster) onPodDelete(obj interface{}) {
// writeClusterResources writes node resources in a thread-safe way.
func (b *Broadcaster) writeClusterResources(newResources corev1.ResourceList) {
b.nodeMutex.Lock()
b.podMutex.RLock()
defer b.nodeMutex.Unlock()
defer b.podMutex.RUnlock()
b.allocatable = newResources.DeepCopy()
for clusterID := range b.resourcePodMap {
if b.checkThreshold(clusterID) {
b.updater.Push(clusterID)
}
}
}

// writePodsResources writes pod resources in a thread-safe way.
func (b *Broadcaster) writePodsResources(clusterID string, newResources corev1.ResourceList) {
b.podMutex.Lock()
defer b.podMutex.Unlock()
b.resourcePodMap[clusterID] = newResources.DeepCopy()
if b.checkThreshold(clusterID) {
b.updater.Push(clusterID)
}
}

// ReadResources returns a scaled value of the resources in a thread-safe way.
func (b *Broadcaster) ReadResources(clusterID string) corev1.ResourceList {
toRead := b.readClusterResources()
podsResources := b.readPodResources(clusterID)
addResources(toRead, podsResources)
b.lastReadResources[clusterID] = toRead
for resourceName, quantity := range toRead {
scaled := quantity
b.scaleResources(resourceName, &scaled)
@@ -197,6 +217,16 @@ func (b *Broadcaster) ReadResources(clusterID string) corev1.ResourceList {
return toRead
}

// RemoveClusterID removes a clusterID from all broadcaster internal structures;
// it is useful when a foreign cluster no longer has a peering and its ResourceRequest has been deleted.
func (b *Broadcaster) RemoveClusterID(clusterID string) {
b.podMutex.Lock()
defer b.podMutex.Unlock()
delete(b.resourcePodMap, clusterID)
delete(b.lastReadResources, clusterID)
b.updater.Remove(clusterID)
}

func (b *Broadcaster) readClusterResources() corev1.ResourceList {
b.nodeMutex.RLock()
defer b.nodeMutex.RUnlock()
@@ -212,6 +242,22 @@ func (b *Broadcaster) readPodResources(clusterID string) corev1.ResourceList {
return corev1.ResourceList{}
}

func (b *Broadcaster) checkThreshold(clusterID string) bool {
for resourceName, resources := range b.resourcePodMap[clusterID] {
clusterResources := b.allocatable[resourceName]
lastRead := b.lastReadResources[clusterID][resourceName]
clusterResourcesValue := clusterResources.Value()
resourcePodValue := resources.Value()
lastReadValue := lastRead.Value()
diff := (clusterResourcesValue + resourcePodValue) - lastReadValue
if diff > lastReadValue*b.updateThresholdPercentage/100 {
return true
}
}

return false
}

func (b *Broadcaster) scaleResources(resourceName corev1.ResourceName, quantity *resource.Quantity) {
percentage := int64(b.getConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)

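To make the checkThreshold arithmetic above concrete, here is a minimal standalone sketch with purely illustrative numbers (millicores of CPU) that are not taken from this commit:

package main

import "fmt"

func main() {
	const thresholdPerc = int64(5) // value of --offer-upload-threshold-perc

	allocatable := int64(8000) // cluster allocatable CPU (m) tracked by the node informer
	podResources := int64(500) // CPU (m) of offloaded pods for this clusterID, tracked by the pod informer
	lastRead := int64(8200)    // CPU (m) recorded at the last ReadResources call for this clusterID

	diff := (allocatable + podResources) - lastRead
	threshold := lastRead * thresholdPerc / 100
	// Here diff = 300m and threshold = 410m, so no update is pushed to the OfferUpdater queue.
	fmt.Printf("diff=%dm threshold=%dm push=%v\n", diff, threshold, diff > threshold)
}
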
@@ -0,0 +1,13 @@
package interfaces

import corev1 "k8s.io/api/core/v1"

// ClusterResourceInterface represents a generic subset of the Broadcaster's exported methods, to be used instead of direct access to
// the Broadcaster instance to get/update cluster resource information.
type ClusterResourceInterface interface {
// ReadResources returns all free cluster resources calculated for a given clusterID scaled by a percentage value.
// If clusterID is not valid, all scaled cluster resources will be returned.
ReadResources(clusterID string) corev1.ResourceList
// RemoveClusterID removes the given clusterID from all internal structures; it will no longer be valid.
RemoveClusterID(clusterID string)
}
2 changes: 2 additions & 0 deletions internal/resource-request-operator/interfaces/doc.go
@@ -0,0 +1,2 @@
// Package interfaces contains all the ResourceRequestOperator interfaces representing some of its components.
package interfaces
17 changes: 17 additions & 0 deletions internal/resource-request-operator/interfaces/updaterInterface.go
@@ -0,0 +1,17 @@
package interfaces

import (
"context"
"sync"
)

// UpdaterInterface represents a generic subset of the Updater's exported methods, to be used instead of direct access to
// a particular Updater instance.
type UpdaterInterface interface {
// Start runs an instance of an updater, which will be stopped when ctx.Done() is called.
Start(ctx context.Context, group *sync.WaitGroup)
// Push adds the clusterID to the internal queue to be processed as soon as possible.
Push(clusterID string)
// Remove removes the clusterID from the internal queue; it will no longer be processed.
Remove(clusterID string)
}
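One benefit of these interfaces is that components can be exercised against stubs rather than the concrete types; as a sketch (hypothetical, not part of this commit), a test could hand the Broadcaster a stand-in that satisfies UpdaterInterface:

package resourcerequestoperator_test // hypothetical external test package

import (
	"context"
	"sync"

	"github.com/liqotech/liqo/internal/resource-request-operator/interfaces"
)

// fakeUpdater is a hypothetical stub that satisfies UpdaterInterface, so
// Broadcaster logic can be tested without running a real OfferUpdater.
type fakeUpdater struct {
	pushed []string
}

func (f *fakeUpdater) Start(ctx context.Context, group *sync.WaitGroup) {}

func (f *fakeUpdater) Push(clusterID string) {
	f.pushed = append(f.pushed, clusterID)
}

func (f *fakeUpdater) Remove(clusterID string) {}

// Compile-time check that fakeUpdater implements the interface.
var _ interfaces.UpdaterInterface = &fakeUpdater{}
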
146 changes: 146 additions & 0 deletions internal/resource-request-operator/offerUpdater.go
@@ -0,0 +1,146 @@
package resourcerequestoperator

import (
"context"
"crypto/rand"
"fmt"
"math/big"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
"github.com/liqotech/liqo/internal/resource-request-operator/interfaces"
"github.com/liqotech/liqo/pkg/discovery"
)

const requeueTimeout = 5 * time.Minute

// OfferUpdater is a component which wraps all ResourceOffer update logic.
type OfferUpdater struct {
queue workqueue.RateLimitingInterface
client.Client
broadcasterInt interfaces.ClusterResourceInterface
}

// Setup initializes all parameters of the OfferUpdater component.
func (u *OfferUpdater) Setup(broadcaster interfaces.ClusterResourceInterface, k8Client client.Client) {
u.broadcasterInt = broadcaster
u.Client = k8Client
u.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Offer update queue")
}

// Start runs the OfferUpdater worker.
func (u *OfferUpdater) Start(ctx context.Context, group *sync.WaitGroup) {
defer u.queue.ShutDown()
group.Add(1)
defer group.Done()
go u.startRunner(ctx)
<-ctx.Done()
}

func (u *OfferUpdater) startRunner(ctx context.Context) {
wait.Until(u.run, 2*time.Second, ctx.Done())
}

func (u *OfferUpdater) run() {
for u.processNextItem() {
}
}

func (u *OfferUpdater) processNextItem() bool {
obj, shutdown := u.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer u.queue.Done(obj)
var clusterID string
var ok bool
if clusterID, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
u.queue.Forget(obj)
return fmt.Errorf("error getting object %v from OfferUpater queue. It is not a string", obj)
}
// check if the clusterID is still valid and can be processed
if !u.isValidClusterID(clusterID) {
// if the clusterID is not valid, the peering has been dropped or something went wrong,
// so we call Forget and RemoveClusterID in order to invalidate it.
u.broadcasterInt.RemoveClusterID(clusterID)
return fmt.Errorf("clusterID %s is no more valid and will be deleted", clusterID)
}
if err := u.updateOffer(clusterID); err != nil {
// Put the item back on the workqueue to handle any transient errors.
u.queue.AddRateLimited(clusterID)
return fmt.Errorf("error during updating ResourceOffer for cluster %s: %w", clusterID, err)
}
return nil
}(obj)
if err != nil {
klog.Errorf("Error occurred during ResourceOffer update %s", err)
return true
}
klog.Infof("Update cluster %s processed", obj.(string))

// requeue after requeueTimeout plus a random delta of seconds
u.queue.AddAfter(obj, requeueTimeout+time.Duration(getRandomDelta())*time.Second)
return true
}

func (u *OfferUpdater) updateOffer(clusterID string) error {
offerList := &sharingv1alpha1.ResourceOfferList{}
if err := u.Client.List(context.Background(), offerList, client.MatchingLabels{
discovery.ClusterIDLabel: clusterID,
}); err != nil {
return err
}
if len(offerList.Items) > 1 {
return fmt.Errorf("too many ResourceOffers for cluster %s", clusterID)
} else if len(offerList.Items) == 0 {
return fmt.Errorf("no ResourceOffer for cluster %s", clusterID)
}
offer := &offerList.Items[0]
resources := u.broadcasterInt.ReadResources(clusterID)
offer.Spec.ResourceQuota.Hard = resources.DeepCopy()
if err := u.Client.Update(context.Background(), offer); err != nil {
return err
}
return nil
}

// Push adds a new clusterID to the update queue, which will be processed as soon as possible.
func (u *OfferUpdater) Push(clusterID string) {
u.queue.Add(clusterID)
}

// Remove removes a specified clusterID from the update queue; it will no longer be processed.
func (u *OfferUpdater) Remove(clusterID string) {
u.queue.Forget(clusterID)
klog.Infof("Removed cluster %s from update queue", clusterID)
}

func (u *OfferUpdater) isValidClusterID(clusterID string) bool {
resourceRequestList := &discoveryv1alpha1.ResourceRequestList{}
err := u.Client.List(context.Background(), resourceRequestList)
if err != nil || len(resourceRequestList.Items) != 1 {
return false
}
return true
}

func getRandomDelta() int64 {
max := new(big.Int)
max.SetInt64(int64(60))
n, err := rand.Int(rand.Reader, max)
if err != nil {
return int64(0)
}
return n.Int64()
}
@@ -22,11 +22,11 @@ type ResourceRequestReconciler struct {
Scheme *runtime.Scheme
ClusterID string
Broadcaster *Broadcaster
TimeToLive time.Duration
}

const (
offerPrefix = "resourceoffer-"
timeToLive = 30 * time.Minute
)

// +kubebuilder:rbac:groups=discovery.liqo.io,resources=resourceRequests,verbs=get;list;watch;create;update;patch;
