Skip to content

Commit

Permalink
storageclusterpeer: setup the reconciler
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <resoni@redhat.com>
  • Loading branch information
rewantsoni committed Feb 29, 2024
1 parent 3b5ad3b commit d8b406f
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 13 deletions.
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ rules:
verbs:
- delete
- list
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- k8s.cni.cncf.io
resources:
Expand Down
206 changes: 194 additions & 12 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,227 @@ package storageclusterpeer

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/v4/services/provider/client"
rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const rbdMirrorBootstrapPeerSecretName = "rbdMirrorBootstrapPeerSecretName"

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger

ctx context.Context
storageClusterPeer *ocsv1.StorageClusterPeer
cephBlockPoolList *rookCephv1.CephBlockPoolList
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ceph.rook.io,resources=cephblockpools;,verbs=list;create;update
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the StorageClusterPeer object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {

r.ctx = ctrllog.IntoContext(ctx, r.Log)
r.Log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClassRequest instance
r.storageClusterPeer = &ocsv1.StorageClusterPeer{}
r.storageClusterPeer.Name = request.Name
r.storageClusterPeer.Namespace = request.Namespace

if err := r.get(r.storageClusterPeer); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("StorageClusterPeer resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, nil
}
r.Log.Error(err, "Failed to get StorageClusterPeer.")
return reconcile.Result{}, err
}

r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerInitializing

r.cephBlockPoolList = &rookCephv1.CephBlockPoolList{}
if err := r.list(r.cephBlockPoolList, client.InNamespace(r.storageClusterPeer.Namespace)); err != nil {
return reconcile.Result{}, err
}

var result reconcile.Result
var reconcileError error

// TODO(user): your logic here
result, reconcileError = r.reconcilePhases()

return ctrl.Result{}, nil
// Apply status changes to the StorageClassRequest
statusError := r.Client.Status().Update(r.ctx, r.storageClusterPeer)
if statusError != nil {
r.Log.Info("Failed to update StorageClassRequest status.")
}

// Reconcile errors have higher priority than status update errors
if reconcileError != nil {
return result, reconcileError
}

if statusError != nil {
return result, statusError
}

return result, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {

enqueueStorageClusterPeerRequest := handler.EnqueueRequestsFromMapFunc(
func(context context.Context, obj client.Object) []reconcile.Request {
// Get the StorageClusterPeer objects
scpList := &ocsv1.StorageClusterPeerList{}
err := r.Client.List(context, scpList, &client.ListOptions{Namespace: obj.GetNamespace()})
if err != nil {
r.Log.Error(err, "Unable to list StorageClusterPeer objects")
return []reconcile.Request{}
}

// Return name and namespace of the StorageClusterPeers object
request := []reconcile.Request{}
for _, scp := range scpList.Items {
request = append(request, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: scp.Namespace,
Name: scp.Name,
},
})
}

return request
},
)

return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
For(&ocsv1.StorageClusterPeer{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&rookCephv1.CephBlockPool{}, enqueueStorageClusterPeerRequest).
Complete(r)
}

func (r *StorageClusterPeerReconciler) reconcilePhases() (reconcile.Result, error) {
r.Log.Info("Running StorageClusterPeer controller")

//marked for deletion
if !r.storageClusterPeer.GetDeletionTimestamp().IsZero() {
r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerDeleting
//TODO: deletion?
fmt.Println("Marked for deletion...")
return reconcile.Result{}, nil
}
// not marked for deletion
ocsClient, err := r.newExternalClient()
if err != nil {
return reconcile.Result{}, err
}
defer ocsClient.Close()

r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerConfiguring

if err := r.reconcileCephBlockPool(); err != nil {
return reconcile.Result{}, err
}

if err := r.reconcilePeerBlockPool(ocsClient); err != nil {
return reconcile.Result{}, err
}

r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerReady

return reconcile.Result{}, nil
}

func (r *StorageClusterPeerReconciler) newExternalClient() (*providerClient.OCSProviderClient, error) {
ocsClient, err := providerClient.NewProviderClient(r.ctx, r.storageClusterPeer.Spec.OCSAPIServerURI, time.Second*10)
if err != nil {
return nil, fmt.Errorf("failed to create a new ocs client: %v", err)
}
return ocsClient, nil
}

func (r *StorageClusterPeerReconciler) reconcileCephBlockPool() error {

for _, cephBlockPool := range r.cephBlockPoolList.Items {

_, err := ctrl.CreateOrUpdate(r.ctx, r.Client, &cephBlockPool, func() error {
cephBlockPool.Spec.Mirroring.Enabled = true
cephBlockPool.Spec.Mirroring.Mode = "image"
return nil
})
if err != nil {
r.Log.Error(err, "unable to enable mirroring on the cephBlockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}
}
return nil
}

func (r *StorageClusterPeerReconciler) reconcilePeerBlockPool(ocsClient *providerClient.OCSProviderClient) error {

for _, cephBlockPool := range r.cephBlockPoolList.Items {
if _, ok := cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName]; !ok {
r.Log.Info("waiting for bootstrap secret to be generated",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return fmt.Errorf("waiting for bootstrap secret for %s blockpool in %s namespace to be generated",
cephBlockPool.Name, cephBlockPool.Namespace)
}

secret := &v1.Secret{}
secret.Name = cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName]
secret.Namespace = cephBlockPool.Namespace

if err := r.get(secret); err != nil {
r.Log.Error(err, "unable to fetch the bootstrap secret for blockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}

_, err := ocsClient.PeerBlockPool(r.ctx, cephBlockPool.Name, secret.Data["pool"], secret.Data["token"])
if err != nil {
return err
}

}
return nil
}

func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

func (r *StorageClusterPeerReconciler) list(obj client.ObjectList, listOptions ...client.ListOption) error {
return r.Client.List(r.ctx, obj, listOptions...)
}
6 changes: 6 additions & 0 deletions deploy/csv-templates/ocs-operator.csv.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ spec:
verbs:
- delete
- list
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- k8s.cni.cncf.io
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,12 @@ spec:
verbs:
- delete
- list
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- k8s.cni.cncf.io
resources:
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import (
apiclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metrics "sigs.k8s.io/controller-runtime/pkg/metrics/server"

// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -217,13 +216,16 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "StorageClassRequest")
os.Exit(1)
}

if err = (&storageclusterpeer.StorageClusterPeerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("StorageClusterPeer"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "StorageClusterPeer")
os.Exit(1)
}

// +kubebuilder:scaffold:builder

// Create OCSInitialization CR if it's not present
Expand Down

0 comments on commit d8b406f

Please sign in to comment.