Skip to content

Commit

Permalink
ceph: add store name to topics and notifications
Browse files Browse the repository at this point in the history
(will squash before merge)

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
  • Loading branch information
yuvalif committed Aug 22, 2021
1 parent efed261 commit 497a602
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 13 deletions.
8 changes: 8 additions & 0 deletions cluster/charts/rook-ceph/templates/resources.yaml
Expand Up @@ -450,10 +450,14 @@ spec:
type: object
type: array
type: object
objectStoreName:
description: The name of the object store on which to define the topic
type: string
topic:
description: The name of the topic associated with this notification
type: string
required:
- objectStoreName
- topic
type: object
status:
Expand Down Expand Up @@ -558,6 +562,9 @@ spec:
description: Indicate whether the server certificate is validated by the client or not
type: boolean
type: object
objectStoreName:
description: The name of the object store on which to define the topic
type: string
opaqueData:
description: Data which is sent in each event
type: string
Expand All @@ -566,6 +573,7 @@ spec:
type: boolean
required:
- endpoint
- objectStoreName
type: object
status:
description: BucketTopicStatus represents the Status of a CephBucketTopic
Expand Down
1 change: 1 addition & 0 deletions cluster/examples/kubernetes/ceph/bucket-notification.yaml
Expand Up @@ -4,6 +4,7 @@ metadata:
name: my-notification
spec:
topic: my-topic
objectStoreName: my-store
filter:
# all filters must match for the notification to apply on an object
keyFilters:
Expand Down
1 change: 1 addition & 0 deletions cluster/examples/kubernetes/ceph/bucket-topic.yaml
Expand Up @@ -9,6 +9,7 @@ spec:
# endpoint: amqp://my-rabbitmq-service:5672/vhost1
# endpoint: amqps://my-rabbitmq-service:5671/vhost1
# endpoint: kafka://my-kafka-service:9092
objectStoreName: my-store
opaqueData: my@email.com
persistent: false
# use the ednpoint spec (http, amqp, kafka) that matches the endpoint
Expand Down
8 changes: 8 additions & 0 deletions cluster/examples/kubernetes/ceph/crds.yaml
Expand Up @@ -451,10 +451,14 @@ spec:
type: object
type: array
type: object
objectStoreName:
description: The name of the object store on which to define the topic
type: string
topic:
description: The name of the topic associated with this notification
type: string
required:
- objectStoreName
- topic
type: object
status:
Expand Down Expand Up @@ -558,6 +562,9 @@ spec:
description: Indicate whether the server certificate is validated by the client or not
type: boolean
type: object
objectStoreName:
description: The name of the object store on which to define the topic
type: string
opaqueData:
description: Data which is sent in each event
type: string
Expand All @@ -566,6 +573,7 @@ spec:
type: boolean
required:
- endpoint
- objectStoreName
type: object
status:
description: BucketTopicStatus represents the Status of a CephBucketTopic
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/ceph.rook.io/v1/types.go
Expand Up @@ -1573,6 +1573,8 @@ type CephBucketTopicList struct {
type BucketTopicSpec struct {
// The URI of an endpoint to push notification to
Endpoint string `json:"endpoint"`
// The name of the object store on which to define the topic
ObjectStoreName string `json:"objectStoreName"`
// Data which is sent in each event
// +optional
OpaqueData string `json:"opaqueData,omitempty"`
Expand Down Expand Up @@ -1652,6 +1654,8 @@ type CephBucketNotificationList struct {
type BucketNotificationSpec struct {
// The name of the topic associated with this notification
Topic string `json:"topic"`
// The name of the object store on which to define the topic
ObjectStoreName string `json:"objectStoreName"`
// List of events that should trigger the notification
// +optional
Events []*string `json:"events,omitempty"`
Expand Down
12 changes: 5 additions & 7 deletions pkg/operator/ceph/object/notification/provisioner.go
Expand Up @@ -77,20 +77,18 @@ func (p *Provisioner) getCephUser(username string, objStore *cephv1.CephObjectSt
return
}

func (p *Provisioner) createSession(owner string) (*awssession.Session, error) {
objStores, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).List(context.TODO(), metav1.ListOptions{})
func (p *Provisioner) createSession(owner string, storeName string) (*awssession.Session, error) {
objStore, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).Get(context.TODO(), storeName, metav1.GetOptions{})
if err != nil {
return nil, err
}
// TODO: which store to take?
objStore := objStores.Items[0]

objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, &objStore)
objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, objStore)
if err != nil {
return nil, err
}

accessKey, secretKey, err := p.getCephUser(owner, &objStore, objContext)
accessKey, secretKey, err := p.getCephUser(owner, objStore, objContext)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,7 +123,7 @@ func (p *Provisioner) createSession(owner string) (*awssession.Session, error) {
}

func (p *Provisioner) Create(bucket *bktv1alpha1.ObjectBucket, topicARN string, notification *cephv1.CephBucketNotification) error {
sess, err := p.createSession(bucket.Spec.AdditionalState["cephUser"])
sess, err := p.createSession(bucket.Spec.AdditionalState["cephUser"], notification.Spec.ObjectStoreName)
if err != nil {
return errors.Wrapf(err, "failed to create session for bucket notification: %q provisioning", notification.Name)
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/operator/ceph/object/topic/provisioner.go
Expand Up @@ -41,15 +41,13 @@ type Provisioner struct {
ClusterSpec *cephv1.ClusterSpec
}

func (p *Provisioner) createSession() (*awssession.Session, error) {
cephObjectStores, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).List(context.TODO(), metav1.ListOptions{})
func (p *Provisioner) createSession(objectStoreName string) (*awssession.Session, error) {
cephObjectStore, err := p.Context.RookClientset.CephV1().CephObjectStores(p.ClusterInfo.Namespace).Get(context.TODO(), objectStoreName, metav1.GetOptions{})
if err != nil {
return nil, err
}
// TODO: which store to take?
cephObjectStore := cephObjectStores.Items[0]

objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, &cephObjectStore)
objContext, err := object.NewMultisiteContext(p.Context, p.ClusterInfo, cephObjectStore)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,7 +89,7 @@ func (p *Provisioner) createSession() (*awssession.Session, error) {
func (p *Provisioner) Create(topic *cephv1.CephBucketTopic) (*string, error) {
logger.Infof("creating topic: %q with endpoint: %q", topic.Name, topic.Spec.Endpoint)

sess, err := p.createSession()
sess, err := p.createSession(topic.Spec.ObjectStoreName)
if err != nil {
return nil, errors.Wrapf(err, "failed to create session for bucket topic: %q provisioning", topic.Name)
}
Expand Down

0 comments on commit 497a602

Please sign in to comment.