From 497a602d9922e44e424e9206a39e67074e00a3f5 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 22 Aug 2021 20:58:52 +0300 Subject: [PATCH] ceph: add store name to topics and notifications (will squash before merge) Signed-off-by: Yuval Lifshitz --- cluster/charts/rook-ceph/templates/resources.yaml | 8 ++++++++ .../kubernetes/ceph/bucket-notification.yaml | 1 + cluster/examples/kubernetes/ceph/bucket-topic.yaml | 1 + cluster/examples/kubernetes/ceph/crds.yaml | 8 ++++++++ pkg/apis/ceph.rook.io/v1/types.go | 4 ++++ pkg/operator/ceph/object/notification/provisioner.go | 12 +++++------- pkg/operator/ceph/object/topic/provisioner.go | 10 ++++------ 7 files changed, 31 insertions(+), 13 deletions(-) diff --git a/cluster/charts/rook-ceph/templates/resources.yaml b/cluster/charts/rook-ceph/templates/resources.yaml index f983c2233d085..8de021c713376 100644 --- a/cluster/charts/rook-ceph/templates/resources.yaml +++ b/cluster/charts/rook-ceph/templates/resources.yaml @@ -450,10 +450,14 @@ spec: type: object type: array type: object + objectStoreName: + description: The name of the object store on which to define the topic + type: string topic: description: The name of the topic associated with this notification type: string required: + - objectStoreName - topic type: object status: @@ -558,6 +562,9 @@ spec: description: Indicate whether the server certificate is validated by the client or not type: boolean type: object + objectStoreName: + description: The name of the object store on which to define the topic + type: string opaqueData: description: Data which is sent in each event type: string @@ -566,6 +573,7 @@ spec: type: boolean required: - endpoint + - objectStoreName type: object status: description: BucketTopicStatus represents the Status of a CephBucketTopic diff --git a/cluster/examples/kubernetes/ceph/bucket-notification.yaml b/cluster/examples/kubernetes/ceph/bucket-notification.yaml index 695cd2be66cfa..5d300f8b05249 100644 --- a/cluster/examples/kubernetes/ceph/bucket-notification.yaml +++ b/cluster/examples/kubernetes/ceph/bucket-notification.yaml @@ -4,6 +4,7 @@ metadata: name: my-notification spec: topic: my-topic + objectStoreName: my-store filter: # all filters must match for the notification to apply on an object keyFilters: diff --git a/cluster/examples/kubernetes/ceph/bucket-topic.yaml b/cluster/examples/kubernetes/ceph/bucket-topic.yaml index 8153c3a30f000..3ec17962a6840 100644 --- a/cluster/examples/kubernetes/ceph/bucket-topic.yaml +++ b/cluster/examples/kubernetes/ceph/bucket-topic.yaml @@ -9,6 +9,7 @@ spec: # endpoint: amqp://my-rabbitmq-service:5672/vhost1 # endpoint: amqps://my-rabbitmq-service:5671/vhost1 # endpoint: kafka://my-kafka-service:9092 + objectStoreName: my-store opaqueData: my@email.com persistent: false # use the ednpoint spec (http, amqp, kafka) that matches the endpoint diff --git a/cluster/examples/kubernetes/ceph/crds.yaml b/cluster/examples/kubernetes/ceph/crds.yaml index c9bc945abc686..7d31350456cd6 100644 --- a/cluster/examples/kubernetes/ceph/crds.yaml +++ b/cluster/examples/kubernetes/ceph/crds.yaml @@ -451,10 +451,14 @@ spec: type: object type: array type: object + objectStoreName: + description: The name of the object store on which to define the topic + type: string topic: description: The name of the topic associated with this notification type: string required: + - objectStoreName - topic type: object status: @@ -558,6 +562,9 @@ spec: description: Indicate whether the server certificate is validated by the client or not type: boolean type: object + objectStoreName: + description: The name of the object store on which to define the topic + type: string opaqueData: description: Data which is sent in each event type: string @@ -566,6 +573,7 @@ spec: type: boolean required: - endpoint + - objectStoreName type: object status: description: BucketTopicStatus represents the Status of a CephBucketTopic diff --git a/pkg/apis/ceph.rook.io/v1/types.go b/pkg/apis/ceph.rook.io/v1/types.go index d805e6ed4fef3..c1cdd9d68a1da 100755 --- a/pkg/apis/ceph.rook.io/v1/types.go +++ b/pkg/apis/ceph.rook.io/v1/types.go @@ -1573,6 +1573,8 @@ type CephBucketTopicList struct { type BucketTopicSpec struct { // The URI of an endpoint to push notification to Endpoint string `json:"endpoint"` + // The name of the object store on which to define the topic + ObjectStoreName string `json:"objectStoreName"` // Data which is sent in each event // +optional OpaqueData string `json:"opaqueData,omitempty"` @@ -1652,6 +1654,8 @@ type CephBucketNotificationList struct { type BucketNotificationSpec struct { // The name of the topic associated with this notification Topic string `json:"topic"` + // The name of the object store on which to define the topic + ObjectStoreName string `json:"objectStoreName"` // List of events that should trigger the notification // +optional Events []*string `json:"events,omitempty"` diff --git a/pkg/operator/ceph/object/notification/provisioner.go b/pkg/operator/ceph/object/notification/provisioner.go index b6ca77af12b1c..2ca57b2098237 100644 --- a/pkg/operator/ceph/object/notification/provisioner.go +++ b/pkg/operator/ceph/object/notification/provisioner.go @@ -77,20 +77,18 @@ func (p *Provisioner) getCephUser(username string, objStore *cephv1.CephObjectSt return } -func (p *Provisioner) createSession(owner string) (*awssession.Session, error) { - objStores, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).List(context.TODO(), metav1.ListOptions{}) +func (p *Provisioner) createSession(owner string, storeName string) (*awssession.Session, error) { + objStore, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).Get(context.TODO(), storeName, metav1.GetOptions{}) if err != nil { return nil, err } - // TODO: which store to take? - objStore := objStores.Items[0] - objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, &objStore) + objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, objStore) if err != nil { return nil, err } - accessKey, secretKey, err := p.getCephUser(owner, &objStore, objContext) + accessKey, secretKey, err := p.getCephUser(owner, objStore, objContext) if err != nil { return nil, err } @@ -125,7 +123,7 @@ func (p *Provisioner) createSession(owner string) (*awssession.Session, error) { } func (p *Provisioner) Create(bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error { - sess, err := p.createSession(bucket.Spec.AdditionalState["cephUser"]) + sess, err := p.createSession(bucket.Spec.AdditionalState["cephUser"], notification.Spec.ObjectStoreName) if err != nil { return errors.Wrapf(err, "failed to create session for bucket notification: %q provisioning", notification.Name) } diff --git a/pkg/operator/ceph/object/topic/provisioner.go b/pkg/operator/ceph/object/topic/provisioner.go index 22372eb06fc71..bb7c5ed230dca 100644 --- a/pkg/operator/ceph/object/topic/provisioner.go +++ b/pkg/operator/ceph/object/topic/provisioner.go @@ -41,15 +41,13 @@ type Provisioner struct { ClusterSpec *cephv1.ClusterSpec } -func (p *Provisioner) createSession() (*awssession.Session, error) { - cephObjectStores, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).List(context.TODO(), metav1.ListOptions{}) +func (p *Provisioner) createSession(objectStoreName string) (*awssession.Session, error) { + cephObjectStore, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).Get(context.TODO(), objectStoreName, metav1.GetOptions{}) if err != nil { return nil, err } - // TODO: which store to take? - cephObjectStore := cephObjectStores.Items[0] - objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, &cephObjectStore) + objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, cephObjectStore) if err != nil { return nil, err } @@ -91,7 +89,7 @@ func (p *Provisioner) createSession() (*awssession.Session, error) { func (p *Provisioner) Create(topic *cephv1.CephBucketTopic) (*string, error) { logger.Infof("creating topic: %q with endpoint: %q", topic.Name, topic.Spec.Endpoint) - sess, err := p.createSession() + sess, err := p.createSession(topic.Spec.ObjectStoreName) if err != nil { return nil, errors.Wrapf(err, "failed to create session for bucket topic: %q provisioning", topic.Name) }