Skip to content

Commit

Permalink
test: add more test cases for bucket notfication
Browse files Browse the repository at this point in the history
Added following test cases for bucket notification integration test
suite:

* different order: OBC - Topic - Notification
* different order: OBC - Notification - Topic
* adding a label to an existing OBC
* deleting a label from an existing OBC

Signed-off-by: Jiffin Tony Thottan <thottanjiffin@gmail.com>
  • Loading branch information
thotz committed Nov 17, 2021
1 parent c3b5d3b commit 618bdd4
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 56 deletions.
34 changes: 34 additions & 0 deletions tests/framework/clients/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
b64 "encoding/base64"
"fmt"

"github.com/aws/aws-sdk-go/service/s3"
bktv1alpha1 "github.com/kube-object-storage/lib-bucket-provisioner/pkg/apis/objectbucket.io/v1alpha1"
rgw "github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/tests/framework/installer"
"github.com/rook/rook/tests/framework/utils"
)
Expand Down Expand Up @@ -146,3 +148,35 @@ func (b *BucketOperation) CheckOBMaxObject(obcName, maxobject string) bool {
fetchMaxObject, _ := b.k8sh.GetResource("ob", obName, "--output", "jsonpath={.spec.endpoint.additionalConfig.maxObjects}")
return maxobject == fetchMaxObject
}

// Checks the bucket notifications for an bucket
func (b *BucketOperation) CheckBucketNotifications(namespace, storeName, obcName, bucketname, notificationName, s3endpoint string) bool {
var s3client *rgw.S3Agent
s3AccessKey, _ := b.GetAccessKey(obcName)
s3SecretKey, _ := b.GetSecretKey(obcName)

//TODO : add TLS check
s3client, err := rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)

if err != nil {
logger.Errorf("S3 client creation failed with error %v", err)
return false
}
logger.Infof("endpoint (%s) Accesskey (%s) secret (%s) ", s3endpoint, s3AccessKey, s3SecretKey)

notifications, err := s3client.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{
Bucket: &bucketname,
})
if err != nil {
logger.Infof("failed to fetch bucket notifications configuration due to %v", err)
return false
}
logger.Infof("%d bucket notifications found in: %v", len(notifications.TopicConfigurations), notifications)
for _, notification := range notifications.TopicConfigurations {
if *notification.Id == notificationName {
return true
}
logger.Infof("bucket notifications name mismatch %q != %q", *notification.Id, notificationName)
}
return false
}
8 changes: 7 additions & 1 deletion tests/framework/utils/k8s_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/coreos/pkg/capnslog"
bktclient "github.com/kube-object-storage/lib-bucket-provisioner/pkg/client/clientset/versioned"
"github.com/pkg/errors"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned"
"github.com/rook/rook/pkg/clusterd"
Expand All @@ -56,6 +57,7 @@ type K8sHelper struct {
remoteExecutor *exec.RemotePodCommandExecutor
Clientset *kubernetes.Clientset
RookClientset *rookclient.Clientset
BucketClientset *bktclient.Clientset
RunningInCluster bool
T func() *testing.T
}
Expand Down Expand Up @@ -94,13 +96,17 @@ func CreateK8sHelper(t func() *testing.T) (*K8sHelper, error) {
if err != nil {
return nil, fmt.Errorf("failed to get rook clientset. %+v", err)
}
bucketClientset, err := bktclient.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to get rook clientset. %+v", err)
}

remoteExecutor := &exec.RemotePodCommandExecutor{
ClientSet: clientset,
RestClient: config,
}

h := &K8sHelper{executor: executor, Clientset: clientset, RookClientset: rookClientset, T: t, remoteExecutor: remoteExecutor}
h := &K8sHelper{executor: executor, Clientset: clientset, RookClientset: rookClientset, BucketClientset: bucketClientset, T: t, remoteExecutor: remoteExecutor}
if strings.Contains(config.Host, "//10.") {
h.RunningInCluster = true
}
Expand Down
212 changes: 157 additions & 55 deletions tests/integration/ceph_bucket_notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/rook/rook/pkg/daemon/ceph/client"
rgw "github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/tests/framework/utils"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (s *ObjectSuite) TestBucketNotificationsInOrder() {
func (s *ObjectSuite) TestBucketNotifications() {
if utils.IsPlatformOpenShift() {
s.T().Skip("bucket notification tests skipped on openshift")
}

objectStoreServicePrefix = objectStoreServicePrefixUniq
storeName := "test-store"
bucketNotificationLabelPrefix := "bucket-notification-"
storeName := "test-store-bucket-notification"
tlsEnable := false
namespace := s.settings.Namespace
obcNamespace := "default"
helper := s.helper
k8sh := s.k8sh
logger.Infof("Running on Rook Cluster %s", namespace)
Expand All @@ -47,20 +48,6 @@ func (s *ObjectSuite) TestBucketNotificationsInOrder() {
clusterInfo := client.AdminClusterInfo(namespace)
t := s.T()

t.Run("create CephObjectStoreUser", func(t *testing.T) {
createCephObjectUser(s.Suite, helper, k8sh, namespace, storeName, userid, true, true)
i := 0
for i = 0; i < 4; i++ {
if helper.ObjectUserClient.UserSecretExists(namespace, storeName, userid) {
break
}
logger.Info("waiting 5 more seconds for user secret to exist")
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, 4, i)
})
logger.Info("object store user created")

context := k8sh.MakeContext()
objectStore, err := k8sh.RookClientset.CephV1().CephObjectStores(namespace).Get(ctx, storeName, metav1.GetOptions{})
assert.Nil(t, err)
Expand All @@ -70,6 +57,7 @@ func (s *ObjectSuite) TestBucketNotificationsInOrder() {
notificationName := "my-notification"
topicName := "my-topic"
httpEndpointService := "my-notification-sink"
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)

t.Run("create CephBucketTopic", func(t *testing.T) {
err := helper.TopicClient.CreateTopic(topicName, storeName, httpEndpointService)
Expand Down Expand Up @@ -122,38 +110,165 @@ func (s *ObjectSuite) TestBucketNotificationsInOrder() {
})

t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
var s3client *rgw.S3Agent
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
if objectStore.Spec.IsTLSEnabled() {
s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, rgwcontext.ZoneGroup, true)
} else {
s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, rgwcontext.ZoneGroup, true, nil)
}
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotifications(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
})
assert.True(t, notificationPresent)
logger.Info("CephBucketNotification created successfully on bucket")
})

t.Run("adding new label to the ObjectBucketClaim", func(t *testing.T) {
obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
assert.Nil(t, err)
obc.Labels["test-label"] = "test-value"
_, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
assert.Nil(t, err)
// check whether existing bucket notification uneffected
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotifications(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
})
assert.True(t, notificationPresent)
})

t.Run("removing label from the ObjectBucketClaim", func(t *testing.T) {
obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
assert.Nil(t, err)
logger.Infof("endpoint (%s) Accesskey (%s) secret (%s) region (%s)", s3endpoint, s3AccessKey, s3SecretKey, rgwcontext.ZoneGroup)
delete(obc.Labels, "test-label")
_, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
assert.Nil(t, err)
// check whether existing bucket notification uneffected
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotifications(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
})
assert.True(t, notificationPresent)
})

created := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
notifications, err := s3client.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{
Bucket: &bucketname,
})
if err != nil {
logger.Infof("failed to fetch bucket notifications configuration due to %v", err)
return false
t.Run("remove notification from the ObjectBucketClaim", func(t *testing.T) {
obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
assert.Nil(t, err)
delete(obc.Labels, bucketNotificationLabelPrefix+notificationName)
_, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
assert.Nil(t, err)
// check whether existing bucket notification uneffected
var notificationPresent bool
for i := 0; i < 4; i++ {
notificationPresent = helper.BucketClient.CheckBucketNotifications(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
if !notificationPresent {
break
}
logger.Infof("%d bucket notifications found in: %v", len(notifications.TopicConfigurations), notifications)
for _, notification := range notifications.TopicConfigurations {
if *notification.Id == notificationName {
return true
}
logger.Infof("bucket notifications name mismatch %q != %q", *notification.Id, notificationName)
time.Sleep(5 * time.Second)
}
assert.False(t, notificationPresent)
})

t.Run("add new topic, notification to existing ObjectBucketClaim", func(t *testing.T) {
newNotificationName := "new-notification"
newTopicName := "new-topic"
err := helper.TopicClient.CreateTopic(newTopicName, storeName, httpEndpointService)
assert.Nil(t, err)
created := utils.Retry(12, 2*time.Second, "topic is created", func() bool {
return helper.TopicClient.CheckTopic(newTopicName)
})
assert.True(t, created)
err = helper.NotificationClient.CreateNotification(newNotificationName, newTopicName)
assert.Nil(t, err)
created = utils.Retry(12, 2*time.Second, "notification is created", func() bool {
return helper.NotificationClient.CheckNotification(newNotificationName)
})
assert.True(t, created)
obc, err := k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Get(ctx, obcName, metav1.GetOptions{})
assert.Nil(t, err)
obc.Labels[bucketNotificationLabelPrefix+newNotificationName] = newNotificationName
_, err = k8sh.BucketClientset.ObjectbucketV1alpha1().ObjectBucketClaims(obcNamespace).Update(ctx, obc, metav1.UpdateOptions{})
assert.Nil(t, err)
// check whether bucket notification added
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotifications(namespace, storeName, obcName, bucketname, newNotificationName, s3endpoint)
})
assert.True(t, notificationPresent)
err = helper.NotificationClient.DeleteNotification(newNotificationName, topicName)
assert.Nil(t, err)
err = helper.TopicClient.DeleteTopic(newTopicName, storeName, httpEndpointService)
assert.Nil(t, err)
})

t.Run("reverse order of creating notification,topic and adding it to ObjectBucketClaim", func(t *testing.T) {
reverseNotificationName := "reverse-notification"
reverseTopicName := "reverse-topic"
reverseOBCName := "reverse-obc"
reverseBucketName := "reverse-bucket"
err := helper.BucketClient.CreateObcNotification(reverseOBCName, bucketStorageClassName, reverseBucketName, reverseNotificationName, true)
assert.Nil(t, err)

created := utils.Retry(12, 2*time.Second, "OBC is created", func() bool {
return helper.BucketClient.CheckOBC(reverseOBCName, "bound")
})
assert.True(t, created)
logger.Info("OBC created successfully")

var bkt rgw.ObjectBucket
i := 0
for i = 0; i < 4; i++ {
b, code, err := rgw.GetBucket(rgwcontext, reverseBucketName)
if b != nil && err == nil {
bkt = *b
break
}
return false
logger.Warningf("cannot get bucket %q, retrying... bucket: %v. code: %d, err: %v", reverseBucketName, b, code, err)
logger.Infof("(%d) check bucket exists, sleeping for 5 seconds ...", i)
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, 4, i)
assert.Equal(t, reverseBucketName, bkt.Name)
err = helper.NotificationClient.CreateNotification(reverseNotificationName, reverseTopicName)
assert.Nil(t, err)
created = utils.Retry(12, 2*time.Second, "notification is created", func() bool {
return helper.NotificationClient.CheckNotification(reverseNotificationName)
})
assert.True(t, created)
logger.Info("CephBucketNotification created successfully on bucket")

// check whether bucket notification added, should fail since topic is not created
notificationPresent := helper.BucketClient.CheckBucketNotifications(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, s3endpoint)
assert.False(t, notificationPresent)

err = helper.TopicClient.CreateTopic(reverseTopicName, storeName, httpEndpointService)
assert.Nil(t, err)
created = utils.Retry(12, 2*time.Second, "topic is created", func() bool {
return helper.TopicClient.CheckTopic(reverseTopicName)
})
assert.True(t, created)

// check whether bucket notification added, should pass since topic got created
notificationPresent = utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotifications(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, s3endpoint)
})
assert.True(t, notificationPresent)
err = helper.NotificationClient.DeleteNotification(reverseNotificationName, reverseTopicName)
assert.Nil(t, err)
err = helper.TopicClient.DeleteTopic(reverseTopicName, storeName, httpEndpointService)
assert.Nil(t, err)

err = helper.BucketClient.DeleteObc(reverseOBCName, bucketStorageClassName, reverseBucketName, maxObject, true)
assert.Nil(t, err)
logger.Info("Checking to see if the obc, secret, and cm have all been deleted")
for i = 0; i < 4 && !helper.BucketClient.CheckOBC(reverseOBCName, "deleted"); i++ {
logger.Infof("(%d) obc deleted check, sleeping for 5 seconds ...", i)
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, 4, i)

logger.Info("ensure OBC bucket was deleted")
var rgwErr int
for i = 0; i < 4; i++ {
_, rgwErr, _ = rgw.GetBucket(rgwcontext, reverseBucketName)
if rgwErr == rgw.RGWErrorNotFound {
break
}
logger.Infof("(%d) check bucket deleted, sleeping for 5 seconds ...", i)
time.Sleep(5 * time.Second)
}
assert.NotEqual(t, 4, i)
assert.Equal(t, rgwErr, rgw.RGWErrorNotFound)
})

t.Run("delete ObjectBucketClaim", func(t *testing.T) {
Expand Down Expand Up @@ -194,19 +309,6 @@ func (s *ObjectSuite) TestBucketNotificationsInOrder() {
assert.Nil(t, err)
})

t.Run("delete CephObjectStoreUser", func(t *testing.T) {
dosuErr := helper.ObjectUserClient.Delete(namespace, userid)
assert.Nil(t, dosuErr)
logger.Info("Object store user deleted successfully")
logger.Info("Checking to see if the user secret has been deleted")
i := 0
for i = 0; i < 4 && helper.ObjectUserClient.UserSecretExists(namespace, storeName, userid) == true; i++ {
logger.Infof("(%d) secret check sleeping for 5 seconds ...", i)
time.Sleep(5 * time.Second)
}
assert.False(t, helper.ObjectUserClient.UserSecretExists(namespace, storeName, userid))
})

t.Run("delete CephObjectStore", func(t *testing.T) {
deleteObjectStore(t, k8sh, namespace, storeName)
assertObjectStoreDeletion(t, k8sh, namespace, storeName)
Expand Down

0 comments on commit 618bdd4

Please sign in to comment.