Skip to content
5 changes: 5 additions & 0 deletions pkg/api/v1/atlasdatabaseuser_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ func (p *AtlasDatabaseUser) ClearScopes() *AtlasDatabaseUser {
return p
}

func (p *AtlasDatabaseUser) WithDeleteAfterDate(date string) *AtlasDatabaseUser {
p.Spec.DeleteAfterDate = date
return p
}

func DefaultDBUser(namespace, username, projectName string) *AtlasDatabaseUser {
return NewDBUser(namespace, username, username, projectName).WithRole("clusterMonitor", "admin", "")
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/status"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/connectionsecret"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/customresource"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/statushandler"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/watch"
Expand Down Expand Up @@ -171,24 +170,8 @@ func (r AtlasDatabaseUserReconciler) Delete(e event.DeleteEvent) error {

log.Infow("Started DatabaseUser deletion process in Atlas", "projectID", project.ID(), "userName", userName)

secrets, err := connectionsecret.ListByUserName(r.Client, dbUser.Namespace, project.ID(), userName)
if err != nil {
return fmt.Errorf("failed to find connection secrets for the user: %w", err)
}

for _, secret := range secrets {
// Solves the "Implicit memory aliasing in for loop" linter error
s := secret.DeepCopy()
err = r.Client.Delete(context.Background(), s)
if err != nil {
log.Errorf("Failed to remove connection Secret: %v", err)
} else {
log.Debugw("Removed connection Secret", "secret", kube.ObjectKeyFromObject(s))
}
}
if len(secrets) > 0 {
log.Infof("Removed %d connection secrets", len(secrets))
}
// We ignore the error as it will be printed by the function
_ = removeStaleSecretsByUserName(r.Client, project.ID(), userName, *dbUser, log)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to a single function as this code is reused by different places


return nil
}
33 changes: 21 additions & 12 deletions pkg/controller/atlasdatabaseuser/connectionsecrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"

"go.mongodb.org/atlas/mongodbatlas"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client"

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/connectionsecret"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/kube"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/stringutil"
)

Expand Down Expand Up @@ -70,8 +72,10 @@ func cleanupStaleSecrets(ctx *workflow.Context, k8sClient client.Client, project
if err := removeStaleByScope(ctx, k8sClient, projectID, user); err != nil {
return err
}
if err := removeStaleByUserName(ctx, k8sClient, projectID, user); err != nil {
return err
// Performing the cleanup of old secrets only if the username has changed
if user.Status.UserName != user.Spec.Username {
// Note, that we pass the username from the status, not from the spec
return removeStaleSecretsByUserName(k8sClient, projectID, user.Status.UserName, user, ctx.Log)
}
return nil
}
Expand Down Expand Up @@ -101,20 +105,25 @@ func removeStaleByScope(ctx *workflow.Context, k8sClient client.Client, projectI
return nil
}

// removeStaleByUserName removes the stale secrets when the database user name changes (as it's used as a part of Secret name)
func removeStaleByUserName(ctx *workflow.Context, k8sClient client.Client, projectID string, user mdbv1.AtlasDatabaseUser) error {
if user.Status.UserName == user.Spec.Username {
return nil
}
secrets, err := connectionsecret.ListByUserName(k8sClient, user.Namespace, projectID, user.Status.UserName)
// removeStaleSecretsByUserName removes the stale secrets when the database user name changes (as it's used as a part of Secret name)
func removeStaleSecretsByUserName(k8sClient client.Client, projectID, userName string, user mdbv1.AtlasDatabaseUser, log *zap.SugaredLogger) error {
secrets, err := connectionsecret.ListByUserName(k8sClient, user.Namespace, projectID, userName)
if err != nil {
return err
}
for i, s := range secrets {
var lastError error
removed := 0
for i := range secrets {
if err = k8sClient.Delete(context.Background(), &secrets[i]); err != nil {
return err
log.Errorf("Failed to remove connection Secret: %v", err)
lastError = err
} else {
log.Debugw("Removed connection Secret", "secret", kube.ObjectKeyFromObject(&secrets[i]))
removed++
}
ctx.Log.Debugw("Removed connection Secret as the database user name has changed", "secretname", s.Name)
}
return nil
if removed > 0 {
log.Infof("Removed %d connection secrets", removed)
}
return lastError
}
104 changes: 86 additions & 18 deletions pkg/controller/atlasdatabaseuser/databaseuser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,101 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.mongodb.org/atlas/mongodbatlas"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1/status"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/atlas"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/workflow"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/compat"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/timeutil"
)

func (r *AtlasDatabaseUserReconciler) ensureDatabaseUser(ctx *workflow.Context, project mdbv1.AtlasProject, dbUser mdbv1.AtlasDatabaseUser) workflow.Result {
retryAfterUpdate := workflow.InProgress(workflow.DatabaseUserClustersAppliedChanges, "Clusters are scheduled to handle database users updates")

apiUser, err := dbUser.ToAtlas(r.Client)
if err != nil {
return workflow.Terminate(workflow.Internal, err.Error())
}
secret := &corev1.Secret{}
if err := r.Client.Get(context.Background(), *dbUser.PasswordSecretObjectKey(), secret); err != nil {
return workflow.Terminate(workflow.Internal, err.Error())

if result := checkUserExpired(ctx.Log, r.Client, project.ID(), dbUser); !result.IsOk() {
return result
}
currentPasswordResourceVersion := secret.ResourceVersion

if err = validateScopes(ctx, project.ID(), dbUser); err != nil {
return workflow.Terminate(workflow.DatabaseUserInvalidSpec, err.Error())
}

if result := performUpdateInAtlas(ctx, r.Client, project, dbUser, apiUser); !result.IsOk() {
return result
}

if result := checkClustersHaveReachedGoalState(ctx, project.ID(), dbUser); !result.IsOk() {
return result
}

if result := createOrUpdateConnectionSecrets(ctx, r.Client, project, dbUser); !result.IsOk() {
return result
}

// We need to remove the old Atlas User right after all the connection secrets are ensured if username has changed.
if result := handleUserNameChange(ctx, project.ID(), dbUser); !result.IsOk() {
return result
}

// We mark the status.Username only when everything is finished including connection secrets
ctx.EnsureStatusOption(status.AtlasDatabaseUserNameOption(dbUser.Spec.Username))

return workflow.OK()
}

func handleUserNameChange(ctx *workflow.Context, projectID string, dbUser mdbv1.AtlasDatabaseUser) workflow.Result {
if dbUser.Spec.Username != dbUser.Status.UserName && dbUser.Status.UserName != "" {
ctx.Log.Infow("'spec.username' has changed - removing the old user from Atlas", "newUserName", dbUser.Spec.Username, "oldUserName", dbUser.Status.UserName)

_, err := ctx.Client.DatabaseUsers.Delete(context.Background(), dbUser.Spec.DatabaseName, projectID, dbUser.Status.UserName)
if err != nil {
// There may be some rare errors due to the databaseName change or maybe the user has already been removed - this
// is not-critical (the stale connection secret has already been removed) and we shouldn't retry to avoid infinite retries
ctx.Log.Errorf("Failed to remove user %s from Atlas: %s", dbUser.Status.UserName, err)
}
}
return workflow.OK()
}

func checkUserExpired(log *zap.SugaredLogger, k8sClient client.Client, projectID string, dbUser mdbv1.AtlasDatabaseUser) workflow.Result {
if dbUser.Spec.DeleteAfterDate == "" {
return workflow.OK()
}

deleteAfter, err := timeutil.ParseISO8601(dbUser.Spec.DeleteAfterDate)
if err != nil {
return workflow.Terminate(workflow.DatabaseUserInvalidSpec, err.Error()).WithoutRetry()
}
if deleteAfter.Before(time.Now()) {
if err = removeStaleSecretsByUserName(k8sClient, projectID, dbUser.Spec.Username, dbUser, log); err != nil {
return workflow.Terminate(workflow.Internal, err.Error())
}
return workflow.Terminate(workflow.DatabaseUserExpired, "The database user is expired and has been removed from Atlas").WithoutRetry()
}
return workflow.OK()
}

func performUpdateInAtlas(ctx *workflow.Context, k8sClient client.Client, project mdbv1.AtlasProject, dbUser mdbv1.AtlasDatabaseUser, apiUser *mongodbatlas.DatabaseUser) workflow.Result {
secret := &corev1.Secret{}
if err := k8sClient.Get(context.Background(), *dbUser.PasswordSecretObjectKey(), secret); err != nil {
return workflow.Terminate(workflow.Internal, err.Error())
}
currentPasswordResourceVersion := secret.ResourceVersion

retryAfterUpdate := workflow.InProgress(workflow.DatabaseUserClustersAppliedChanges, "Clusters are scheduled to handle database users updates")

// Try to find the user
u, _, err := ctx.Client.DatabaseUsers.Get(context.Background(), dbUser.Spec.DatabaseName, project.ID(), dbUser.Spec.Username)
if err != nil {
Expand Down Expand Up @@ -66,18 +131,6 @@ func (r *AtlasDatabaseUserReconciler) ensureDatabaseUser(ctx *workflow.Context,
// after the successful update we'll retry reconciliation so that clusters had a chance to start working
return retryAfterUpdate
}

if result := checkClustersHaveReachedGoalState(ctx, project.ID(), dbUser); !result.IsOk() {
return result
}

if result := createOrUpdateConnectionSecrets(ctx, r.Client, project, dbUser); !result.IsOk() {
return result
}

// We mark the status.Username only when everything is finished including connection secrets
ctx.EnsureStatusOption(status.AtlasDatabaseUserNameOption(dbUser.Spec.Username))

return workflow.OK()
}

Expand Down Expand Up @@ -180,6 +233,21 @@ func userMatchesSpec(log *zap.SugaredLogger, atlasSpec *mongodbatlas.DatabaseUse
return false, err
}

// performing some normalization of dates
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is necessary to correctly compare strings as the date returned by Atlas may be different than the one submitted by the Atlas Operator

if atlasSpec.DeleteAfterDate != "" {
atlasDeleteDate, err := timeutil.ParseISO8601(atlasSpec.DeleteAfterDate)
if err != nil {
return false, err
}
atlasSpec.DeleteAfterDate = timeutil.FormatISO8601(atlasDeleteDate)
}
if operatorSpec.DeleteAfterDate != "" {
operatorDeleteDate, err := timeutil.ParseISO8601(operatorSpec.DeleteAfterDate)
if err != nil {
return false, err
}
userMerged.DeleteAfterDate = timeutil.FormatISO8601(operatorDeleteDate)
}
d := cmp.Diff(*atlasSpec, userMerged, cmpopts.EquateEmpty())
if d != "" {
log.Debugf("Users differs from spec: %s", d)
Expand Down
82 changes: 82 additions & 0 deletions pkg/controller/atlasdatabaseuser/databaseuser_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
package atlasdatabaseuser

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.mongodb.org/atlas/mongodbatlas"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

mdbv1 "github.com/mongodb/mongodb-atlas-kubernetes/pkg/api/v1"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/controller/connectionsecret"
"github.com/mongodb/mongodb-atlas-kubernetes/pkg/util/kube"
)

func init() {
logger, _ := zap.NewDevelopment()
zap.ReplaceGlobals(logger)
}

func TestFilterScopeClusters(t *testing.T) {
scopeSpecs := []mdbv1.ScopeSpec{{
Name: "dbLake",
Expand All @@ -24,3 +40,69 @@ func TestFilterScopeClusters(t *testing.T) {
scopeClusters := filterScopeClusters(mdbv1.AtlasDatabaseUser{Spec: mdbv1.AtlasDatabaseUserSpec{Scopes: scopeSpecs}}, clusters)
assert.Equal(t, []string{"cluster1"}, scopeClusters)
}

func TestCheckUserExpired(t *testing.T) {
// Fake client
scheme := runtime.NewScheme()
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(mdbv1.AddToScheme(scheme))
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()

t.Run("Validate DeleteAfterDate", func(t *testing.T) {
result := checkUserExpired(zap.S(), fakeClient, "", *mdbv1.DefaultDBUser("ns", "theuser", "").WithDeleteAfterDate("foo"))
assert.False(t, result.IsOk())
assert.Equal(t, reconcile.Result{}, result.ReconcileResult())

result = checkUserExpired(zap.S(), fakeClient, "", *mdbv1.DefaultDBUser("ns", "theuser", "").WithDeleteAfterDate("2021/11/30T15:04:05"))
assert.False(t, result.IsOk())
})
t.Run("User Marked Expired", func(t *testing.T) {
data := dataForSecret()
// Create a connection secret
_, err := connectionsecret.Ensure(fakeClient, "testNs", "project1", "603e7bf38a94956835659ae5", "cluster1", data)
assert.NoError(t, err)
// The secret for the other project
_, err = connectionsecret.Ensure(fakeClient, "testNs", "project2", "dsfsdf234234sdfdsf23423", "cluster1", data)
assert.NoError(t, err)

before := time.Now().Add(time.Minute * -1).Format("2006-01-02T15:04:05")
result := checkUserExpired(zap.S(), fakeClient, "603e7bf38a94956835659ae5", *mdbv1.DefaultDBUser("testNs", data.DBUserName, "").WithDeleteAfterDate(before))
assert.False(t, result.IsOk())
assert.Equal(t, reconcile.Result{}, result.ReconcileResult())

// The secret has been removed
secret := corev1.Secret{}
secretName := fmt.Sprintf("%s-%s-%s", "project1", "cluster1", kube.NormalizeIdentifier(data.DBUserName))
err = fakeClient.Get(context.Background(), kube.ObjectKey("testNs", secretName), &secret)
assert.Error(t, err)

// The other secret still exists
secretName = fmt.Sprintf("%s-%s-%s", "project2", "cluster1", kube.NormalizeIdentifier(data.DBUserName))
err = fakeClient.Get(context.Background(), kube.ObjectKey("testNs", secretName), &secret)
assert.NoError(t, err)
})
t.Run("No expiration happened", func(t *testing.T) {
data := dataForSecret()
// Create a connection secret
_, err := connectionsecret.Ensure(fakeClient, "testNs", "project1", "603e7bf38a94956835659ae5", "cluster1", data)
assert.NoError(t, err)
after := time.Now().Add(time.Minute * 1).Format("2006-01-02T15:04:05")
result := checkUserExpired(zap.S(), fakeClient, "603e7bf38a94956835659ae5", *mdbv1.DefaultDBUser("testNs", data.DBUserName, "").WithDeleteAfterDate(after))
assert.True(t, result.IsOk())

// The secret is still there
secret := corev1.Secret{}
secretName := fmt.Sprintf("%s-%s-%s", "project1", "cluster1", kube.NormalizeIdentifier(data.DBUserName))
err = fakeClient.Get(context.Background(), kube.ObjectKey("testNs", secretName), &secret)
assert.NoError(t, err)
})
}

func dataForSecret() connectionsecret.ConnectionData {
return connectionsecret.ConnectionData{
DBUserName: "admin",
ConnURL: "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?authSource=admin",
SrvConnURL: "mongodb+srv://mongodb.example.com:27017/?authSource=admin",
Password: "m@gick%",
}
}
1 change: 1 addition & 0 deletions pkg/controller/workflow/reason.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ const (
DatabaseUserStaleConnectionSecrets ConditionReason = "DatabaseUserStaleConnectionSecrets"
DatabaseUserClustersAppliedChanges ConditionReason = "ClustersAppliedDatabaseUsersChanges"
DatabaseUserInvalidSpec ConditionReason = "DatabaseUserInvalidSpec"
DatabaseUserExpired ConditionReason = "DatabaseUserExpired"
)
2 changes: 2 additions & 0 deletions pkg/controller/workflow/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (r Result) WithRetry(retry time.Duration) Result {
return r
}

// WithoutRetry indicates that no retry must happen after the reconciliation is over. This should usually be used
// in cases when retry won't fix the situation like when the spec is incorrect and requires the user to update it.
func (r Result) WithoutRetry() Result {
r.requeueAfter = -1
return r
Expand Down
14 changes: 14 additions & 0 deletions pkg/util/timeutil/timeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,17 @@ func ParseISO8601(dateTime string) (time.Time, error) {
}
return parse, err
}

// MustParseISO8601 returns time or panics. Mostly needed for tests.
func MustParseISO8601(dateTime string) time.Time {
iso8601, err := ParseISO8601(dateTime)
if err != nil {
panic(err.Error())
}
return iso8601
}

// FormatISO8601 returns the ISO8601 string format for the dateTime.
func FormatISO8601(dateTime time.Time) string {
return dateTime.Format("2006-01-02T15:04:05.999Z")
}
Loading