From c51efbf8e200f78839b29f52596942d60cd92126 Mon Sep 17 00:00:00 2001
From: Giuseppe Alicino
Date: Tue, 6 Jul 2021 19:05:50 +0200
Subject: [PATCH] Added OfferUpdater component, which implements all
 ResourceOffer updates. Added BroadcasterInterface and UpdaterInterface to
 avoid cyclic dependencies. Added lastReadResources parameter to track the
 last read resources. Implemented the update method. Added the checkThreshold
 method. Fixed a deadlock and some other minor issues. Added logic to remove
 a clusterId from both the broadcaster pod map and the updater queue. Fixed a
 bug in tests. Moved ResourceOffer creation into the OfferUpdater component.
 Removed the Timestamp and TimeToLive fields from the ResourceOffer CR.
---
 apis/sharing/v1alpha1/resourceoffer_types.go  |   5 -
 .../sharing/v1alpha1/zz_generated.deepcopy.go |   2 -
 cmd/advertisement-operator/main.go            |  14 +-
 .../crds/sharing.liqo.io_resourceoffers.yaml  |  13 --
 .../resource-request-operator/broadcaster.go  |  72 +++++--
 .../interfaces/clusterResourceInterface.go    |  18 ++
 .../interfaces/doc.go                         |   2 +
 .../interfaces/updaterInterface.go            |  15 ++
 .../resource-request-operator/offerUpdater.go | 182 ++++++++++++++++++
 .../resourceRequest_controller.go             |   4 +-
 .../resourceRequest_operator_test.go          | 151 +++++++++++++--
 .../resource-request-operator/suite_test.go   |  10 +-
 internal/resource-request-operator/utils.go   |  42 ----
 .../resourceoffer_controller_test.go          |  12 +-
 .../liqoNodeProvider/nodeProvider_test.go     |   8 +-
 15 files changed, 433 insertions(+), 117 deletions(-)
 create mode 100644 internal/resource-request-operator/interfaces/clusterResourceInterface.go
 create mode 100644 internal/resource-request-operator/interfaces/doc.go
 create mode 100644 internal/resource-request-operator/interfaces/updaterInterface.go
 create mode 100644 internal/resource-request-operator/offerUpdater.go

diff --git a/apis/sharing/v1alpha1/resourceoffer_types.go b/apis/sharing/v1alpha1/resourceoffer_types.go
index 2b87df1018..558d903bba 100644
--- a/apis/sharing/v1alpha1/resourceoffer_types.go
+++ b/apis/sharing/v1alpha1/resourceoffer_types.go
@@ -18,11 +18,6 @@ type ResourceOfferSpec struct {
 	Labels map[string]string `json:"labels,omitempty"`
 	// Prices contains the possible prices for every kind of resource (cpu, memory, image).
 	Prices corev1.ResourceList `json:"prices,omitempty"`
-	// Timestamp is the time instant when this ResourceOffer was created.
-	Timestamp metav1.Time `json:"timestamp"`
-	// TimeToLive is the time instant until this ResourceOffer will be valid.
-	// If not refreshed, an ResourceOffer will expire after 30 minutes.
-	TimeToLive metav1.Time `json:"timeToLive"`
 	// WithdrawalTimestamp is set when a graceful deletion is requested by the user.
WithdrawalTimestamp *metav1.Time `json:"withdrawalTimestamp,omitempty"` } diff --git a/apis/sharing/v1alpha1/zz_generated.deepcopy.go b/apis/sharing/v1alpha1/zz_generated.deepcopy.go index 89f1f3eff2..520263c680 100644 --- a/apis/sharing/v1alpha1/zz_generated.deepcopy.go +++ b/apis/sharing/v1alpha1/zz_generated.deepcopy.go @@ -108,8 +108,6 @@ func (in *ResourceOfferSpec) DeepCopyInto(out *ResourceOfferSpec) { (*out)[key] = val.DeepCopy() } } - in.Timestamp.DeepCopyInto(&out.Timestamp) - in.TimeToLive.DeepCopyInto(&out.TimeToLive) if in.WithdrawalTimestamp != nil { in, out := &in.WithdrawalTimestamp, &out.WithdrawalTimestamp *out = (*in).DeepCopy() diff --git a/cmd/advertisement-operator/main.go b/cmd/advertisement-operator/main.go index bd12b12c6d..704aad3abe 100644 --- a/cmd/advertisement-operator/main.go +++ b/cmd/advertisement-operator/main.go @@ -80,13 +80,15 @@ func main() { var liqoNamespace, kubeletImage, initKubeletImage string var resyncPeriod int64 var offloadingStatusControllerRequeueTime int64 + var offerUpdateThreshold uint64 flag.StringVar(&metricsAddr, "metrics-addr", defaultMetricsaddr, "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.") - + flag.Uint64Var(&offerUpdateThreshold, "offer-update-threshold-perc", uint64(5), + "Set the threshold percentage of quantity of resources modified which triggers the resourceOffer update.") flag.Int64Var(&resyncPeriod, "resyncPeriod", int64(10*time.Hour), "Period after that operators and informers will requeue events.") flag.Int64Var(&offloadingStatusControllerRequeueTime, "offloadingStatusControllerRequeueTime", int64(10*time.Second), "Period after that the offloadingStatusController is awaken on every namespaceOffloading in order to set its status.") @@ -108,6 +110,11 @@ func main() { os.Exit(1) } + if offerUpdateThreshold > 100 { + klog.Error("offerUpdateThreshold exceeds 100") + os.Exit(1) + } + if localKubeconfig != "" { if err := os.Setenv("KUBECONFIG", localKubeconfig); err != nil { os.Exit(1) @@ -154,8 +161,9 @@ func main() { } newBroadcaster := &resourceRequestOperator.Broadcaster{} - - if err := newBroadcaster.SetupBroadcaster(clientset, time.Duration(resyncPeriod)); err != nil { + updater := &resourceRequestOperator.OfferUpdater{} + updater.Setup(clusterId, mgr.GetScheme(), newBroadcaster, mgr.GetClient()) + if err := newBroadcaster.SetupBroadcaster(clientset, updater, time.Duration(resyncPeriod), offerUpdateThreshold); err != nil { klog.Error(err) os.Exit(1) } diff --git a/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml b/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml index d2ba0f486b..c13d3a991f 100644 --- a/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml +++ b/deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml @@ -155,17 +155,6 @@ spec: type: string type: array type: object - timeToLive: - description: TimeToLive is the time instant until this ResourceOffer - will be valid. If not refreshed, an ResourceOffer will expire after - 30 minutes. - format: date-time - type: string - timestamp: - description: Timestamp is the time instant when this ResourceOffer - was created. 
- format: date-time - type: string withdrawalTimestamp: description: WithdrawalTimestamp is set when a graceful deletion is requested by the user. @@ -173,8 +162,6 @@ spec: type: string required: - clusterId - - timeToLive - - timestamp type: object status: description: ResourceOfferStatus defines the observed state of ResourceOffer. diff --git a/internal/resource-request-operator/broadcaster.go b/internal/resource-request-operator/broadcaster.go index 7281d4922c..04f1b0a763 100644 --- a/internal/resource-request-operator/broadcaster.go +++ b/internal/resource-request-operator/broadcaster.go @@ -14,6 +14,7 @@ import ( resourcehelper "k8s.io/kubectl/pkg/util/resource" configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" + "github.com/liqotech/liqo/internal/resource-request-operator/interfaces" crdclient "github.com/liqotech/liqo/pkg/crdClient" "github.com/liqotech/liqo/pkg/utils" "github.com/liqotech/liqo/pkg/virtualKubelet/forge" @@ -21,20 +22,27 @@ import ( // Broadcaster is an object which is used to get resources of the cluster. type Broadcaster struct { - allocatable corev1.ResourceList - resourcePodMap map[string]corev1.ResourceList - clusterConfig configv1alpha1.ClusterConfig - nodeMutex sync.RWMutex - podMutex sync.RWMutex - configMutex sync.RWMutex - nodeInformer cache.SharedIndexInformer - podInformer cache.SharedIndexInformer + allocatable corev1.ResourceList + resourcePodMap map[string]corev1.ResourceList + lastReadResources map[string]corev1.ResourceList + clusterConfig configv1alpha1.ClusterConfig + nodeMutex sync.RWMutex + podMutex sync.RWMutex + configMutex sync.RWMutex + nodeInformer cache.SharedIndexInformer + podInformer cache.SharedIndexInformer + updater interfaces.UpdaterInterface + updateThresholdPercentage uint64 } // SetupBroadcaster create the informer e run it to signal node changes updating Offers. -func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPeriod time.Duration) error { +func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, updater interfaces.UpdaterInterface, + resyncPeriod time.Duration, offerUpdateThreshold uint64) error { b.allocatable = corev1.ResourceList{} + b.updateThresholdPercentage = offerUpdateThreshold + b.updater = updater b.resourcePodMap = map[string]corev1.ResourceList{} + b.lastReadResources = map[string]corev1.ResourceList{} factory := informers.NewSharedInformerFactory(clientset, resyncPeriod) b.nodeInformer = factory.Core().V1().Nodes().Informer() b.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -54,6 +62,7 @@ func (b *Broadcaster) SetupBroadcaster(clientset kubernetes.Interface, resyncPer // StartBroadcaster starts two shared Informers, one for nodes and one for pods launching two separated goroutines. func (b *Broadcaster) StartBroadcaster(ctx context.Context, group *sync.WaitGroup) { + go b.updater.Start(ctx, group) go b.startNodeInformer(ctx, group) go b.startPodInformer(ctx, group) } @@ -82,7 +91,8 @@ func (b *Broadcaster) setConfig(configuration *configv1alpha1.ClusterConfig) { b.clusterConfig = *configuration } -func (b *Broadcaster) getConfig() *configv1alpha1.ClusterConfig { +// GetConfig returns an instance of a ClusterConfig resource. +func (b *Broadcaster) GetConfig() *configv1alpha1.ClusterConfig { b.configMutex.RLock() defer b.configMutex.RUnlock() configCopy := b.clusterConfig.DeepCopy() @@ -173,8 +183,15 @@ func (b *Broadcaster) onPodDelete(obj interface{}) { // write nodes resources in thread safe mode. 
func (b *Broadcaster) writeClusterResources(newResources corev1.ResourceList) { b.nodeMutex.Lock() + b.podMutex.RLock() defer b.nodeMutex.Unlock() + defer b.podMutex.RUnlock() b.allocatable = newResources.DeepCopy() + for clusterID := range b.resourcePodMap { + if b.checkThreshold(clusterID) { + b.enqueueForCreationOrUpdate(clusterID) + } + } } // write pods resources in thread safe mode. @@ -182,6 +199,9 @@ func (b *Broadcaster) writePodsResources(clusterID string, newResources corev1.R b.podMutex.Lock() defer b.podMutex.Unlock() b.resourcePodMap[clusterID] = newResources.DeepCopy() + if b.checkThreshold(clusterID) { + b.enqueueForCreationOrUpdate(clusterID) + } } // ReadResources return in thread safe mode a scaled value of the resources. @@ -189,6 +209,7 @@ func (b *Broadcaster) ReadResources(clusterID string) corev1.ResourceList { toRead := b.readClusterResources() podsResources := b.readPodResources(clusterID) addResources(toRead, podsResources) + b.lastReadResources[clusterID] = toRead for resourceName, quantity := range toRead { scaled := quantity b.scaleResources(resourceName, &scaled) @@ -197,6 +218,19 @@ func (b *Broadcaster) ReadResources(clusterID string) corev1.ResourceList { return toRead } +func (b *Broadcaster) enqueueForCreationOrUpdate(clusterID string) { + b.updater.Push(clusterID) +} + +// RemoveClusterID removes a clusterID from all broadcaster internal structures +// it is useful when a particular foreign cluster has no more peering and its ResourceRequest has been deleted. +func (b *Broadcaster) RemoveClusterID(clusterID string) { + b.podMutex.Lock() + defer b.podMutex.Unlock() + delete(b.resourcePodMap, clusterID) + delete(b.lastReadResources, clusterID) +} + func (b *Broadcaster) readClusterResources() corev1.ResourceList { b.nodeMutex.RLock() defer b.nodeMutex.RUnlock() @@ -212,8 +246,24 @@ func (b *Broadcaster) readPodResources(clusterID string) corev1.ResourceList { return corev1.ResourceList{} } +func (b *Broadcaster) checkThreshold(clusterID string) bool { + for resourceName, resources := range b.resourcePodMap[clusterID] { + clusterResources := b.allocatable[resourceName] + lastRead := b.lastReadResources[clusterID][resourceName] + clusterResourcesValue := clusterResources.Value() + resourcePodValue := resources.Value() + lastReadValue := lastRead.Value() + diff := (clusterResourcesValue + resourcePodValue) - lastReadValue + if diff > lastReadValue*int64(b.updateThresholdPercentage)/100 { + return true + } + } + + return false +} + func (b *Broadcaster) scaleResources(resourceName corev1.ResourceName, quantity *resource.Quantity) { - percentage := int64(b.getConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage) + percentage := int64(b.GetConfig().Spec.AdvertisementConfig.OutgoingConfig.ResourceSharingPercentage) switch resourceName { case corev1.ResourceCPU: diff --git a/internal/resource-request-operator/interfaces/clusterResourceInterface.go b/internal/resource-request-operator/interfaces/clusterResourceInterface.go new file mode 100644 index 0000000000..cd121edad8 --- /dev/null +++ b/internal/resource-request-operator/interfaces/clusterResourceInterface.go @@ -0,0 +1,18 @@ +package interfaces + +import ( + corev1 "k8s.io/api/core/v1" + + configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1" +) + +// ClusterResourceInterface represents a generic subset of Broadcaster exported methods to be used instead of a direct access to +// the Broadcaster instance and get/update some cluster resources information. 
+type ClusterResourceInterface interface { + // ReadResources returns all free cluster resources calculated for a given clusterID scaled by a percentage value. + ReadResources(clusterID string) corev1.ResourceList + // RemoveClusterID removes given clusterID from all internal structures and it will be no more valid. + RemoveClusterID(clusterID string) + // GetConfig returns a ClusterConfig instance. + GetConfig() *configv1alpha1.ClusterConfig +} diff --git a/internal/resource-request-operator/interfaces/doc.go b/internal/resource-request-operator/interfaces/doc.go new file mode 100644 index 0000000000..4c8fc96511 --- /dev/null +++ b/internal/resource-request-operator/interfaces/doc.go @@ -0,0 +1,2 @@ +// Package interfaces contains all the ResourceRequestOperator interfaces representing some of its components. +package interfaces diff --git a/internal/resource-request-operator/interfaces/updaterInterface.go b/internal/resource-request-operator/interfaces/updaterInterface.go new file mode 100644 index 0000000000..b9d66546bc --- /dev/null +++ b/internal/resource-request-operator/interfaces/updaterInterface.go @@ -0,0 +1,15 @@ +package interfaces + +import ( + "context" + "sync" +) + +// UpdaterInterface represents a generic subset of Updater exported methods to be used instead of a direct access to +// a particular Updater instance. +type UpdaterInterface interface { + // Start runs an instance of an updater which will be stopped when ctx.Done() is closed. + Start(ctx context.Context, group *sync.WaitGroup) + // Push adds the clusterID to the internal queue to be processed as soon as possible. + Push(clusterID string) +} diff --git a/internal/resource-request-operator/offerUpdater.go b/internal/resource-request-operator/offerUpdater.go new file mode 100644 index 0000000000..e839c0d75d --- /dev/null +++ b/internal/resource-request-operator/offerUpdater.go @@ -0,0 +1,182 @@ +package resourcerequestoperator + +import ( + "context" + "crypto/rand" + "fmt" + "math/big" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" + sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" + crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" + "github.com/liqotech/liqo/internal/resource-request-operator/interfaces" + "github.com/liqotech/liqo/pkg/discovery" +) + +// requeueTimeout define a period of processed items requeue. +const requeueTimeout = 5 * time.Minute + +// maxRandom is used to generate a random delta to add to requeueTimeout to avoid syncing. +const maxRandom = 60 + +// OfferUpdater is a component which wraps all ResourceOffer update logic. +type OfferUpdater struct { + queue workqueue.RateLimitingInterface + client.Client + broadcasterInt interfaces.ClusterResourceInterface + homeClusterID string + scheme *runtime.Scheme +} + +// Setup initializes all parameters of the OfferUpdater component. 
+func (u *OfferUpdater) Setup(clusterID string, scheme *runtime.Scheme, broadcaster interfaces.ClusterResourceInterface, k8Client client.Client) { + u.broadcasterInt = broadcaster + u.Client = k8Client + u.homeClusterID = clusterID + u.scheme = scheme + u.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Offer update queue") +} + +// Start runs the OfferUpdate worker. +func (u *OfferUpdater) Start(ctx context.Context, group *sync.WaitGroup) { + defer u.queue.ShutDown() + group.Add(1) + defer group.Done() + go u.startRunner(ctx) + <-ctx.Done() +} + +func (u *OfferUpdater) startRunner(ctx context.Context) { + wait.Until(u.run, 2*time.Second, ctx.Done()) +} + +func (u *OfferUpdater) run() { + for u.processNextItem() { + } +} + +func (u *OfferUpdater) processNextItem() bool { + obj, shutdown := u.queue.Get() + if shutdown { + return false + } + err := func(obj interface{}) error { + defer u.queue.Done(obj) + var clusterID string + var ok bool + if clusterID, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + u.queue.Forget(obj) + return fmt.Errorf("error getting object %v from OfferUpater queue. It is not a string", obj) + } + // call createOrUpdate which after some controls generate a new resourceOffer for this clusterID or update it if exists. + if requeue, err := u.createOrUpdateOffer(clusterID); err != nil { + if requeue { + // requeue is true due to a transient error so put the item back on the workqueue. + u.queue.AddRateLimited(clusterID) + } else { + // requeue == false means that the clusterID is no more valid and so it will be not requeued. + u.Remove(clusterID) + u.broadcasterInt.RemoveClusterID(clusterID) + } + return fmt.Errorf("error during updating ResourceOffer for cluster %s: %w", clusterID, err) + } + return nil + }(obj) + if err != nil { + klog.Errorf("Error occurred during ResourceOffer update %s", err) + return true + } + klog.Infof("Update cluster %s processed", obj.(string)) + + // requeue after timeout seconds + u.queue.AddAfter(obj, getRandomTimeout()) + return true +} + +func (u *OfferUpdater) createOrUpdateOffer(clusterID string) (bool, error) { + list, err := u.getResourceRequest(clusterID) + if err != nil { + return true, err + } else if len(list.Items) != 1 { + // invalid clusterID so return requeue = false. The clusterID will be removed from the workqueue and broadacaster maps. + return false, fmt.Errorf("ClusterID %s is no more valid. 
Deleting", clusterID) + } + request := list.Items[0] + resources := u.broadcasterInt.ReadResources(clusterID) + offer := &sharingv1alpha1.ResourceOffer{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: request.GetNamespace(), + Name: offerPrefix + u.homeClusterID, + }, + } + + op, err := controllerutil.CreateOrUpdate(context.Background(), u.Client, offer, func() error { + if offer.Labels != nil { + offer.Labels[discovery.ClusterIDLabel] = request.Spec.ClusterIdentity.ClusterID + offer.Labels[crdreplicator.LocalLabelSelector] = "true" + offer.Labels[crdreplicator.DestinationLabel] = request.Spec.ClusterIdentity.ClusterID + } else { + offer.Labels = map[string]string{ + discovery.ClusterIDLabel: request.Spec.ClusterIdentity.ClusterID, + crdreplicator.LocalLabelSelector: "true", + crdreplicator.DestinationLabel: request.Spec.ClusterIdentity.ClusterID, + } + } + offer.Spec.ClusterId = u.homeClusterID + offer.Spec.ResourceQuota.Hard = resources.DeepCopy() + offer.Spec.Labels = u.broadcasterInt.GetConfig().Spec.DiscoveryConfig.ClusterLabels + return controllerutil.SetControllerReference(&request, offer, u.scheme) + }) + + if err != nil { + klog.Error(err) + return true, err + } + klog.Infof("%s -> %s Offer: %s/%s", u.homeClusterID, op, offer.Namespace, offer.Name) + return true, nil +} + +func (u *OfferUpdater) getResourceRequest(clusterID string) (*discoveryv1alpha1.ResourceRequestList, error) { + resourceRequestList := &discoveryv1alpha1.ResourceRequestList{} + err := u.Client.List(context.Background(), resourceRequestList, client.MatchingLabels{ + crdreplicator.RemoteLabelSelector: clusterID, + }) + if err != nil { + return nil, err + } + return resourceRequestList, nil +} + +// Push add new clusterID to update queue which will be processes as soon as possible. +func (u *OfferUpdater) Push(clusterID string) { + u.queue.Add(clusterID) +} + +// Remove removes a specified clusterID from the update queue and it will be no more processed. 
+func (u *OfferUpdater) Remove(clusterID string) { + u.queue.Forget(clusterID) + klog.Infof("Removed cluster %s from update queue", clusterID) +} + +func getRandomTimeout() time.Duration { + max := new(big.Int) + max.SetInt64(int64(maxRandom)) + n, err := rand.Int(rand.Reader, max) + if err != nil { + return requeueTimeout + } + return requeueTimeout + time.Duration(n.Int64())*time.Second +} diff --git a/internal/resource-request-operator/resourceRequest_controller.go b/internal/resource-request-operator/resourceRequest_controller.go index 8c71e47437..1697c25166 100644 --- a/internal/resource-request-operator/resourceRequest_controller.go +++ b/internal/resource-request-operator/resourceRequest_controller.go @@ -2,7 +2,6 @@ package resourcerequestoperator import ( "context" - "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" @@ -26,7 +25,6 @@ type ResourceRequestReconciler struct { const ( offerPrefix = "resourceoffer-" - timeToLive = 30 * time.Minute ) // +kubebuilder:rbac:groups=discovery.liqo.io,resources=resourceRequests,verbs=get;list;watch;create;update;patch; @@ -89,7 +87,7 @@ func (r *ResourceRequestReconciler) Reconcile(ctx context.Context, req ctrl.Requ }() if resourceRequest.Spec.WithdrawalTimestamp.IsZero() { - err = r.generateResourceOffer(ctx, &resourceRequest) + r.Broadcaster.enqueueForCreationOrUpdate(remoteClusterID) if err != nil { klog.Errorf("%s -> Error generating resourceOffer: %s", remoteClusterID, err) return ctrl.Result{}, err diff --git a/internal/resource-request-operator/resourceRequest_operator_test.go b/internal/resource-request-operator/resourceRequest_operator_test.go index 33f7c9a892..a0dc36db5c 100644 --- a/internal/resource-request-operator/resourceRequest_operator_test.go +++ b/internal/resource-request-operator/resourceRequest_operator_test.go @@ -38,7 +38,7 @@ var ( now = metav1.Now() ) -func createTestNodes() (*corev1.Node, *corev1.Node) { +func createTestNodes() (node1, node2 *corev1.Node) { resources := corev1.ResourceList{} resources[corev1.ResourceCPU] = *resource.NewQuantity(2, resource.DecimalSI) resources[corev1.ResourceMemory] = *resource.NewQuantity(1000000, resource.DecimalSI) @@ -83,7 +83,7 @@ func createTestNodes() (*corev1.Node, *corev1.Node) { return first, second } -func createTestPods() (*corev1.Pod, *corev1.Pod) { +func createTestPods() (podWithLabels, podWithoutLabels *corev1.Pod) { resources := corev1.ResourceList{} resources[corev1.ResourceCPU] = *resource.NewQuantity(1, resource.DecimalSI) resources[corev1.ResourceMemory] = *resource.NewQuantity(50000, resource.DecimalSI) @@ -136,6 +136,43 @@ func createTestPods() (*corev1.Pod, *corev1.Pod) { return pod1, wrongPod } +func checkResourceOfferUpdate(nodeResources, podResources []corev1.ResourceList) bool { + offer := &sharingv1alpha1.ResourceOffer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: offerPrefix + clusterID, + Namespace: ResourcesNamespace, + }, offer) + if err != nil { + return false + } + offerResources := offer.Spec.ResourceQuota.Hard + testList := corev1.ResourceList{} + for _, nodeResource := range nodeResources { + for resourceName, quantity := range nodeResource { + toAdd := testList[resourceName].DeepCopy() + toAdd.Add(quantity) + testList[resourceName] = toAdd.DeepCopy() + } + } + + for _, podResource := range podResources { + for resourceName, quantity := range podResource { + toSub := testList[resourceName].DeepCopy() + toSub.Sub(quantity) + testList[resourceName] = toSub.DeepCopy() + } + } + + for resourceName, quantity := range 
offerResources { + toCheck := testList[resourceName].DeepCopy() + scale(resourceName, &toCheck) + if quantity.Cmp(toCheck) != 0 { + return false + } + } + return true +} + func scale(resourceName corev1.ResourceName, quantity *resource.Quantity) { percentage := int64(50) switch resourceName { @@ -157,7 +194,7 @@ func createResourceRequest() *discoveryv1alpha1.ResourceRequest { Name: ResourceRequestName, Namespace: ResourcesNamespace, Labels: map[string]string{ - crdreplicator.RemoteLabelSelector: clusterId, + crdreplicator.RemoteLabelSelector: homeClusterID, crdreplicator.ReplicationStatuslabel: "true", }, }, @@ -186,9 +223,9 @@ var _ = Describe("ResourceRequest Operator", func() { node2 *corev1.Node ) BeforeEach(func() { + createdResourceRequest = createResourceRequest() node1, node2 = createTestNodes() _, podWithoutLabel = createTestPods() - createdResourceRequest = createResourceRequest() }) AfterEach(func() { err := clientset.CoreV1().Nodes().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}) @@ -243,7 +280,7 @@ var _ = Describe("ResourceRequest Operator", func() { By("Checking Offer creation") createdResourceOffer := &sharingv1alpha1.ResourceOffer{} offerName := types.NamespacedName{ - Name: offerPrefix + clusterId, + Name: offerPrefix + clusterID, Namespace: ResourcesNamespace, } klog.Info(offerName) @@ -252,7 +289,7 @@ var _ = Describe("ResourceRequest Operator", func() { }, timeout, interval).ShouldNot(HaveOccurred()) By("Checking all ResourceOffer parameters") - Expect(createdResourceOffer.Name).Should(ContainSubstring(clusterId)) + Expect(createdResourceOffer.Name).Should(ContainSubstring(clusterID)) Expect(createdResourceOffer.Labels[discovery.ClusterIDLabel]).Should(Equal(createdResourceRequest.Spec.ClusterIdentity.ClusterID)) Expect(createdResourceOffer.Labels[crdreplicator.LocalLabelSelector]).Should(Equal("true")) Expect(createdResourceOffer.Labels[crdreplicator.DestinationLabel]).Should(Equal(createdResourceRequest.Spec.ClusterIdentity.ClusterID)) @@ -264,14 +301,27 @@ var _ = Describe("ResourceRequest Operator", func() { By("Checking resources at offer creation") podReq, _ := resourcehelper.PodRequestsAndLimits(podWithoutLabel) - offerResources := createdResourceOffer.Spec.ResourceQuota.Hard - for resourceName, quantity := range offerResources { - testValue := node1.Status.Allocatable[resourceName].DeepCopy() - testValue.Add(node2.Status.Allocatable[resourceName]) - testValue.Sub(podReq[resourceName]) - scale(resourceName, &testValue) - Expect(quantity.Cmp(testValue)).Should(BeZero()) - } + Eventually(func() bool { + offer := &sharingv1alpha1.ResourceOffer{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: offerPrefix + clusterID, + Namespace: ResourcesNamespace, + }, offer) + if err != nil { + return false + } + offerResources := offer.Spec.ResourceQuota.Hard + for resourceName, quantity := range offerResources { + testValue := node2.Status.Allocatable[resourceName].DeepCopy() + testValue.Add(node1.Status.Allocatable[resourceName]) + testValue.Sub(podReq[resourceName]) + scale(resourceName, &testValue) + if quantity.Cmp(testValue) != 0 { + return false + } + } + return true + }, timeout, interval).Should(BeTrue()) By("Checking ResourceOffer invalidation on request set deleting phase") err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -290,10 +340,19 @@ var _ = Describe("ResourceRequest Operator", func() { Expect(err).ToNot(HaveOccurred()) // set the vk status in the ResourceOffer to created - 
createdResourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated - createdResourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted - Expect(k8sClient.Status().Update(ctx, createdResourceOffer)).ToNot(HaveOccurred()) - + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: createdResourceOffer.Name, + Namespace: createdResourceOffer.Namespace, + }, createdResourceOffer) + if err != nil { + return err + } + createdResourceOffer.Status.VirtualKubeletStatus = sharingv1alpha1.VirtualKubeletStatusCreated + createdResourceOffer.Status.Phase = sharingv1alpha1.ResourceOfferAccepted + return k8sClient.Status().Update(ctx, createdResourceOffer) + }) + Expect(err).ToNot(HaveOccurred()) var resourceOffer sharingv1alpha1.ResourceOffer Eventually(func() bool { if err := k8sClient.Get(ctx, offerName, &resourceOffer); err != nil { @@ -341,6 +400,17 @@ var _ = Describe("ResourceRequest Operator", func() { Context("Testing broadcaster", func() { It("Broadcaster should update resources in correct way", func() { + var resourceRequest discoveryv1alpha1.ResourceRequest + Eventually(func() []string { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: ResourceRequestName, + Namespace: ResourcesNamespace, + }, &resourceRequest) + if err != nil { + return []string{} + } + return resourceRequest.Finalizers + }, timeout, interval).Should(ContainElement(tenantFinalizer)) podReq, _ := resourcehelper.PodRequestsAndLimits(podWithoutLabel) By("Checking update node ready condition") node1.Status.Conditions[0] = corev1.NodeCondition{ @@ -362,12 +432,33 @@ var _ = Describe("ResourceRequest Operator", func() { } return true }, timeout, interval).Should(BeTrue()) + By("Checking if ResourceOffer has been update and has correct amount of resources") + Eventually(func() bool { + nodeList := []corev1.ResourceList{ + 0: node2.Status.Allocatable, + } + podList := []corev1.ResourceList{ + 0: podReq, + } + return checkResourceOfferUpdate(nodeList, podList) + }, timeout, interval).Should(BeTrue()) node1.Status.Conditions[0] = corev1.NodeCondition{ Type: corev1.NodeReady, Status: corev1.ConditionTrue, } node1, err = clientset.CoreV1().Nodes().UpdateStatus(ctx, node1, metav1.UpdateOptions{}) Expect(err).ToNot(HaveOccurred()) + By("Checking inserting of node1 again in ResourceOffer") + Eventually(func() bool { + nodeList := []corev1.ResourceList{ + 0: node1.Status.Allocatable, + 1: node2.Status.Allocatable, + } + podList := []corev1.ResourceList{ + 0: podReq, + } + return checkResourceOfferUpdate(nodeList, podList) + }, timeout, interval).Should(BeTrue()) Eventually(func() bool { resourcesRead := newBroadcaster.ReadResources(homeClusterID) for resourceName, quantity := range resourcesRead { @@ -381,7 +472,8 @@ var _ = Describe("ResourceRequest Operator", func() { } return true }, timeout, interval).Should(BeTrue()) - By("Checking update resources") + + By("Checking update node resources") toUpdate := node1.Status.Allocatable.DeepCopy() for _, quantity := range toUpdate { quantity.Sub(*resource.NewQuantity(1, quantity.Format)) @@ -402,6 +494,17 @@ var _ = Describe("ResourceRequest Operator", func() { } return true }, timeout, interval).Should(BeTrue()) + By("Checking if ResourceOffer has been updated correctly") + Eventually(func() bool { + nodeList := []corev1.ResourceList{ + 0: node2.Status.Allocatable, + 1: node1.Status.Allocatable, + } + podList := []corev1.ResourceList{ + 0: podReq, + } + return 
checkResourceOfferUpdate(nodeList, podList) + }, timeout, interval).Should(BeTrue()) By("Checking Node Delete") err = clientset.CoreV1().Nodes().Delete(ctx, node1.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) @@ -417,6 +520,16 @@ var _ = Describe("ResourceRequest Operator", func() { } return true }, timeout, interval).Should(BeTrue()) + By("Checking if ResourceOffer has been updated correctly") + Eventually(func() bool { + nodeList := []corev1.ResourceList{ + 0: node2.Status.Allocatable, + } + podList := []corev1.ResourceList{ + 0: podReq, + } + return checkResourceOfferUpdate(nodeList, podList) + }, timeout, interval).Should(BeTrue()) }) }) }) diff --git a/internal/resource-request-operator/suite_test.go b/internal/resource-request-operator/suite_test.go index f2682c7eca..95d86df14d 100644 --- a/internal/resource-request-operator/suite_test.go +++ b/internal/resource-request-operator/suite_test.go @@ -26,7 +26,7 @@ import ( var ( cfg *rest.Config k8sClient client.Client - clusterId string + clusterID string clientset kubernetes.Interface testEnv *envtest.Environment newBroadcaster Broadcaster @@ -71,7 +71,10 @@ func createCluster() { }) Expect(err).ToNot(HaveOccurred()) clientset = kubernetes.NewForConfigOrDie(k8sManager.GetConfig()) - err = newBroadcaster.SetupBroadcaster(clientset, 5*time.Second) + clusterID = clusterid.NewStaticClusterID("test-cluster").GetClusterID() + updater := OfferUpdater{} + updater.Setup(clusterID, k8sManager.GetScheme(), &newBroadcaster, k8sManager.GetClient()) + err = newBroadcaster.SetupBroadcaster(clientset, &updater, 5*time.Second, 5) Expect(err).ToNot(HaveOccurred()) newBroadcaster.StartBroadcaster(ctx, &group) testClusterConf := &configv1alpha1.ClusterConfig{ @@ -84,11 +87,10 @@ func createCluster() { }, } newBroadcaster.setConfig(testClusterConf) - clusterId = clusterid.NewStaticClusterID("test-cluster").GetClusterID() err = (&ResourceRequestReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), - ClusterID: clusterId, + ClusterID: clusterID, Broadcaster: &newBroadcaster, }).SetupWithManager(k8sManager) Expect(err).ToNot(HaveOccurred()) diff --git a/internal/resource-request-operator/utils.go b/internal/resource-request-operator/utils.go index 7bb9c582ec..b1bc0d38bf 100644 --- a/internal/resource-request-operator/utils.go +++ b/internal/resource-request-operator/utils.go @@ -3,9 +3,7 @@ package resourcerequestoperator import ( "context" "fmt" - "time" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -15,49 +13,9 @@ import ( discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1" sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1" - crdreplicator "github.com/liqotech/liqo/internal/crdReplicator" "github.com/liqotech/liqo/pkg/discovery" ) -// generateResourceOffer generates a new local ResourceOffer. 
-func (r *ResourceRequestReconciler) generateResourceOffer(ctx context.Context, request *discoveryv1alpha1.ResourceRequest) error { - resources := r.Broadcaster.ReadResources(request.Spec.ClusterIdentity.ClusterID) - offer := &sharingv1alpha1.ResourceOffer{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: request.GetNamespace(), - Name: offerPrefix + r.ClusterID, - }, - } - - op, err := controllerutil.CreateOrUpdate(ctx, r.Client, offer, func() error { - offer.Labels = map[string]string{ - discovery.ClusterIDLabel: request.Spec.ClusterIdentity.ClusterID, - crdreplicator.LocalLabelSelector: "true", - crdreplicator.DestinationLabel: request.Spec.ClusterIdentity.ClusterID, - } - creationTime := metav1.NewTime(time.Now()) - spec := sharingv1alpha1.ResourceOfferSpec{ - ClusterId: r.ClusterID, - Images: []corev1.ContainerImage{}, - ResourceQuota: corev1.ResourceQuotaSpec{ - Hard: resources, - }, - Labels: r.Broadcaster.clusterConfig.Spec.DiscoveryConfig.ClusterLabels, - Timestamp: creationTime, - TimeToLive: metav1.NewTime(creationTime.Add(timeToLive)), - } - offer.Spec = spec - return controllerutil.SetControllerReference(request, offer, r.Scheme) - }) - - if err != nil { - klog.Error(err) - return err - } - klog.Infof("%s -> %s Offer: %s/%s", r.ClusterID, op, offer.Namespace, offer.Name) - return nil -} - // ensureForeignCluster ensures the ForeignCluster existence, if not exists we have to add a new one // with IncomingPeering discovery method. func (r *ResourceRequestReconciler) ensureForeignCluster(ctx context.Context, diff --git a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go index 34b11e738f..6cd3f43cf8 100644 --- a/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go +++ b/pkg/liqo-controller-manager/resourceoffer-controller/resourceoffer_controller_test.go @@ -155,9 +155,7 @@ var _ = Describe("ResourceOffer Controller", func() { }, }, Spec: sharingv1alpha1.ResourceOfferSpec{ - ClusterId: clusterID, - Timestamp: metav1.Now(), - TimeToLive: metav1.NewTime(time.Now().Add(1 * time.Hour)), + ClusterId: clusterID, }, }, expectedPhase: sharingv1alpha1.ResourceOfferAccepted, @@ -171,9 +169,7 @@ var _ = Describe("ResourceOffer Controller", func() { Namespace: testNamespace, }, Spec: sharingv1alpha1.ResourceOfferSpec{ - ClusterId: clusterID, - Timestamp: metav1.Now(), - TimeToLive: metav1.NewTime(time.Now().Add(1 * time.Hour)), + ClusterId: clusterID, }, }, expectedPhase: "", @@ -193,9 +189,7 @@ var _ = Describe("ResourceOffer Controller", func() { }, }, Spec: sharingv1alpha1.ResourceOfferSpec{ - ClusterId: clusterID, - Timestamp: metav1.Now(), - TimeToLive: metav1.NewTime(time.Now().Add(1 * time.Hour)), + ClusterId: clusterID, }, } key := client.ObjectKeyFromObject(resourceOffer) diff --git a/pkg/virtualKubelet/liqoNodeProvider/nodeProvider_test.go b/pkg/virtualKubelet/liqoNodeProvider/nodeProvider_test.go index 35576002a2..29b79efe25 100644 --- a/pkg/virtualKubelet/liqoNodeProvider/nodeProvider_test.go +++ b/pkg/virtualKubelet/liqoNodeProvider/nodeProvider_test.go @@ -172,9 +172,7 @@ var _ = Describe("NodeProvider", func() { }, }, Spec: sharingv1alpha1.ResourceOfferSpec{ - ClusterId: "remote-id", - Timestamp: metav1.NewTime(time.Now()), - TimeToLive: metav1.NewTime(time.Now().Add(1 * time.Hour)), + ClusterId: "remote-id", ResourceQuota: v1.ResourceQuotaSpec{ Hard: v1.ResourceList{ v1.ResourceCPU: *resource.NewQuantity(2, 
resource.DecimalSI), @@ -266,9 +264,7 @@ var _ = Describe("NodeProvider", func() { }, }, Spec: sharingv1alpha1.ResourceOfferSpec{ - ClusterId: "remote-id", - Timestamp: metav1.NewTime(time.Now()), - TimeToLive: metav1.NewTime(time.Now().Add(1 * time.Hour)), + ClusterId: "remote-id", ResourceQuota: v1.ResourceQuotaSpec{ Hard: v1.ResourceList{ v1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
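
---

For reference, a minimal standalone sketch of the threshold rule this patch introduces: an update is enqueued to the OfferUpdater only when the freshly computed resources differ from the last advertised ones (lastReadResources) by more than the configured percentage (--offer-update-threshold-perc, default 5). The helper name exceedsThreshold and the symmetric absolute-difference handling below are illustrative simplifications; the actual checkThreshold in broadcaster.go compares the allocatable plus per-cluster pod resources against the last read value.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// exceedsThreshold reports whether any resource changed by more than
// thresholdPerc percent with respect to the last advertised value.
func exceedsThreshold(current, lastRead corev1.ResourceList, thresholdPerc int64) bool {
	for name, quantity := range current {
		last := lastRead[name]
		diff := quantity.Value() - last.Value()
		if diff < 0 {
			diff = -diff
		}
		if diff > last.Value()*thresholdPerc/100 {
			return true
		}
	}
	return false
}

func main() {
	last := corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(100, resource.DecimalSI)}
	curr := corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(107, resource.DecimalSI)}
	// 7 > 5% of 100, so this clusterID would be pushed to the OfferUpdater queue.
	fmt.Println(exceedsThreshold(curr, last, 5))
}

This keeps the updater queue quiet under small pod churn, while every known clusterID is still requeued periodically (requeueTimeout of 5 minutes plus a random delta of up to 60 seconds) so offers are refreshed even without threshold-crossing changes.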