Skip to content

Commit

Permalink
ceph: fix parsing logic of the object store namespace
Browse files Browse the repository at this point in the history
(will squash before merge)

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
  • Loading branch information
yuvalif committed Nov 1, 2021
1 parent 22e5f5f commit 947d6c6
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cluster/examples/kubernetes/ceph/operator.yaml
Expand Up @@ -23,7 +23,7 @@ metadata:
namespace: rook-ceph # namespace:operator
data:
# The logging level for the operator: ERROR | WARNING | INFO | DEBUG
ROOK_LOG_LEVEL: "INFO"
ROOK_LOG_LEVEL: "DEBUG"

# Enable the CSI driver.
# To run the non-default version of the CSI driver, see the override-able image properties in operator.yaml
Expand Down
8 changes: 6 additions & 2 deletions pkg/operator/ceph/object/notification/controller.go
Expand Up @@ -170,8 +170,12 @@ func (r *ReconcileNotifications) reconcile(request reconcile.Request) (reconcile
}

func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (objectStoreName types.NamespacedName, err error) {
// prase the following string: <namespace>-rgw-<store>.<namespace>.svc
// prase the following string: <prefix>-rgw-<store>.<namespace>.svc
// to ge the object store name and namespace
logger.Debugf("BucketHost of %q is %q",
types.NamespacedName{Name: ob.Name, Namespace: ob.Namespace}.String(),
ob.Spec.Endpoint.BucketHost,
)
parsedBucketHost := strings.Split(ob.Spec.Endpoint.BucketHost, ".")
if len(parsedBucketHost) < 3 {
err = errors.Errorf("malformed BucketHost %q", ob.Spec.Endpoint.BucketHost)
Expand All @@ -182,7 +186,7 @@ func getCephObjectStoreName(ob bktv1alpha1.ObjectBucket) (objectStoreName types.
err = errors.Errorf("malformed BucketHost subdomain %q", parsedBucketHost[0])
return
}
objectStoreName.Namespace = parsedSubdomain[0]
objectStoreName.Namespace = parsedBucketHost[1]
objectStoreName.Name = parsedSubdomain[1]
return
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/operator/ceph/object/notification/obc_label_controller.go
Expand Up @@ -83,6 +83,7 @@ func (r *ReconcileOBCLabels) Reconcile(context context.Context, request reconcil
}

func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Result, error) {
logger.Debugf("reconciling ObjectBucketClaim %v labels for bucket notifications", request.NamespacedName.String())
// Fetch the ObjectBucketClaim instance
obc := bktv1alpha1.ObjectBucketClaim{}
err := r.client.Get(r.opManagerContext, request.NamespacedName, &obc)
Expand All @@ -95,7 +96,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve ObjectBucketClaim %q", request.NamespacedName)
}

// if ObjectBucket was created yet, reschedule in 5 seconds
// reschedule if ObjectBucket was not created yet
if obc.Spec.ObjectBucketName == "" {
logger.Infof("ObjectBucketClaim %q resource did not create the bucket yet. will retry", request.NamespacedName)
return waitForRequeueIfObjectBucketNotReady, nil
Expand Down Expand Up @@ -178,7 +179,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
bnName := types.NamespacedName{Namespace: obc.Namespace, Name: labelValue}
if err := r.client.Get(r.opManagerContext, bnName, notification); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketNotification %q not provisioned yet", bnName)
logger.Infof("CephBucketNotification %q not found", bnName)
return waitForRequeueIfNotificationNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketNotification %q", bnName)
Expand All @@ -189,14 +190,14 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
topicName := types.NamespacedName{Namespace: obc.Namespace, Name: notification.Spec.Topic}
if err := r.client.Get(r.opManagerContext, topicName, bucketTopic); err != nil {
if kerrors.IsNotFound(err) {
logger.Infof("CephBucketTopic %q not provisioned yet", topicName)
logger.Infof("CephBucketTopic %q not found", topicName)
return waitForRequeueIfTopicNotReady, nil
}
return reconcile.Result{}, errors.Wrapf(err, "failed to retrieve CephBucketTopic %q", topicName)
}
topicARN, err := topic.GetARN(bucketTopic)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to get topic ARN for CephBucketTopic %q", topicName)
return waitForRequeueIfTopicNotReady, errors.Wrapf(err, "CephBucketTopic %q not provisioned", topicName)
}

if err = validateObjectStoreName(bucketTopic, objectStoreName); err != nil {
Expand All @@ -208,7 +209,7 @@ func (r *ReconcileOBCLabels) reconcile(request reconcile.Request) (reconcile.Res
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to provision CephBucketNotification %q", bnName)
}
logger.Debugf("provisioned CephBucketNotification %q", bnName)
logger.Infof("provisioned CephBucketNotification %q", bnName)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/ceph/object/topic/provisioner.go
Expand Up @@ -224,7 +224,7 @@ func (p *Provisioner) Delete(topic *cephv1.CephBucketTopic) error {
func GetARN(topic *cephv1.CephBucketTopic) (string, error) {
nsName := types.NamespacedName{Name: topic.Name, Namespace: topic.Namespace}
if topic.Status == nil || topic.Status.ARN == nil {
return "", errors.Errorf("no ARN in topic. CephBucketTopic %q was not provisioned successfully", nsName)
return "", errors.Errorf("no ARN in topic. CephBucketTopic %q was not provisioned yet", nsName)
}
topicARN := *topic.Status.ARN
parsedTopicARN, err := arn.Parse(topicARN)
Expand Down
1 change: 0 additions & 1 deletion tests/integration/ceph_bucket_notification_test.go
Expand Up @@ -121,7 +121,6 @@ func (s *ObjectSuite) TestBucketNotificationsInOrder() {
logger.Info("OBC, Secret and ConfigMap created")
})


t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
var s3client *rgw.S3Agent
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
Expand Down

0 comments on commit 947d6c6

Please sign in to comment.