Skip to content

Commit

Permalink
Added OfferUpdater component which implement all ResourceOffer update.
Browse files Browse the repository at this point in the history
Added BroadcasterInterface and UpdaterInterface to avoid cyclid dependencies.
Added lastReadResources parameter to track last read resources.

Implemented update method.
added checkThreshold method.
Some other fix.

fix deadlock.
added logic to remove a clusterId from both broadcaster pod map and updater queue.
fix bug in tests.

Moved ResourceOffer creation in offerUpdater component.
Removed Timestamp and TimeToLive field in ResourceOffer CR.
  • Loading branch information
giuse2596 committed Jul 19, 2021
1 parent 58caf0c commit c51efbf
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 117 deletions.
5 changes: 0 additions & 5 deletions apis/sharing/v1alpha1/resourceoffer_types.go
Expand Up @@ -18,11 +18,6 @@ type ResourceOfferSpec struct {
Labels map[string]string `json:"labels,omitempty"`
// Prices contains the possible prices for every kind of resource (cpu, memory, image).
Prices corev1.ResourceList `json:"prices,omitempty"`
// Timestamp is the time instant when this ResourceOffer was created.
Timestamp metav1.Time `json:"timestamp"`
// TimeToLive is the time instant until this ResourceOffer will be valid.
// If not refreshed, an ResourceOffer will expire after 30 minutes.
TimeToLive metav1.Time `json:"timeToLive"`
// WithdrawalTimestamp is set when a graceful deletion is requested by the user.
WithdrawalTimestamp *metav1.Time `json:"withdrawalTimestamp,omitempty"`
}
Expand Down
2 changes: 0 additions & 2 deletions apis/sharing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions cmd/advertisement-operator/main.go
Expand Up @@ -80,13 +80,15 @@ func main() {
var liqoNamespace, kubeletImage, initKubeletImage string
var resyncPeriod int64
var offloadingStatusControllerRequeueTime int64
var offerUpdateThreshold uint64

flag.StringVar(&metricsAddr, "metrics-addr", defaultMetricsaddr, "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection,
"enable-leader-election", false,
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")

flag.Uint64Var(&offerUpdateThreshold, "offer-update-threshold-perc", uint64(5),
"Set the threshold percentage of quantity of resources modified which triggers the resourceOffer update.")
flag.Int64Var(&resyncPeriod, "resyncPeriod", int64(10*time.Hour), "Period after that operators and informers will requeue events.")
flag.Int64Var(&offloadingStatusControllerRequeueTime, "offloadingStatusControllerRequeueTime", int64(10*time.Second),
"Period after that the offloadingStatusController is awaken on every namespaceOffloading in order to set its status.")
Expand All @@ -108,6 +110,11 @@ func main() {
os.Exit(1)
}

if offerUpdateThreshold > 100 {
klog.Error("offerUpdateThreshold exceeds 100")
os.Exit(1)
}

if localKubeconfig != "" {
if err := os.Setenv("KUBECONFIG", localKubeconfig); err != nil {
os.Exit(1)
Expand Down Expand Up @@ -154,8 +161,9 @@ func main() {
}

newBroadcaster := &resourceRequestOperator.Broadcaster{}

if err := newBroadcaster.SetupBroadcaster(clientset, time.Duration(resyncPeriod)); err != nil {
updater := &resourceRequestOperator.OfferUpdater{}
updater.Setup(clusterId, mgr.GetScheme(), newBroadcaster, mgr.GetClient())
if err := newBroadcaster.SetupBroadcaster(clientset, updater, time.Duration(resyncPeriod), offerUpdateThreshold); err != nil {
klog.Error(err)
os.Exit(1)
}
Expand Down
13 changes: 0 additions & 13 deletions deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml
Expand Up @@ -155,26 +155,13 @@ spec:
type: string
type: array
type: object
timeToLive:
description: TimeToLive is the time instant until this ResourceOffer
will be valid. If not refreshed, an ResourceOffer will expire after
30 minutes.
format: date-time
type: string
timestamp:
description: Timestamp is the time instant when this ResourceOffer
was created.
format: date-time
type: string
withdrawalTimestamp:
description: WithdrawalTimestamp is set when a graceful deletion is
requested by the user.
format: date-time
type: string
required:
- clusterId
- timeToLive
- timestamp
type: object
status:
description: ResourceOfferStatus defines the observed state of ResourceOffer.
Expand Down
72 changes: 61 additions & 11 deletions internal/resource-request-operator/broadcaster.go
Expand Up @@ -14,27 +14,35 @@ import (
resourcehelper "k8s.io/kubectl/pkg/util/resource"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
"github.com/liqotech/liqo/internal/resource-request-operator/interfaces"
crdclient "github.com/liqotech/liqo/pkg/crdClient"
"github.com/liqotech/liqo/pkg/utils"
"github.com/liqotech/liqo/pkg/virtualKubelet/forge"
)

// Broadcaster is an object which is used to get resources of the cluster.
type Broadcaster struct {
allocatable corev1.ResourceList
resourcePodMap map[string]corev1.ResourceList
clusterConfig configv1alpha1.ClusterConfig
nodeMutex sync.RWMutex
podMutex sync.RWMutex
configMutex sync.RWMutex
nodeInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
allocatable corev1.ResourceList
resourcePodMap map[string]corev1.ResourceList
lastReadResources map[string]corev1.ResourceList
clusterConfig configv1alpha1.ClusterConfig
nodeMutex sync.RWMutex
podMutex sync.RWMutex
configMutex sync.RWMutex
nodeInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
updater interfaces.UpdaterInterface
updateThresholdPercentage uint64
}

// SetupBroadcaster create the informer e run it to signal node changes updating Offers.
func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPeriod time.Duration) error {
func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, updater interfaces.UpdaterInterface,
resyncPeriod time.Duration, offerUpdateThreshold uint64) error {
b.allocatable = corev1.ResourceList{}
b.updateThresholdPercentage = offerUpdateThreshold
b.updater = updater
b.resourcePodMap = map[string]corev1.ResourceList{}
b.lastReadResources = map[string]corev1.ResourceList{}
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
b.nodeInformer = factory.Core().V1().Nodes().Informer()
b.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -54,6 +62,7 @@ func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPer

// StartBroadcaster starts two shared Informers, one for nodes and one for pods launching two separated goroutines.
func (b *Broadcaster) StartBroadcaster(ctx context.Context, group *sync.WaitGroup) {
go b.updater.Start(ctx, group)
go b.startNodeInformer(ctx, group)
go b.startPodInformer(ctx, group)
}
Expand Down Expand Up @@ -82,7 +91,8 @@ func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) {
b.clusterConfig = *configuration
}

func (b *Broadcaster) getConfig() *configv1alpha1.ClusterConfig {
// GetConfig returns an instance of a ClusterConfig resource.
func (b *Broadcaster) GetConfig() *configv1alpha1.ClusterConfig {
b.configMutex.RLock()
defer b.configMutex.RUnlock()
configCopy := b.clusterConfig.DeepCopy()
Expand Down Expand Up @@ -173,22 +183,33 @@ func (b *Broadcaster) onPodDelete(obj interface{}) {
// write nodes resources in thread safe mode.
func (b *Broadcaster) writeClusterResources(newResources corev1.ResourceList) {
b.nodeMutex.Lock()
b.podMutex.RLock()
defer b.nodeMutex.Unlock()
defer b.podMutex.RUnlock()
b.allocatable = newResources.DeepCopy()
for clusterID := range b.resourcePodMap {
if b.checkThreshold(clusterID) {
b.enqueueForCreationOrUpdate(clusterID)
}
}
}

// write pods resources in thread safe mode.
func (b *Broadcaster) writePodsResources(clusterID string, newResources corev1.ResourceList) {
b.podMutex.Lock()
defer b.podMutex.Unlock()
b.resourcePodMap[clusterID] = newResources.DeepCopy()
if b.checkThreshold(clusterID) {
b.enqueueForCreationOrUpdate(clusterID)
}
}

// ReadResources return in thread safe mode a scaled value of the resources.
func (b *Broadcaster) ReadResources(clusterID string) corev1.ResourceList {
toRead := b.readClusterResources()
podsResources := b.readPodResources(clusterID)
addResources(toRead, podsResources)
b.lastReadResources[clusterID] = toRead
for resourceName, quantity := range toRead {
scaled := quantity
b.scaleResources(resourceName, &scaled)
Expand All @@ -197,6 +218,19 @@ func (b *Broadcaster) ReadResources(clusterID string) corev1.ResourceList {
return toRead
}

func (b *Broadcaster) enqueueForCreationOrUpdate(clusterID string) {
b.updater.Push(clusterID)
}

// RemoveClusterID removes a clusterID from all broadcaster internal structures
// it is useful when a particular foreign cluster has no more peering and its ResourceRequest has been deleted.
func (b *Broadcaster) RemoveClusterID(clusterID string) {
b.podMutex.Lock()
defer b.podMutex.Unlock()
delete(b.resourcePodMap, clusterID)
delete(b.lastReadResources, clusterID)
}

func (b *Broadcaster) readClusterResources() corev1.ResourceList {
b.nodeMutex.RLock()
defer b.nodeMutex.RUnlock()
Expand All @@ -212,8 +246,24 @@ func (b *Broadcaster) readPodResources(clusterID string) corev1.ResourceList {
return corev1.ResourceList{}
}

func (b *Broadcaster) checkThreshold(clusterID string) bool {
for resourceName, resources := range b.resourcePodMap[clusterID] {
clusterResources := b.allocatable[resourceName]
lastRead := b.lastReadResources[clusterID][resourceName]
clusterResourcesValue := clusterResources.Value()
resourcePodValue := resources.Value()
lastReadValue := lastRead.Value()
diff := (clusterResourcesValue + resourcePodValue) - lastReadValue
if diff > lastReadValue*int64(b.updateThresholdPercentage)/100 {
return true
}
}

return false
}

func (b *Broadcaster) scaleResources(resourceName corev1.ResourceName, quantity *resource.Quantity) {
percentage := int64(b.getConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)
percentage := int64(b.GetConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage)

switch resourceName {
case corev1.ResourceCPU:
Expand Down
@@ -0,0 +1,18 @@
package interfaces

import (
corev1 "k8s.io/api/core/v1"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
)

// ClusterResourceInterface represents a generic subset of Broadcaster exported methods to be used instead of a direct access to
// the Broadcaster instance and get/update some cluster resources information.
type ClusterResourceInterface interface {
// ReadResources returns all free cluster resources calculated for a given clusterID scaled by a percentage value.
ReadResources(clusterID string) corev1.ResourceList
// RemoveClusterID removes given clusterID from all internal structures and it will be no more valid.
RemoveClusterID(clusterID string)
// GetConfig returns a ClusterConfig instance.
GetConfig() *configv1alpha1.ClusterConfig
}
2 changes: 2 additions & 0 deletions internal/resource-request-operator/interfaces/doc.go
@@ -0,0 +1,2 @@
// Package interfaces contains all the ResourceRequestOperator interfaces representing some of its components.
package interfaces
15 changes: 15 additions & 0 deletions internal/resource-request-operator/interfaces/updaterInterface.go
@@ -0,0 +1,15 @@
package interfaces

import (
"context"
"sync"
)

// UpdaterInterface represents a generic subset of Updater exported methods to be used instead of a direct access to
// a particular Updater instance.
type UpdaterInterface interface {
// Start runs an instance of an updater which will be stopped when ctx.Done() is closed.
Start(ctx context.Context, group *sync.WaitGroup)
// Push adds the clusterID to the internal queue to be processed as soon as possible.
Push(clusterID string)
}

0 comments on commit c51efbf

Please sign in to comment.