Skip to content

Commit

Permalink
rgw: inject tls certs for bucket notification and topic operations
Browse files Browse the repository at this point in the history
The certs for accessing TLS enabled RGW is saved as secrets and inject
them if controllers for notification and topics if request is sent to
TLS enabled RGW endpoint.

Signed-off-by: Jiffin Tony Thottan <thottanjiffin@gmail.com>
  • Loading branch information
thotz committed Jan 20, 2022
1 parent d0d83ed commit 6e1f01e
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 43 deletions.
33 changes: 16 additions & 17 deletions pkg/operator/ceph/object/notification/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package notification

import (
"context"
"net/http"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/ceph/go-ceph/rgw/admin"
Expand All @@ -44,26 +43,14 @@ type provisioner struct {
objectStoreName types.NamespacedName
}

func getUserCredentials(opManagerContext context.Context, username string, objStore *cephv1.CephObjectStore, objContext *object.Context) (accessKey string, secretKey string, err error) {
func getUserCredentials(adminOpsCtx *object.AdminOpsContext, opManagerContext context.Context, username string) (accessKey string, secretKey string, err error) {
if len(username) == 0 {
err = errors.New("no user name provided")
return
}

adminAccessKey, adminSecretKey, err := object.GetAdminOPSUserCredentials(objContext, &objStore.Spec)
if err != nil {
err = errors.Wrapf(err, "failed to get Ceph RGW admin ops user credentials when getting user %q", username)
return
}

adminOpsClient, err := admin.New(objContext.Endpoint, adminAccessKey, adminSecretKey, &http.Client{})
if err != nil {
err = errors.Wrapf(err, "failed to build admin ops API connection to get user %q", username)
return
}

var u admin.User
u, err = adminOpsClient.GetUser(opManagerContext, admin.User{ID: username})
u, err = adminOpsCtx.AdminOpsClient.GetUser(opManagerContext, admin.User{ID: username})
if err != nil {
err = errors.Wrapf(err, "failed to get ceph user %q", username)
return
Expand All @@ -88,12 +75,24 @@ func newS3Agent(p provisioner) (*object.S3Agent, error) {
// CephClusterSpec is needed for GetAdminOPSUserCredentials()
objContext.CephClusterSpec = *p.clusterSpec

accessKey, secretKey, err := getUserCredentials(p.opManagerContext, p.owner, objStore, objContext)
adminOpsCtx, err := object.NewMultisiteAdminOpsContext(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrapf(err, "failed to get admin Ops context for CephObjectStore %v", p.objectStoreName)

}
accessKey, secretKey, err := getUserCredentials(adminOpsCtx, p.opManagerContext, p.owner)
if err != nil {
return nil, errors.Wrapf(err, "failed to get owner credentials for %q", p.owner)
}
tlsCert := make([]byte, 0)
if objStore.Spec.IsTLSEnabled() {
tlsCert, _, err = object.GetTlsCaCert(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch TLS certificate for the object store")
}
}

return object.NewS3Agent(accessKey, secretKey, objContext.Endpoint, objContext.ZoneGroup, logger.LevelAt(capnslog.DEBUG), objContext.Context.KubeConfig.CertData)
return object.NewS3Agent(accessKey, secretKey, objContext.Endpoint, objContext.ZoneGroup, logger.LevelAt(capnslog.DEBUG), tlsCert)
}

// TODO: convert all rules without restrictions once the AWS SDK supports that
Expand Down
6 changes: 5 additions & 1 deletion pkg/operator/ceph/object/topic/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func createSNSClient(p provisioner, objectStoreName types.NamespacedName) (*sns.
}
tlsEnabled := objStore.Spec.IsTLSEnabled()
if tlsEnabled {
tlsCert := objContext.Context.KubeConfig.CertData
tlsCert, _, err := object.GetTlsCaCert(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to get TLS certificate for the object store")
}
if len(tlsCert) > 0 {
client.Transport = object.BuildTransportTLS(tlsCert, false)
}
Expand All @@ -101,6 +104,7 @@ func createSNSClient(p provisioner, objectStoreName types.NamespacedName) (*sns.
WithEndpoint(objContext.Endpoint).
WithMaxRetries(3).
WithDisableSSL(!tlsEnabled).
WithHTTPClient(&client).
WithLogLevel(logLevel),
)
if err != nil {
Expand Down
20 changes: 12 additions & 8 deletions tests/framework/clients/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,22 @@ func (b *BucketOperation) CheckOBMaxObject(obcName, maxobject string) bool {
}

// Checks the bucket notifications set on RGW backend bucket
func (b *BucketOperation) CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint string) bool {
func (b *BucketOperation) CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName string, helper *TestClient, tlsEnabled bool) bool {
var s3client *rgw.S3Agent
s3AccessKey, _ := b.GetAccessKey(obcName)
s3SecretKey, _ := b.GetSecretKey(obcName)

//TODO : add TLS check
s3client, err := rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)
var err error
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
if tlsEnabled {
s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true)
} else {
s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)
}
if err != nil {
logger.Errorf("S3 client creation failed with error %v", err)
logger.Infof("failed to s3client due to %v", err)
return false
}

logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
notifications, err := s3client.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{
Bucket: &bucketname,
})
Expand Down
26 changes: 10 additions & 16 deletions tests/integration/ceph_bucket_notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,20 @@ import (

"github.com/rook/rook/pkg/daemon/ceph/client"
rgw "github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/tests/framework/clients"
"github.com/rook/rook/tests/framework/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (s *ObjectSuite) TestBucketNotifications() {
func testBucketNotifications(s suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
if utils.IsPlatformOpenShift() {
s.T().Skip("bucket notification tests skipped on openshift")
}

objectStoreServicePrefix = objectStoreServicePrefixUniq
bucketNotificationLabelPrefix := "bucket-notification-"
storeName := "test-store-bucket-notification"
tlsEnable := false
namespace := s.settings.Namespace
obcNamespace := "default"
helper := s.helper
k8sh := s.k8sh
logger.Infof("Running on Rook Cluster %s", namespace)
createCephObjectStore(s.T(), helper, k8sh, namespace, storeName, 3, tlsEnable)

ctx := context.TODO()
clusterInfo := client.AdminTestClusterInfo(namespace)
Expand All @@ -57,7 +51,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationName := "my-notification"
topicName := "my-topic"
httpEndpointService := "my-notification-sink"
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
logger.Infof("Testing Bucket Notifications on %s", storeName)

t.Run("create CephBucketTopic", func(t *testing.T) {
err := helper.TopicClient.CreateTopic(topicName, storeName, httpEndpointService)
Expand Down Expand Up @@ -111,7 +105,7 @@ func (s *ObjectSuite) TestBucketNotifications() {

t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
logger.Info("CephBucketNotification created successfully on bucket")
Expand All @@ -127,7 +121,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
// TODO : add api to fetch all the notification from backend to see if it is unaffected
t.Skipped()
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand All @@ -142,7 +136,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
// TODO : add api to fetch all the notification from backend to see if it is unaffected
t.Skipped()
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand All @@ -158,7 +152,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
// check whether existing bucket notification uneffected
var notificationPresent bool
for i := 0; i < 4; i++ {
notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
if !notificationPresent {
break
}
Expand Down Expand Up @@ -196,7 +190,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
t.Run("new-notification should be configured for bucket", func(t *testing.T) {
// check whether bucket notification added
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand Down Expand Up @@ -269,7 +263,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
t.Run("notification should be configured after creating the topic", func(t *testing.T) {
// check whether bucket notification added, should pass since topic got created
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand Down
8 changes: 7 additions & 1 deletion tests/integration/ceph_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,18 @@ func runObjectE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, s suite

// now test operation of the first object store
testObjectStoreOperations(s, helper, k8sh, namespace, storeName)

bucketNotificationTestStoreName := "bucket-notification-" + storeName
createCephObjectStore(s.T(), helper, k8sh, namespace, bucketNotificationTestStoreName, 1, tlsEnable)
testBucketNotifications(s, helper, k8sh, namespace, bucketNotificationTestStoreName)
}

func testObjectStoreOperations(s suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
ctx := context.TODO()
clusterInfo := client.AdminTestClusterInfo(namespace)
t := s.T()

logger.Infof("Testing Object Operations on %s", storeName)
t.Run("create CephObjectStoreUser", func(t *testing.T) {
createCephObjectUser(s, helper, k8sh, namespace, storeName, userid, true, true)
i := 0
Expand Down Expand Up @@ -358,7 +363,8 @@ func testObjectStoreOperations(s suite.Suite, helper *clients.TestClient, k8sh *
assert.True(t, k8sh.CheckPodCountAndState("rook-ceph-mgr", namespace, 1, "Running"))
})

t.Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
// tests are complete, now delete the objectstore
s.T().Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
// wait initially since it will almost never detect on the first try without this.
time.Sleep(3 * time.Second)

Expand Down

0 comments on commit 6e1f01e

Please sign in to comment.