Skip to content

Commit

Permalink
Merge pull request #9565 from thotz/rgw-tls-cert-fix-bucket-notificat…
Browse files Browse the repository at this point in the history
…ions

rgw: inject tls certs for bucket notification and topic operations
  • Loading branch information
BlaineEXE committed Jan 24, 2022
2 parents 16f8467 + a97747c commit 18c1397
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 43 deletions.
33 changes: 16 additions & 17 deletions pkg/operator/ceph/object/notification/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package notification

import (
"context"
"net/http"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/ceph/go-ceph/rgw/admin"
Expand All @@ -44,26 +43,14 @@ type provisioner struct {
objectStoreName types.NamespacedName
}

func getUserCredentials(opManagerContext context.Context, username string, objStore *cephv1.CephObjectStore, objContext *object.Context) (accessKey string, secretKey string, err error) {
func getUserCredentials(adminOpsCtx *object.AdminOpsContext, opManagerContext context.Context, username string) (accessKey string, secretKey string, err error) {
if len(username) == 0 {
err = errors.New("no user name provided")
return
}

adminAccessKey, adminSecretKey, err := object.GetAdminOPSUserCredentials(objContext, &objStore.Spec)
if err != nil {
err = errors.Wrapf(err, "failed to get Ceph RGW admin ops user credentials when getting user %q", username)
return
}

adminOpsClient, err := admin.New(objContext.Endpoint, adminAccessKey, adminSecretKey, &http.Client{})
if err != nil {
err = errors.Wrapf(err, "failed to build admin ops API connection to get user %q", username)
return
}

var u admin.User
u, err = adminOpsClient.GetUser(opManagerContext, admin.User{ID: username})
u, err = adminOpsCtx.AdminOpsClient.GetUser(opManagerContext, admin.User{ID: username})
if err != nil {
err = errors.Wrapf(err, "failed to get ceph user %q", username)
return
Expand All @@ -88,12 +75,24 @@ func newS3Agent(p provisioner) (*object.S3Agent, error) {
// CephClusterSpec is needed for GetAdminOPSUserCredentials()
objContext.CephClusterSpec = *p.clusterSpec

accessKey, secretKey, err := getUserCredentials(p.opManagerContext, p.owner, objStore, objContext)
adminOpsCtx, err := object.NewMultisiteAdminOpsContext(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrapf(err, "failed to get admin Ops context for CephObjectStore %q", p.objectStoreName)

}
accessKey, secretKey, err := getUserCredentials(adminOpsCtx, p.opManagerContext, p.owner)
if err != nil {
return nil, errors.Wrapf(err, "failed to get owner credentials for %q", p.owner)
}
tlsCert := make([]byte, 0)
if objStore.Spec.IsTLSEnabled() {
tlsCert, _, err = object.GetTlsCaCert(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch TLS certificate for the object store")
}
}

return object.NewS3Agent(accessKey, secretKey, objContext.Endpoint, objContext.ZoneGroup, logger.LevelAt(capnslog.DEBUG), objContext.Context.KubeConfig.CertData)
return object.NewS3Agent(accessKey, secretKey, objContext.Endpoint, objContext.ZoneGroup, logger.LevelAt(capnslog.DEBUG), tlsCert)
}

// TODO: convert all rules without restrictions once the AWS SDK supports that
Expand Down
6 changes: 5 additions & 1 deletion pkg/operator/ceph/object/topic/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func createSNSClient(p provisioner, objectStoreName types.NamespacedName) (*sns.
}
tlsEnabled := objStore.Spec.IsTLSEnabled()
if tlsEnabled {
tlsCert := objContext.Context.KubeConfig.CertData
tlsCert, _, err := object.GetTlsCaCert(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to get TLS certificate for the object store")
}
if len(tlsCert) > 0 {
client.Transport = object.BuildTransportTLS(tlsCert, false)
}
Expand All @@ -101,6 +104,7 @@ func createSNSClient(p provisioner, objectStoreName types.NamespacedName) (*sns.
WithEndpoint(objContext.Endpoint).
WithMaxRetries(3).
WithDisableSSL(!tlsEnabled).
WithHTTPClient(&client).
WithLogLevel(logLevel),
)
if err != nil {
Expand Down
20 changes: 12 additions & 8 deletions tests/framework/clients/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,22 @@ func (b *BucketOperation) CheckOBMaxObject(obcName, maxobject string) bool {
}

// Checks the bucket notifications set on RGW backend bucket
func (b *BucketOperation) CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint string) bool {
func (b *BucketOperation) CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName string, helper *TestClient, tlsEnabled bool) bool {
var s3client *rgw.S3Agent
s3AccessKey, _ := b.GetAccessKey(obcName)
s3SecretKey, _ := b.GetSecretKey(obcName)

//TODO : add TLS check
s3client, err := rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)
var err error
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
if tlsEnabled {
s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true)
} else {
s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)
}
if err != nil {
logger.Errorf("S3 client creation failed with error %v", err)
logger.Infof("failed to s3client due to %v", err)
return false
}

logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
notifications, err := s3client.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{
Bucket: &bucketname,
})
Expand Down
26 changes: 10 additions & 16 deletions tests/integration/ceph_bucket_notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,20 @@ import (

"github.com/rook/rook/pkg/daemon/ceph/client"
rgw "github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/tests/framework/clients"
"github.com/rook/rook/tests/framework/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (s *ObjectSuite) TestBucketNotifications() {
func testBucketNotifications(s suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
if utils.IsPlatformOpenShift() {
s.T().Skip("bucket notification tests skipped on openshift")
}

objectStoreServicePrefix = objectStoreServicePrefixUniq
bucketNotificationLabelPrefix := "bucket-notification-"
storeName := "test-store-bucket-notification"
tlsEnable := false
namespace := s.settings.Namespace
obcNamespace := "default"
helper := s.helper
k8sh := s.k8sh
logger.Infof("Running on Rook Cluster %s", namespace)
createCephObjectStore(s.T(), helper, k8sh, namespace, storeName, 3, tlsEnable)

ctx := context.TODO()
clusterInfo := client.AdminTestClusterInfo(namespace)
Expand All @@ -57,7 +51,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationName := "my-notification"
topicName := "my-topic"
httpEndpointService := "my-notification-sink"
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
logger.Infof("Testing Bucket Notifications on %s", storeName)

t.Run("create CephBucketTopic", func(t *testing.T) {
err := helper.TopicClient.CreateTopic(topicName, storeName, httpEndpointService)
Expand Down Expand Up @@ -111,7 +105,7 @@ func (s *ObjectSuite) TestBucketNotifications() {

t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
logger.Info("CephBucketNotification created successfully on bucket")
Expand All @@ -127,7 +121,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
// TODO : add api to fetch all the notification from backend to see if it is unaffected
t.Skipped()
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand All @@ -142,7 +136,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
// TODO : add api to fetch all the notification from backend to see if it is unaffected
t.Skipped()
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand All @@ -158,7 +152,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
// check whether existing bucket notification uneffected
var notificationPresent bool
for i := 0; i < 4; i++ {
notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
if !notificationPresent {
break
}
Expand Down Expand Up @@ -196,7 +190,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
t.Run("new-notification should be configured for bucket", func(t *testing.T) {
// check whether bucket notification added
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand Down Expand Up @@ -269,7 +263,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
t.Run("notification should be configured after creating the topic", func(t *testing.T) {
// check whether bucket notification added, should pass since topic got created
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand Down
8 changes: 7 additions & 1 deletion tests/integration/ceph_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,18 @@ func runObjectE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, s suite

// now test operation of the first object store
testObjectStoreOperations(s, helper, k8sh, namespace, storeName)

bucketNotificationTestStoreName := "bucket-notification-" + storeName
createCephObjectStore(s.T(), helper, k8sh, namespace, bucketNotificationTestStoreName, 1, tlsEnable)
testBucketNotifications(s, helper, k8sh, namespace, bucketNotificationTestStoreName)
}

func testObjectStoreOperations(s suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
ctx := context.TODO()
clusterInfo := client.AdminTestClusterInfo(namespace)
t := s.T()

logger.Infof("Testing Object Operations on %s", storeName)
t.Run("create CephObjectStoreUser", func(t *testing.T) {
createCephObjectUser(s, helper, k8sh, namespace, storeName, userid, true, true)
i := 0
Expand Down Expand Up @@ -358,7 +363,8 @@ func testObjectStoreOperations(s suite.Suite, helper *clients.TestClient, k8sh *
assert.True(t, k8sh.CheckPodCountAndState("rook-ceph-mgr", namespace, 1, "Running"))
})

t.Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
// tests are complete, now delete the objectstore
s.T().Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
// wait initially since it will almost never detect on the first try without this.
time.Sleep(3 * time.Second)

Expand Down

0 comments on commit 18c1397

Please sign in to comment.