Skip to content

Commit

Permalink
ceph: topic deletion
Browse files Browse the repository at this point in the history
(will aquash before merge)

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
  • Loading branch information
yuvalif committed Aug 29, 2021
1 parent 549b557 commit b7d9c63
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 43 deletions.
6 changes: 3 additions & 3 deletions pkg/operator/ceph/controller/predicate.go
Expand Up @@ -50,15 +50,15 @@ const (
func WatchControllerPredicate() predicate.Funcs {
return predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
logger.Debug("create event from a CR")
logger.Debugf("create event from a CR: %q", e.Object.GetName())
return true
},
DeleteFunc: func(e event.DeleteEvent) bool {
logger.Debug("delete event from a CR")
logger.Debugf("delete event from a CR: %q", e.Object.GetName())
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
logger.Debug("update event from a CR")
logger.Debugf("update event from a CR: %q", e.ObjectOld.GetName())
// resource.Quantity has non-exportable fields, so we use its comparator method
resourceQtyComparer := cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 })

Expand Down
30 changes: 22 additions & 8 deletions pkg/operator/ceph/object/topic/controller.go
Expand Up @@ -125,7 +125,7 @@ func (r *ReconcileBucketTopic) reconcile(request reconcile.Request) (reconcile.R
err := r.client.Get(context.TODO(), request.NamespacedName, cephBucketTopic)
if err != nil {
if kerrors.IsNotFound(err) {
logger.Debugf("CephBucketTopic: %q not found. Ignoring since resource must be deleted", cephBucketTopic.Name)
logger.Debugf("CephBucketTopic: %q not found. Ignoring since resource must be deleted", request.NamespacedName)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
Expand Down Expand Up @@ -157,21 +157,25 @@ func (r *ReconcileBucketTopic) reconcile(request reconcile.Request) (reconcile.R
}
r.clusterSpec = &cephCluster.Spec

// Populate clusterInfo during each reconcile
r.clusterInfo, _, _, err = mon.LoadClusterInfo(r.context, clusterNamespace)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to populate cluster info")
}

// DELETE: the CR was deleted
if !cephBucketTopic.GetDeletionTimestamp().IsZero() {
logger.Debugf("deleting CephBucketTopic: %q", cephBucketTopic.Name)
err = r.deleteCephBucketTopic(cephBucketTopic)
if err != nil {
return reconcile.Result{}, errors.Wrapf(err, "failed to delete topic: %q", cephBucketTopic.Name)
}

// Return and do not requeue. Successful deletion.
return reconcile.Result{}, nil
}

// Populate clusterInfo during each reconcile
r.clusterInfo, _, _, err = mon.LoadClusterInfo(r.context, clusterNamespace)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to populate cluster info")
}

// validate the zone settings
// validate the topic settings
err = cephBucketTopic.ValidateCreate()
if err != nil {
updateStatus(r.client, request.NamespacedName, k8sutil.ReconcileFailedStatus, nil)
Expand Down Expand Up @@ -212,6 +216,16 @@ func (r *ReconcileBucketTopic) createCephBucketTopic(topic *cephv1.CephBucketTop
return
}

func (r *ReconcileBucketTopic) deleteCephBucketTopic(topic *cephv1.CephBucketTopic) error {
p := Provisioner{
Client: r.client,
Context: r.context,
ClusterInfo: r.clusterInfo,
ClusterSpec: r.clusterSpec,
}
return p.Delete(topic)
}

func (r *ReconcileBucketTopic) setFailedStatus(name types.NamespacedName, errMessage string, err error) (reconcile.Result, error) {
updateStatus(r.client, name, k8sutil.ReconcileFailedStatus, nil)
return reconcile.Result{}, errors.Wrapf(err, "%s", errMessage)
Expand Down
89 changes: 57 additions & 32 deletions pkg/operator/ceph/object/topic/provisioner.go
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
awsrequest "github.com/aws/aws-sdk-go/aws/request"
awssession "github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -90,6 +91,38 @@ func (p *Provisioner) createSession(objectStoreName string) (*awssession.Session
return sess, nil
}

func createSNSClient(sess *awssession.Session) (snsClient *sns.SNS) {
snsClient = sns.New(sess)
// This is a hack to workaround the following RGW issue: https://tracker.ceph.com/issues/50039
// note that using: "github.com/aws/aws-sdk-go/private/signer/v2"
// * would add the signature to the query and not the header
// * use sha246 and not sha1
customSignername := "cephV2.SignRequestHandler"
snsClient.Handlers.Sign.Swap(awsv4signer.SignRequestHandler.Name, awsrequest.NamedHandler{
Name: customSignername,
Fn: func(req *awsrequest.Request) {
credentials, err := req.Config.Credentials.Get()
if err != nil {
logger.Debugf("%s failed to get credentials: %v", customSignername, err)
return
}
date := req.Time.UTC().Format(time.RFC1123Z)
contentType := "application/x-www-form-urlencoded; charset=utf-8"
stringToSign := req.HTTPRequest.Method + "\n\n" + contentType + "\n" + date + "\n" + req.HTTPRequest.URL.Path
hash := hmac.New(sha1.New, []byte(credentials.SecretAccessKey))
hash.Write([]byte(stringToSign))
signature := base64.StdEncoding.EncodeToString(hash.Sum(nil))
if len(req.HTTPRequest.Header["Authorization"]) == 0 {
req.HTTPRequest.Header.Add("Authorization", "AWS "+credentials.AccessKeyID+":"+signature)
}
if len(req.HTTPRequest.Header["Date"]) == 0 {
req.HTTPRequest.Header.Add("Date", date)
}
},
})
return
}

func (p *Provisioner) Create(topic *cephv1.CephBucketTopic) (*string, error) {
logger.Infof("creating topic: %q with endpoint: %q", topic.Name, topic.Spec.Endpoint)

Expand Down Expand Up @@ -124,36 +157,7 @@ func (p *Provisioner) Create(topic *cephv1.CephBucketTopic) (*string, error) {
attr["verify-ssl"] = &verifySSL
}

snsClient := sns.New(sess)
// This is a hack to workaround the following RGW issue: https://tracker.ceph.com/issues/50039
// note that using: "github.com/aws/aws-sdk-go/private/signer/v2"
// * would add the signature to the query and not the header
// * use sha246 and not sha1
customSignername := "cephV2.SignRequestHandler"
snsClient.Handlers.Sign.Swap(awsv4signer.SignRequestHandler.Name, awsrequest.NamedHandler{
Name: customSignername,
Fn: func(req *awsrequest.Request) {
credentials, err := req.Config.Credentials.Get()
if err != nil {
logger.Debugf("%s failed to get credentials: %v", customSignername, err)
return
}
date := req.Time.UTC().Format(time.RFC1123Z)
contentType := "application/x-www-form-urlencoded; charset=utf-8"
stringToSign := req.HTTPRequest.Method + "\n\n" + contentType + "\n" + date + "\n" + req.HTTPRequest.URL.Path
hash := hmac.New(sha1.New, []byte(credentials.SecretAccessKey))
hash.Write([]byte(stringToSign))
signature := base64.StdEncoding.EncodeToString(hash.Sum(nil))
if len(req.HTTPRequest.Header["Authorization"]) == 0 {
req.HTTPRequest.Header.Add("Authorization", "AWS "+credentials.AccessKeyID+":"+signature)
}
if len(req.HTTPRequest.Header["Date"]) == 0 {
req.HTTPRequest.Header.Add("Date", date)
}
},
})

topicOutput, err := snsClient.CreateTopic(&sns.CreateTopicInput{
topicOutput, err := createSNSClient(sess).CreateTopic(&sns.CreateTopicInput{
Name: &topic.Name,
Attributes: attr,
})
Expand All @@ -167,7 +171,28 @@ func (p *Provisioner) Create(topic *cephv1.CephBucketTopic) (*string, error) {
return topicOutput.TopicArn, nil
}

func (r *Provisioner) Delete(namespace string, topicARN string) error {
// TODO: implement
func (p *Provisioner) Delete(topic *cephv1.CephBucketTopic) error {
logger.Infof("deleting topic: %q", topic.Name)
if topic.Status.ARN == nil {
logger.Warningf("ignore topic deletion. topic %q was never successfully provisioned", topic.Name)
return nil
}

sess, err := p.createSession(topic.Spec.ObjectStoreName)
if err != nil {
return errors.Wrapf(err, "failed to create session for bucket topic: %q deletion", topic.Name)
}

_, err = createSNSClient(sess).DeleteTopic(&sns.DeleteTopicInput{TopicArn: topic.Status.ARN})

if err != nil {
if err.(awserr.Error).Code() != sns.ErrCodeNotFoundException {
return errors.Wrapf(err, "failed to delete bucket topic: %q", topic.Name)
}
logger.Warningf("ignore topic deletion. topic %q was already deleted", topic.Name)
}

logger.Infof("topic %q deleted", topic.Name)

return nil
}

0 comments on commit b7d9c63

Please sign in to comment.