storageclusterpeer: setup the reconciler
Signed-off-by: Rewant Soni <resoni@redhat.com>
rewantsoni committed Mar 5, 2024
1 parent 459ecc2 commit 0847bb6
Showing 5 changed files with 298 additions and 12 deletions.
6 changes: 6 additions & 0 deletions config/rbac/role.yaml
@@ -166,6 +166,12 @@ rules:
verbs:
- delete
- list
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- k8s.cni.cncf.io
resources:
288 changes: 277 additions & 11 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
@@ -18,45 +18,311 @@ package storageclusterpeer

import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"time"

"github.com/go-logr/logr"
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
providerClient "github.com/red-hat-storage/ocs-operator/v4/services/provider/client"
rookCephv1 "github.com/rook/rook/pkg/apis/ceph.rook.io/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

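// rbdMirrorBootstrapPeerSecretName is the key under CephBlockPool.Status.Info
// where Rook reports the name of the rbd-mirror bootstrap peer secret for the
// pool once mirroring is enabled (consumed by reconcilePeerBlockPool below).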
ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
const (
rbdMirrorBootstrapPeerSecretName = "rbdMirrorBootstrapPeerSecretName"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger

ctx context.Context
storageClusterPeer *ocsv1.StorageClusterPeer
cephBlockPoolList *rookCephv1.CephBlockPoolList
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ceph.rook.io,resources=cephblockpools,verbs=list;create;update
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// It compares the state specified by the StorageClusterPeer object against the
// actual cluster state, and then performs operations to make the cluster state
// reflect the state specified by the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {

r.ctx = ctrllog.IntoContext(ctx, r.Log)
r.Log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClusterPeer instance
r.storageClusterPeer = &ocsv1.StorageClusterPeer{}
r.storageClusterPeer.Name = request.Name
r.storageClusterPeer.Namespace = request.Namespace

if err := r.get(r.storageClusterPeer); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("StorageClusterPeer resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, nil
}
r.Log.Error(err, "Failed to get StorageClusterPeer.")
return reconcile.Result{}, err
}

r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerInitializing

r.cephBlockPoolList = &rookCephv1.CephBlockPoolList{}
if err := r.list(r.cephBlockPoolList, client.InNamespace(r.storageClusterPeer.Namespace)); err != nil {
return reconcile.Result{}, err
}

var result reconcile.Result
var reconcileError error

result, reconcileError = r.reconcilePhases()

// Apply status changes to the StorageClusterPeer
statusError := r.Client.Status().Update(r.ctx, r.storageClusterPeer)
if statusError != nil {
r.Log.Info("Failed to update StorageClusterPeer status.")
}

// Reconcile errors have higher priority than status update errors
if reconcileError != nil {
return result, reconcileError
}

if statusError != nil {
return result, statusError
}

return result, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {

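// Any change to a CephBlockPool re-queues every StorageClusterPeer in the
// same namespace, since each peer reconciles all pools it can list there.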
enqueueStorageClusterPeerRequest := handler.EnqueueRequestsFromMapFunc(
func(context context.Context, obj client.Object) []reconcile.Request {
// Get the StorageClusterPeer objects
scpList := &ocsv1.StorageClusterPeerList{}
err := r.Client.List(context, scpList, &client.ListOptions{Namespace: obj.GetNamespace()})
if err != nil {
r.Log.Error(err, "Unable to list StorageClusterPeer objects")
return []reconcile.Request{}
}

// Return the names and namespaces of the StorageClusterPeer objects
request := []reconcile.Request{}
for _, scp := range scpList.Items {
request = append(request, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: scp.Namespace,
Name: scp.Name,
},
})
}

return request
},
)

return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
For(&ocsv1.StorageClusterPeer{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&rookCephv1.CephBlockPool{}, enqueueStorageClusterPeerRequest).
Complete(r)
}

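// reconcilePhases drives the two top-level flows: when the resource is marked
// for deletion it revokes the block pool peering on the remote provider and
// removes the finalizer; otherwise it ensures the finalizer is set, enables
// mirroring on the local CephBlockPools and exchanges bootstrap secrets with
// the peer cluster.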
func (r *StorageClusterPeerReconciler) reconcilePhases() (reconcile.Result, error) {
r.Log.Info("Running StorageClusterPeer controller")

//marked for deletion
if !r.storageClusterPeer.GetDeletionTimestamp().IsZero() {
r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerDeleting

ocsClient, err := r.newExternalClient()
if err != nil {
return reconcile.Result{}, err
}
defer ocsClient.Close()

if err := r.reconcileRevokeBlockPoolPeering(ocsClient); err != nil {
return reconcile.Result{}, err
}

//remove finalizer
r.Log.Info("removing finalizer from the StorageClusterPeer resource")
r.storageClusterPeer.SetFinalizers(slices.Filter([]string{}, r.storageClusterPeer.GetFinalizers(), func(s string) bool {
return s != ocsv1.StorageClusterPeerFinalizer
}))
if err := r.Client.Update(r.ctx, r.storageClusterPeer); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to remove finalizer from StorageClusterPeer: %v", err)
}
r.Log.Info("finalizer removed successfully")
return reconcile.Result{}, nil
}
//not marked for deletion

// add finalizer
if !slices.Contains(r.storageClusterPeer.GetFinalizers(), ocsv1.StorageClusterPeerFinalizer) {
r.Log.V(-1).Info("finalizer missing on the StorageClusterPeer resource, adding...")
r.storageClusterPeer.SetFinalizers(append(r.storageClusterPeer.GetFinalizers(), ocsv1.StorageClusterPeerFinalizer))
if err := r.update(r.storageClusterPeer); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update StorageClusterPeer with finalizer: %v", err)
}
}

ocsClient, err := r.newExternalClient()
if err != nil {
return reconcile.Result{}, err
}
defer ocsClient.Close()

r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerConfiguring

if err := r.reconcileCephBlockPool(); err != nil {
return reconcile.Result{}, err
}

if err := r.reconcilePeerBlockPool(ocsClient); err != nil {
return reconcile.Result{}, err
}

r.storageClusterPeer.Status.Phase = ocsv1.StorageClusterPeerReady

return reconcile.Result{}, nil
}

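// newExternalClient dials the peer cluster's OCS provider API server at the
// URI given in the spec; callers are expected to Close() the returned client.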
func (r *StorageClusterPeerReconciler) newExternalClient() (*providerClient.OCSProviderClient, error) {
ocsClient, err := providerClient.NewProviderClient(r.ctx, r.storageClusterPeer.Spec.OCSAPIServerURI, time.Second*10)
if err != nil {
return nil, fmt.Errorf("failed to create a new ocs client: %v", err)
}
return ocsClient, nil
}

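// reconcileCephBlockPool enables image-mode RBD mirroring on every
// CephBlockPool in the StorageClusterPeer's namespace.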
func (r *StorageClusterPeerReconciler) reconcileCephBlockPool() error {

for _, cephBlockPool := range r.cephBlockPoolList.Items {

_, err := ctrl.CreateOrUpdate(r.ctx, r.Client, &cephBlockPool, func() error {
cephBlockPool.Spec.Mirroring.Enabled = true
cephBlockPool.Spec.Mirroring.Mode = "image"
return nil
})
if err != nil {
r.Log.Error(err, "unable to enable mirroring on the cephBlockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}
}
return nil
}

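// reconcilePeerBlockPool waits for Rook to generate the rbd-mirror bootstrap
// secret for each pool, then forwards the pool name and token from that secret
// to the peer cluster through the provider API (PeerBlockPool).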
func (r *StorageClusterPeerReconciler) reconcilePeerBlockPool(ocsClient *providerClient.OCSProviderClient) error {

clusterID := util.GetClusterID(r.ctx, r.Client, &r.Log)
if clusterID == "" {
err := fmt.Errorf("failed to get clusterID from the ClusterVersion CR")
r.Log.Error(err, "failed to get ClusterVersion of the OCP Cluster")
return err
}

for _, cephBlockPool := range r.cephBlockPoolList.Items {

if cephBlockPool.Status.Info == nil || cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName] == "" {
r.Log.Info("waiting for bootstrap secret to be generated",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return fmt.Errorf("waiting for bootstrap secret for %s blockpool in %s namespace to be generated",
cephBlockPool.Name, cephBlockPool.Namespace)
}

secret := &v1.Secret{}
secret.Name = cephBlockPool.Status.Info[rbdMirrorBootstrapPeerSecretName]
secret.Namespace = cephBlockPool.Namespace

if err := r.get(secret); err != nil {
r.Log.Error(err, "unable to fetch the bootstrap secret for blockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}

_, err := ocsClient.PeerBlockPool(r.ctx, generateSecretName(cephBlockPool.Name, clusterID), secret.Data["pool"], secret.Data["token"])
if err != nil {
return err
}
}
return nil
}

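// reconcileRevokeBlockPoolPeering is the deletion counterpart: it asks the peer
// to revoke the peering for each pool and disables mirroring locally, unless
// the pool still lists peer secrets and may be backing another cluster.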
func (r *StorageClusterPeerReconciler) reconcileRevokeBlockPoolPeering(ocsClient *providerClient.OCSProviderClient) error {

clusterID := util.GetClusterID(r.ctx, r.Client, &r.Log)
if clusterID == "" {
err := fmt.Errorf("failed to get clusterID from the ClusterVersion CR")
r.Log.Error(err, "failed to get ClusterVersion of the OCP Cluster")
return err
}

for _, cephBlockPool := range r.cephBlockPoolList.Items {

_, err := ocsClient.RevokeBlockPoolPeering(r.ctx, generateSecretName(cephBlockPool.Name, clusterID), []byte(cephBlockPool.Name))
if err != nil {
return err
}

_, err = ctrl.CreateOrUpdate(r.ctx, r.Client, &cephBlockPool, func() error {
// if bootstrap secret is set then it might be used as a backup for some other cluster
if cephBlockPool.Spec.Mirroring.Peers == nil || len(cephBlockPool.Spec.Mirroring.Peers.SecretNames) == 0 {
cephBlockPool.Spec.Mirroring.Enabled = false
cephBlockPool.Spec.Mirroring.Mode = ""
}
return nil
})
if err != nil {
r.Log.Error(err, "unable to disable mirroring on the cephBlockPool",
"CephBlockPool", klog.KRef(cephBlockPool.Namespace, cephBlockPool.Name))
return err
}
}
return nil
}

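// Thin wrappers that thread the reconciler's cached context through client calls.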
func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

func (r *StorageClusterPeerReconciler) list(obj client.ObjectList, listOptions ...client.ListOption) error {
return r.Client.List(r.ctx, obj, listOptions...)
}

func (r *StorageClusterPeerReconciler) update(obj client.Object, updateOptions ...client.UpdateOption) error {
return r.Client.Update(r.ctx, obj, updateOptions...)
}

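// generateSecretName derives a deterministic name from the pool name and the
// OCP cluster ID, so PeerBlockPool and RevokeBlockPoolPeering for the same
// pool always use the same identifier.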
func generateSecretName(poolName, clusterID string) string {
md5Sum := md5.Sum([]byte(fmt.Sprintf("%s-%s", poolName, clusterID)))
return fmt.Sprintf("bootstrap-secret-%s", hex.EncodeToString(md5Sum[:16]))
}
6 changes: 6 additions & 0 deletions deploy/csv-templates/ocs-operator.csv.yaml.in
@@ -336,6 +336,12 @@ spec:
verbs:
- delete
- list
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- k8s.cni.cncf.io
resources:
@@ -2090,6 +2090,12 @@ spec:
verbs:
- delete
- list
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- apiGroups:
- k8s.cni.cncf.io
resources:
4 changes: 3 additions & 1 deletion main.go
@@ -64,7 +64,6 @@ import (
apiclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metrics "sigs.k8s.io/controller-runtime/pkg/metrics/server"

// +kubebuilder:scaffold:imports
)

@@ -217,13 +216,16 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "StorageClassRequest")
os.Exit(1)
}

if err = (&storageclusterpeer.StorageClusterPeerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("StorageClusterPeer"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "StorageClusterPeer")
os.Exit(1)
}

// +kubebuilder:scaffold:builder

// Create OCSInitialization CR if it's not present