Skip to content

Commit

Permalink
add resource offer operator
Browse files Browse the repository at this point in the history
  • Loading branch information
aleoli committed May 24, 2021
1 parent 0c73e78 commit 2b13f3e
Show file tree
Hide file tree
Showing 12 changed files with 477 additions and 69 deletions.
21 changes: 12 additions & 9 deletions apis/sharing/v1alpha1/resourceoffer_types.go
Expand Up @@ -18,10 +18,6 @@ type ResourceOfferSpec struct {
ResourceQuota corev1.ResourceQuotaSpec `json:"resourceQuota,omitempty"`
// Labels contains the label to be added to the virtual node.
Labels map[string]string `json:"labels,omitempty"`
// Neighbors is a map where the key is the name of a virtual node (representing a foreign cluster) and the value are the resources allocatable on that node.
Neighbors map[corev1.ResourceName]corev1.ResourceList `json:"neighbors,omitempty"`
// Properties can contain any additional information about the cluster.
Properties map[corev1.ResourceName]string `json:"properties,omitempty"`
// Prices contains the possible prices for every kind of resource (cpu, memory, image).
Prices corev1.ResourceList `json:"prices,omitempty"`
// Timestamp is the time instant when this ResourceOffer was created.
Expand All @@ -35,19 +31,26 @@ type ResourceOfferSpec struct {
type OfferPhase string

const (
// ResourceOfferPending indicates a pending phase, an action is required.
ResourceOfferPending OfferPhase = "Pending"
// ResourceOfferManualActionRequired indicates that a manual action is required.
ResourceOfferManualActionRequired OfferPhase = "ManualActionRequired"
// ResourceOfferAccepted indicates an accepted offer.
ResourceOfferAccepted OfferPhase = "Accepted"
ResourceOfferRefused OfferPhase = "Refused"
// ResourceOfferRefused indicates a refused offer.
ResourceOfferRefused OfferPhase = "Refused"
)

// ResourceOfferStatus defines the observed state of ResourceOffer.
type ResourceOfferStatus struct {
// ResourceOfferStatus is the status of this ResourceOffer.
// Phase is the status of this ResourceOffer.
// When the offer is created it is checked by the operator, which sets this field to "Accepted" or "Refused" on tha base of cluster configuration.
// If the ResourceOffer is accepted a virtual-kubelet for the foreign cluster will be created.
// +kubebuilder:validation:Enum="";"Accepted";"Refused"
ResourceOfferStatus OfferPhase `json:"resourceOfferStatus"`
// +kubebuilder:validation:Enum="Pending";"ManualActionRequired";"Accepted";"Refused"
// +kubebuilder:default="Pending"
Phase OfferPhase `json:"phase"`
// VkCreated indicates if the virtual-kubelet for this ResourceOffer has been created or not.
VkCreated bool `json:"vkCreated"`
VkCreated bool `json:"vkCreated,omitempty"`
// VkReference is a reference to the deployment running the virtual-kubelet.
VkReference object_references.DeploymentReference `json:"vkReference,omitempty"`
// VnodeReference is a reference to the virtual node linked to this ResourceOffer
Expand Down
24 changes: 0 additions & 24 deletions apis/sharing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 33 additions & 8 deletions cmd/advertisement-operator/main.go
Expand Up @@ -26,13 +26,15 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/klog"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"

discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
netv1alpha1 "github.com/liqotech/liqo/apis/net/v1alpha1"
advtypes "github.com/liqotech/liqo/apis/sharing/v1alpha1"
advop "github.com/liqotech/liqo/internal/advertisementoperator"
resourceRequestOperator "github.com/liqotech/liqo/internal/resource-request-operator"
crdclient "github.com/liqotech/liqo/pkg/crdClient"
resourceoffercontroller "github.com/liqotech/liqo/pkg/liqo-controller-manager/resourceoffer-controller"
"github.com/liqotech/liqo/pkg/mapperUtils"
"github.com/liqotech/liqo/pkg/vkMachinery"
"github.com/liqotech/liqo/pkg/vkMachinery/csr"
Expand Down Expand Up @@ -64,14 +66,18 @@ func init() {

func main() {
var metricsAddr, localKubeconfig, clusterId string
var probeAddr string
var enableLeaderElection bool
var kubeletNamespace, kubeletImage, initKubeletImage string
var resyncPeriod = 5 * time.Second

const resyncPeriod = 30 * time.Second

flag.StringVar(&metricsAddr, "metrics-addr", defaultMetricsaddr, "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection,
"enable-leader-election", false,
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")

flag.StringVar(&localKubeconfig, "local-kubeconfig", "", "The path to the kubeconfig of your local cluster.")
flag.StringVar(&clusterId, "cluster-id", "", "The cluster ID of your cluster")
flag.StringVar(&kubeletNamespace,
Expand All @@ -81,6 +87,7 @@ func main() {
flag.StringVar(&initKubeletImage,
"init-kubelet-image", defaultInitVKImage,
"The image of the virtual kubelet init container to be deployed")

flag.Parse()

if clusterId == "" {
Expand All @@ -95,11 +102,13 @@ func main() {
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: "0",
LeaderElection: enableLeaderElection,
Port: 9443,
MapperProvider: mapperUtils.LiqoMapperProvider(scheme),
Scheme: scheme,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "66cf253f.liqo.io",
Port: 9443,
})
if err != nil {
klog.Error(err)
Expand Down Expand Up @@ -155,7 +164,7 @@ func main() {
AcceptedAdvNum: acceptedAdv,
AdvClient: advClient,
DiscoveryClient: discoveryClient,
RetryTimeout: 1 * time.Minute,
RetryTimeout: resyncPeriod,
}

if err = advertisementReconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -173,17 +182,33 @@ func main() {
klog.Fatal(err)
}

resourceOfferReconciler := resourceoffercontroller.NewResourceOfferController(mgr, resyncPeriod)
if err = resourceOfferReconciler.SetupWithManager(mgr); err != nil {
klog.Fatal(err)
}

// +kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
klog.Error(err, " unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
klog.Error(err, " unable to set up ready check")
os.Exit(1)
}

c := make(chan struct{})
var wg = &sync.WaitGroup{}
client, err := advertisementReconciler.InitCRDClient(localKubeconfig)
if err != nil {
os.Exit(1)
}
wg.Add(2)
wg.Add(3)
go advertisementReconciler.CleanOldAdvertisements(c, wg)
// TODO: this configuration watcher will be refactored before the release 0.3
go advertisementReconciler.WatchConfiguration(localKubeconfig, client, wg)
go resourceOfferReconciler.WatchConfiguration(localKubeconfig, client, wg)

klog.Info("starting manager as advertisementoperator")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down
36 changes: 8 additions & 28 deletions deployments/liqo/crds/sharing.liqo.io_resourceoffers.yaml
Expand Up @@ -79,21 +79,6 @@ spec:
description: Labels contains the label to be added to the virtual
node.
type: object
neighbors:
additionalProperties:
additionalProperties:
anyOf:
- type: integer
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
description: ResourceList is a set of (resource name, quantity)
pairs.
type: object
description: Neighbors is a map where the key is the name of a virtual
node (representing a foreign cluster) and the value are the resources
allocatable on that node.
type: object
prices:
additionalProperties:
anyOf:
Expand All @@ -104,12 +89,6 @@ spec:
description: Prices contains the possible prices for every kind of
resource (cpu, memory, image).
type: object
properties:
additionalProperties:
type: string
description: Properties can contain any additional information about
the cluster.
type: object
resourceQuota:
description: ResourceQuota contains the quantity of resources made
available by the cluster.
Expand Down Expand Up @@ -192,14 +171,16 @@ spec:
status:
description: ResourceOfferStatus defines the observed state of ResourceOffer.
properties:
resourceOfferStatus:
description: ResourceOfferStatus is the status of this ResourceOffer.
When the offer is created it is checked by the operator, which sets
this field to "Accepted" or "Refused" on tha base of cluster configuration.
phase:
default: Pending
description: Phase is the status of this ResourceOffer. When the offer
is created it is checked by the operator, which sets this field
to "Accepted" or "Refused" on tha base of cluster configuration.
If the ResourceOffer is accepted a virtual-kubelet for the foreign
cluster will be created.
enum:
- ""
- Pending
- ManualActionRequired
- Accepted
- Refused
type: string
Expand Down Expand Up @@ -229,8 +210,7 @@ spec:
type: string
type: object
required:
- resourceOfferStatus
- vkCreated
- phase
type: object
type: object
served: true
Expand Down
14 changes: 14 additions & 0 deletions internal/crdReplicator/crdReplicator-operator.go
Expand Up @@ -40,6 +40,20 @@ var (
}
)

// ReplicatedResourcesLabelSelector is an helper label selector to list all the replicated resources.
var ReplicatedResourcesLabelSelector = metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: RemoteLabelSelector,
Operator: metav1.LabelSelectorOpExists,
},
{
Key: ReplicationStatuslabel,
Operator: metav1.LabelSelectorOpExists,
},
},
}

const (
operatorName = "crdReplicator-operator"
finalizer = "crdReplicator.liqo.io"
Expand Down
@@ -0,0 +1,19 @@
package resourceoffercontroller

import (
"time"

"sigs.k8s.io/controller-runtime/pkg/manager"
)

// NewResourceOfferController creates and returns a new reconciler for the ResourceOffers.
func NewResourceOfferController(mgr manager.Manager, resyncPeriod time.Duration) *ResourceOfferReconciler {
return &ResourceOfferReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),

eventsRecorder: mgr.GetEventRecorderFor("ResourceOffer"),

resyncPeriod: resyncPeriod,
}
}
2 changes: 2 additions & 0 deletions pkg/liqo-controller-manager/resourceoffer-controller/doc.go
@@ -0,0 +1,2 @@
// Package resourceoffercontroller implements the logic of the ResourceOffer Operator
package resourceoffercontroller
@@ -0,0 +1,103 @@
/*
Copyright 2021.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package resourceoffercontroller

import (
"context"
"sync"
"time"

kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

configv1alpha1 "github.com/liqotech/liqo/apis/config/v1alpha1"
sharingv1alpha1 "github.com/liqotech/liqo/apis/sharing/v1alpha1"
crdreplicator "github.com/liqotech/liqo/internal/crdReplicator"
)

// ResourceOfferReconciler reconciles a ResourceOffer object.
type ResourceOfferReconciler struct {
client.Client
Scheme *runtime.Scheme

eventsRecorder record.EventRecorder

resyncPeriod time.Duration
configuration *configv1alpha1.ClusterConfig
configurationMutex sync.RWMutex
}

//+kubebuilder:rbac:groups=sharing.liqo.io,resources=resourceoffers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=sharing.liqo.io,resources=resourceoffers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sharing.liqo.io,resources=resourceoffers/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *ResourceOfferReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// get resource offer
var resourceOffer sharingv1alpha1.ResourceOffer
if err := r.Get(ctx, req.NamespacedName, &resourceOffer); err != nil {
if kerrors.IsNotFound(err) {
// reconcile was triggered by a delete request
klog.Infof("ResourceRequest %v deleted", req.NamespacedName)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// not managed error
klog.Error(err)
return ctrl.Result{}, err
}

// we do that on ResourceOffer creation
if result, err := r.setOwnerReference(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
return ctrl.Result{}, nil
}

// filter resource offers and create a virtual-kubelet only for the good ones
if result, err := r.setResourceOfferPhase(ctx, &resourceOffer); err != nil {
klog.Error(err)
return ctrl.Result{}, err
} else if result != controllerutil.OperationResultNone {
return ctrl.Result{}, nil
}

// TODO: create the virtual-kubelet

return ctrl.Result{RequeueAfter: r.resyncPeriod}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ResourceOfferReconciler) SetupWithManager(mgr ctrl.Manager) error {
p, err := predicate.LabelSelectorPredicate(crdreplicator.ReplicatedResourcesLabelSelector)
if err != nil {
klog.Error(err)
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&sharingv1alpha1.ResourceOffer{}).
WithEventFilter(p).
Complete(r)
}

0 comments on commit 2b13f3e

Please sign in to comment.