Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rgw: inject tls certs for bucket notification and topic operations #9565

Merged
merged 1 commit into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 16 additions & 17 deletions pkg/operator/ceph/object/notification/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package notification

import (
"context"
"net/http"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/ceph/go-ceph/rgw/admin"
Expand All @@ -44,26 +43,14 @@ type provisioner struct {
objectStoreName types.NamespacedName
}

func getUserCredentials(opManagerContext context.Context, username string, objStore *cephv1.CephObjectStore, objContext *object.Context) (accessKey string, secretKey string, err error) {
func getUserCredentials(adminOpsCtx *object.AdminOpsContext, opManagerContext context.Context, username string) (accessKey string, secretKey string, err error) {
if len(username) == 0 {
err = errors.New("no user name provided")
return
}

adminAccessKey, adminSecretKey, err := object.GetAdminOPSUserCredentials(objContext, &objStore.Spec)
if err != nil {
err = errors.Wrapf(err, "failed to get Ceph RGW admin ops user credentials when getting user %q", username)
return
}

adminOpsClient, err := admin.New(objContext.Endpoint, adminAccessKey, adminSecretKey, &http.Client{})
if err != nil {
err = errors.Wrapf(err, "failed to build admin ops API connection to get user %q", username)
return
}

var u admin.User
u, err = adminOpsClient.GetUser(opManagerContext, admin.User{ID: username})
u, err = adminOpsCtx.AdminOpsClient.GetUser(opManagerContext, admin.User{ID: username})
if err != nil {
err = errors.Wrapf(err, "failed to get ceph user %q", username)
return
Expand All @@ -88,12 +75,24 @@ func newS3Agent(p provisioner) (*object.S3Agent, error) {
// CephClusterSpec is needed for GetAdminOPSUserCredentials()
objContext.CephClusterSpec = *p.clusterSpec

accessKey, secretKey, err := getUserCredentials(p.opManagerContext, p.owner, objStore, objContext)
adminOpsCtx, err := object.NewMultisiteAdminOpsContext(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrapf(err, "failed to get admin Ops context for CephObjectStore %q", p.objectStoreName)

}
accessKey, secretKey, err := getUserCredentials(adminOpsCtx, p.opManagerContext, p.owner)
if err != nil {
return nil, errors.Wrapf(err, "failed to get owner credentials for %q", p.owner)
}
tlsCert := make([]byte, 0)
if objStore.Spec.IsTLSEnabled() {
tlsCert, _, err = object.GetTlsCaCert(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch TLS certificate for the object store")
}
}

return object.NewS3Agent(accessKey, secretKey, objContext.Endpoint, objContext.ZoneGroup, logger.LevelAt(capnslog.DEBUG), objContext.Context.KubeConfig.CertData)
return object.NewS3Agent(accessKey, secretKey, objContext.Endpoint, objContext.ZoneGroup, logger.LevelAt(capnslog.DEBUG), tlsCert)
}

// TODO: convert all rules without restrictions once the AWS SDK supports that
Expand Down
6 changes: 5 additions & 1 deletion pkg/operator/ceph/object/topic/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func createSNSClient(p provisioner, objectStoreName types.NamespacedName) (*sns.
}
tlsEnabled := objStore.Spec.IsTLSEnabled()
if tlsEnabled {
tlsCert := objContext.Context.KubeConfig.CertData
tlsCert, _, err := object.GetTlsCaCert(objContext, &objStore.Spec)
if err != nil {
return nil, errors.Wrap(err, "failed to get TLS certificate for the object store")
}
if len(tlsCert) > 0 {
client.Transport = object.BuildTransportTLS(tlsCert, false)
}
Expand All @@ -101,6 +104,7 @@ func createSNSClient(p provisioner, objectStoreName types.NamespacedName) (*sns.
WithEndpoint(objContext.Endpoint).
WithMaxRetries(3).
WithDisableSSL(!tlsEnabled).
WithHTTPClient(&client).
thotz marked this conversation as resolved.
Show resolved Hide resolved
WithLogLevel(logLevel),
)
if err != nil {
Expand Down
20 changes: 12 additions & 8 deletions tests/framework/clients/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,22 @@ func (b *BucketOperation) CheckOBMaxObject(obcName, maxobject string) bool {
}

// Checks the bucket notifications set on RGW backend bucket
func (b *BucketOperation) CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint string) bool {
func (b *BucketOperation) CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName string, helper *TestClient, tlsEnabled bool) bool {
var s3client *rgw.S3Agent
s3AccessKey, _ := b.GetAccessKey(obcName)
s3SecretKey, _ := b.GetSecretKey(obcName)

//TODO : add TLS check
s3client, err := rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)
var err error
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
s3AccessKey, _ := helper.BucketClient.GetAccessKey(obcName)
s3SecretKey, _ := helper.BucketClient.GetSecretKey(obcName)
if tlsEnabled {
s3client, err = rgw.NewInsecureS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true)
} else {
s3client, err = rgw.NewS3Agent(s3AccessKey, s3SecretKey, s3endpoint, "", true, nil)
}
if err != nil {
logger.Errorf("S3 client creation failed with error %v", err)
logger.Infof("failed to s3client due to %v", err)
return false
}

logger.Infof("endpoint (%s) Accesskey (%s) secret (%s)", s3endpoint, s3AccessKey, s3SecretKey)
notifications, err := s3client.Client.GetBucketNotificationConfiguration(&s3.GetBucketNotificationConfigurationRequest{
Bucket: &bucketname,
})
Expand Down
26 changes: 10 additions & 16 deletions tests/integration/ceph_bucket_notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,20 @@ import (

"github.com/rook/rook/pkg/daemon/ceph/client"
rgw "github.com/rook/rook/pkg/operator/ceph/object"
"github.com/rook/rook/tests/framework/clients"
"github.com/rook/rook/tests/framework/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (s *ObjectSuite) TestBucketNotifications() {
func testBucketNotifications(s suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
if utils.IsPlatformOpenShift() {
s.T().Skip("bucket notification tests skipped on openshift")
}

objectStoreServicePrefix = objectStoreServicePrefixUniq
bucketNotificationLabelPrefix := "bucket-notification-"
storeName := "test-store-bucket-notification"
tlsEnable := false
namespace := s.settings.Namespace
obcNamespace := "default"
helper := s.helper
k8sh := s.k8sh
logger.Infof("Running on Rook Cluster %s", namespace)
createCephObjectStore(s.T(), helper, k8sh, namespace, storeName, 3, tlsEnable)

ctx := context.TODO()
clusterInfo := client.AdminTestClusterInfo(namespace)
Expand All @@ -57,7 +51,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationName := "my-notification"
topicName := "my-topic"
httpEndpointService := "my-notification-sink"
s3endpoint, _ := helper.ObjectClient.GetEndPointUrl(namespace, storeName)
logger.Infof("Testing Bucket Notifications on %s", storeName)

t.Run("create CephBucketTopic", func(t *testing.T) {
err := helper.TopicClient.CreateTopic(topicName, storeName, httpEndpointService)
Expand Down Expand Up @@ -111,7 +105,7 @@ func (s *ObjectSuite) TestBucketNotifications() {

t.Run("check CephBucketNotification created for bucket", func(t *testing.T) {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
logger.Info("CephBucketNotification created successfully on bucket")
Expand All @@ -127,7 +121,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
// TODO : add api to fetch all the notification from backend to see if it is unaffected
t.Skipped()
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand All @@ -142,7 +136,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
// TODO : add api to fetch all the notification from backend to see if it is unaffected
t.Skipped()
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand All @@ -158,7 +152,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
// check whether existing bucket notification uneffected
var notificationPresent bool
for i := 0; i < 4; i++ {
notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, s3endpoint)
notificationPresent = helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, notificationName, helper, objectStore.Spec.IsTLSEnabled())
if !notificationPresent {
break
}
Expand Down Expand Up @@ -196,7 +190,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
t.Run("new-notification should be configured for bucket", func(t *testing.T) {
// check whether bucket notification added
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, obcName, bucketname, newNotificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand Down Expand Up @@ -269,7 +263,7 @@ func (s *ObjectSuite) TestBucketNotifications() {
t.Run("notification should be configured after creating the topic", func(t *testing.T) {
// check whether bucket notification added, should pass since topic got created
notificationPresent := utils.Retry(12, 2*time.Second, "notification is created for bucket", func() bool {
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, s3endpoint)
return helper.BucketClient.CheckBucketNotificationSetonRGW(namespace, storeName, reverseOBCName, reverseBucketName, reverseNotificationName, helper, objectStore.Spec.IsTLSEnabled())
})
assert.True(t, notificationPresent)
})
Expand Down
8 changes: 7 additions & 1 deletion tests/integration/ceph_object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,18 @@ func runObjectE2ETest(helper *clients.TestClient, k8sh *utils.K8sHelper, s suite

// now test operation of the first object store
testObjectStoreOperations(s, helper, k8sh, namespace, storeName)

bucketNotificationTestStoreName := "bucket-notification-" + storeName
createCephObjectStore(s.T(), helper, k8sh, namespace, bucketNotificationTestStoreName, 1, tlsEnable)
testBucketNotifications(s, helper, k8sh, namespace, bucketNotificationTestStoreName)
}

func testObjectStoreOperations(s suite.Suite, helper *clients.TestClient, k8sh *utils.K8sHelper, namespace, storeName string) {
ctx := context.TODO()
clusterInfo := client.AdminTestClusterInfo(namespace)
t := s.T()

logger.Infof("Testing Object Operations on %s", storeName)
t.Run("create CephObjectStoreUser", func(t *testing.T) {
createCephObjectUser(s, helper, k8sh, namespace, storeName, userid, true, true)
i := 0
Expand Down Expand Up @@ -358,7 +363,8 @@ func testObjectStoreOperations(s suite.Suite, helper *clients.TestClient, k8sh *
assert.True(t, k8sh.CheckPodCountAndState("rook-ceph-mgr", namespace, 1, "Running"))
})

t.Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
// tests are complete, now delete the objectstore
s.T().Run("CephObjectStore should delete now that dependents are gone", func(t *testing.T) {
// wait initially since it will almost never detect on the first try without this.
time.Sleep(3 * time.Second)

Expand Down