Skip to content

Commit

Permalink
rgw: refactor bucket notification code
Browse files Browse the repository at this point in the history
this should make sure code reuse between the OBC label controller and
the CephBucketNotification controller.
done as part of the cleanup work from:
rook#8426

this also include adding unit tests for:
- topic controller
- notification controller
- obc label controller

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
  • Loading branch information
yuvalif authored and parth-gr committed Feb 22, 2022
1 parent 44fd2b0 commit 99284c5
Show file tree
Hide file tree
Showing 13 changed files with 1,256 additions and 276 deletions.
6 changes: 3 additions & 3 deletions pkg/operator/ceph/controller/predicate.go
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"encoding/json"
"os"
"reflect"
"strings"
"syscall"

Expand Down Expand Up @@ -374,9 +375,8 @@ func WatchControllerPredicate() predicate.Funcs {
logger.Debugf("object %q matched on update but %q label is set, doing nothing", objNew.Name, DoNotReconcileLabelName)
return false
}
diff := cmp.Diff(objOld.Labels, objNew.Labels, resourceQtyComparer)
if diff != "" {
logger.Infof("CR labels has changed for %q. diff=%s", objNew.Name, diff)
if !reflect.DeepEqual(objOld.Labels, objNew.Labels) {
logger.Infof("CR labels has changed for %q", objNew.Name)
return true
} else if objOld.Spec.ObjectBucketName != objNew.Spec.ObjectBucketName {
logger.Infof("CR %q bucket name changed from %q to %q", objNew.Name, objOld.Spec.ObjectBucketName, objNew.Spec.ObjectBucketName)
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/ceph/object/bucket/provisioner.go
Expand Up @@ -428,7 +428,7 @@ func (p *Provisioner) composeObjectBucket() *bktv1alpha1.ObjectBucket {
},
},
AdditionalState: map[string]string{
cephUser: p.cephUserName,
CephUser: p.cephUserName,
},
}

Expand Down Expand Up @@ -685,7 +685,7 @@ func (p Provisioner) updateAdditionalSettings(ob *bktv1alpha1.ObjectBucket) erro
return errors.Wrapf(err, "failed to parse maxSize quota for user %q", p.cephUserName)
}
}
objectUser, err := p.adminOpsClient.GetUser(p.clusterInfo.Context, admin.User{ID: ob.Spec.Connection.AdditionalState[cephUser]})
objectUser, err := p.adminOpsClient.GetUser(p.clusterInfo.Context, admin.User{ID: ob.Spec.Connection.AdditionalState[CephUser]})
if err != nil {
return errors.Wrapf(err, "failed to fetch user %q", p.cephUserName)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/ceph/object/bucket/util.go
Expand Up @@ -35,7 +35,7 @@ var logger = capnslog.NewPackageLogger("github.com/rook/rook", "op-bucket-prov")

const (
genUserLen = 8
cephUser = "cephUser"
CephUser = "cephUser"
objectStoreName = "objectStoreName"
objectStoreNamespace = "objectStoreNamespace"
objectStoreEndpoint = "endpoint"
Expand Down Expand Up @@ -72,7 +72,7 @@ func isStaticBucket(sc *storagev1.StorageClass) (string, bool) {
}

func getCephUser(ob *bktv1alpha1.ObjectBucket) string {
return ob.Spec.AdditionalState[cephUser]
return ob.Spec.AdditionalState[CephUser]
}

func (p *Provisioner) getObjectStore() (*cephv1.CephObjectStore, error) {
Expand Down
165 changes: 75 additions & 90 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -14,11 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package notification to manage a rook bucket notifications.
package notification

import (
"context"
"strings"
"time"

"github.com/coreos/pkg/capnslog"
Expand All @@ -29,6 +29,8 @@ import (
cephclient "github.com/rook/rook/pkg/daemon/ceph/client"
"github.com/rook/rook/pkg/operator/ceph/cluster/mon"
opcontroller "github.com/rook/rook/pkg/operator/ceph/controller"
"github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/pkg/operator/ceph/object/bucket"
"github.com/rook/rook/pkg/operator/ceph/object/topic"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -55,8 +57,6 @@ var waitForRequeueIfObjectBucketNotReady = reconcile.Result{Requeue: true, Reque
type ReconcileNotifications struct {
client client.Client
context *clusterd.Context
clusterInfo *cephclient.ClusterInfo
clusterSpec *cephv1.ClusterSpec
opManagerContext context.Context
}

Expand Down Expand Up @@ -99,7 +99,6 @@ func addNotificationReconciler(mgr manager.Manager, r reconcile.Reconciler) erro
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileNotifications) Reconcile(context context.Context, request reconcile.Request) (reconcile.Result, error) {
// workaround because the rook logging mechanism is not compatible with the controller-runtime logging interface
reconcileResponse, err := r.reconcile(request)
if err != nil {
logger.Errorf("failed to reconcile %v", err)
Expand All @@ -121,36 +120,6 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", request.NamespacedName)
}

// get the topic associated with the notification
bucketTopic := &cephv1.CephBucketTopic{}
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
if err := r.client.Get(r.opManagerContext, topicName, bucketTopic); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketTopic %q", topicName)
}

// find the namespace for the ceph cluster (may be different than the namespace of the notification CR)
// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, reconcileResponse := opcontroller.IsReadyToReconcile(
r.opManagerContext,
r.client,
types.NamespacedName{Namespace: bucketTopic.Spec.ObjectStoreNamespace},
controllerName,
)
if !isReadyToReconcile {
// This handles the case where the Ceph Cluster is gone and we want to delete that CR
if !notification.GetDeletionTimestamp().IsZero() && !cephClusterExists {
// Return and do not requeue. Successful deletion.
return reconcile.Result{}, nil
}
logger.Debug("Ceph cluster not yet present.")
return reconcileResponse, nil
}
r.clusterSpec = &cephCluster.Spec

// DELETE: the CR was deleted
if !notification.GetDeletionTimestamp().IsZero() {
logger.Debugf("CephBucketNotification %q was deleted", notification.Name)
Expand All @@ -159,53 +128,24 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
return reconcile.Result{}, nil
}

// Populate clusterInfo during each reconcile
r.clusterInfo, _, _, err = mon.LoadClusterInfo(r.context, r.opManagerContext, cephCluster.Namespace)
// get the topic associated with the notification, and make sure it is provisioned
topicName := types.NamespacedName{Namespace: notification.Namespace, Name: notification.Spec.Topic}
bucketTopic, err := topic.GetProvisioned(r.client, r.opManagerContext, topicName)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to populate cluster info")
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
return waitForRequeueIfTopicNotReady, nil
}

// create the notification if needed
return r.updateCephBucketNotification(notification, bucketTopic)
}

func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (objectStoreName types.NamespacedName, err error) {
// prase the following string: <prefix>-rgw-<store>.<namespace>.svc
// to ge the object store name and namespace
logger.Debugf("BucketHost of %q is %q",
types.NamespacedName{Name: ob.Name, Namespace: ob.Namespace}.String(),
ob.Spec.Endpoint.BucketHost,
)
parsedBucketHost := strings.Split(ob.Spec.Endpoint.BucketHost, ".")
if len(parsedBucketHost) < 3 {
err = errors.Errorf("malformed BucketHost %q", ob.Spec.Endpoint.BucketHost)
return
// Populate clusterInfo during each reconcile
clusterInfo, clusterSpec, err := getReadyCluster(r.client, r.opManagerContext, *r.context, bucketTopic.Spec.ObjectStoreNamespace)
if err != nil {
return opcontroller.WaitForRequeueIfCephClusterNotReady, errors.Wrapf(err, "cluster is not ready")
}
parsedSubdomain := strings.Split(parsedBucketHost[0], "-rgw-")
if len(parsedSubdomain) != 2 {
err = errors.Errorf("malformed BucketHost subdomain %q", parsedBucketHost[0])
return
if clusterInfo == nil || clusterSpec == nil {
return opcontroller.WaitForRequeueIfCephClusterNotReady, nil
}
objectStoreName.Namespace = parsedBucketHost[1]
objectStoreName.Name = parsedSubdomain[1]
return
}

// verify that object store is configured correctly for OB, CephBucketNotification and CephBucketTopic
func validateObjectStoreName(topic *cephv1.CephBucketTopic, bucketStoreName types.NamespacedName) error {
topicStoreName := types.NamespacedName{Name: topic.Spec.ObjectStoreName, Namespace: topic.Spec.ObjectStoreNamespace}
if topicStoreName != bucketStoreName {
return errors.Errorf("object store name mismatch between topic and bucket. %q != %q", topicStoreName, bucketStoreName)
}
return nil
}

func (r *ReconcileNotifications) updateCephBucketNotification(notification *cephv1.CephBucketNotification, bucketTopic *cephv1.CephBucketTopic) (reconcile.Result, error) {
topicARN, err := topic.GetARN(bucketTopic)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to get topic ARN for CephBucketTopic %q", types.NamespacedName{Name: bucketTopic.Name, Namespace: bucketTopic.Namespace})
}
// fetch all OBCs that has a label matching this CBN
// fetch all OBCs that has a label matching this CephBucketNotification
namespace := notification.Namespace
bnName := types.NamespacedName{Namespace: namespace, Name: notification.Name}
namespaceListOpt := client.InNamespace(namespace)
Expand All @@ -224,6 +164,10 @@ func (r *ReconcileNotifications) updateCephBucketNotification(notification *ceph

// loop through all OBCs in the list and get their OBs
for _, obc := range obcList.Items {
if obc.Spec.ObjectBucketName == "" {
logger.Infof("ObjectBucketClaim %q resource did not create the bucket yet. will retry", types.NamespacedName{Name: obc.Name, Namespace: obc.Namespace})
return waitForRequeueIfObjectBucketNotReady, nil
}
ob := bktv1alpha1.ObjectBucket{}
bucketName := types.NamespacedName{Namespace: namespace, Name: obc.Spec.ObjectBucketName}
if err := r.client.Get(r.opManagerContext, bucketName, &ob); err != nil {
Expand All @@ -237,26 +181,67 @@ func (r *ReconcileNotifications) updateCephBucketNotification(notification *ceph
return reconcile.Result{}, err
}

// provision the notification
provisioner := Provisioner{
Client: r.client,
Context: r.context,
ClusterInfo: r.clusterInfo,
ClusterSpec: r.clusterSpec,
opManagerContext: r.opManagerContext,
}
session, err := provisioner.createSession(
ob.Spec.AdditionalState["cephUser"],
objectStoreName,
err = createNotificationFunc(
provisioner{
context: r.context,
clusterInfo: clusterInfo,
clusterSpec: clusterSpec,
opManagerContext: r.opManagerContext,
owner: ob.Spec.AdditionalState[bucket.CephUser],
objectStoreName: objectStoreName,
},
&ob,
*bucketTopic.Status.ARN,
notification,
)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to create session for CephBucketNotification provisioning %q for ObjectBucketClaims %q", bnName, bucketName)
}
err = provisioner.Create(&ob, topicARN, notification, session)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q for ObjectBucketClaims %q", bnName, bucketName)
}
}

return reconcile.Result{}, nil
}

func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (types.NamespacedName, error) {
// parse the following string: <prefix>-rgw-<store>.<namespace>.svc
// to ge the object store name and namespace
logger.Debugf("BucketHost of %q is %q",
types.NamespacedName{Name: ob.Name, Namespace: ob.Namespace}.String(),
ob.Spec.Connection.Endpoint.BucketHost,
)
objectStoreName, err := object.ParseDomainName(ob.Spec.Connection.Endpoint.BucketHost)
if err != nil {
return types.NamespacedName{}, errors.Wrapf(err, "malformed BucketHost %q", ob.Spec.Endpoint.BucketHost)
}
return objectStoreName, nil
}

// verify that object store is configured correctly for OB, CephBucketNotification and CephBucketTopic
func validateObjectStoreName(topic *cephv1.CephBucketTopic, bucketStoreName types.NamespacedName) error {
topicStoreName := types.NamespacedName{Name: topic.Spec.ObjectStoreName, Namespace: topic.Spec.ObjectStoreNamespace}
if topicStoreName != bucketStoreName {
return errors.Errorf("object store name mismatch between topic and bucket. %q != %q", topicStoreName, bucketStoreName)
}
return nil
}

// getReadyCluster get cluster info and spec if the cluster is ready
func getReadyCluster(client client.Client, opManagerContext context.Context, context clusterd.Context, objectStoreNamespace string) (*cephclient.ClusterInfo, *cephv1.ClusterSpec, error) {
// find the namespace for the ceph cluster (may be different than the namespace of the notification CR)
// Make sure a CephCluster is present otherwise do nothing
cephCluster, isReadyToReconcile, cephClusterExists, _ := opcontroller.IsReadyToReconcile(
opManagerContext,
client,
types.NamespacedName{Namespace: objectStoreNamespace},
controllerName,
)
if !isReadyToReconcile || !cephClusterExists {
logger.Debug("Ceph cluster not yet present.")
return nil, nil, nil
}
clusterInfo, _, _, err := mon.LoadClusterInfo(&context, opManagerContext, cephCluster.Namespace)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to populate cluster info")
}
return clusterInfo, &cephCluster.Spec, nil
}

0 comments on commit 99284c5

Please sign in to comment.