From 8c07e683b58ab888f037ab0f62eb78b42f85e7d1 Mon Sep 17 00:00:00 2001 From: Julien Benhaim Date: Tue, 7 Oct 2025 18:21:49 +0200 Subject: [PATCH 1/6] Update doc From 19f9602fa84b1fcb02e851e0ecb09f7258060824 Mon Sep 17 00:00:00 2001 From: Julien Benhaim Date: Wed, 15 Oct 2025 22:26:50 +0200 Subject: [PATCH 2/6] Refactor state reading --- api/v1/mdb/mongodb_types.go | 3 ++ controllers/om/deployment/testing_utils.go | 2 +- .../operator/mongodbreplicaset_controller.go | 41 +++++++++++++++---- .../mongodbreplicaset_controller_test.go | 8 ++-- 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/api/v1/mdb/mongodb_types.go b/api/v1/mdb/mongodb_types.go index a009deaa3..199146431 100644 --- a/api/v1/mdb/mongodb_types.go +++ b/api/v1/mdb/mongodb_types.go @@ -243,6 +243,9 @@ func GetLastAdditionalMongodConfigByType(lastSpec *MongoDbSpec, configType Addit // GetLastAdditionalMongodConfigByType returns the last successfully achieved AdditionalMongodConfigType for the given component. func (m *MongoDB) GetLastAdditionalMongodConfigByType(configType AdditionalMongodConfigType) (*AdditionalMongodConfig, error) { + if m.Spec.GetResourceType() == ReplicaSet { + panic(errors.Errorf("this method cannot be used from ReplicaSet controller; use non-method GetLastAdditionalMongodConfigByType and pass lastSpec from the deployment state.")) + } if m.Spec.GetResourceType() == ShardedCluster { panic(errors.Errorf("this method cannot be used from ShardedCluster controller; use non-method GetLastAdditionalMongodConfigByType and pass lastSpec from the deployment state.")) } diff --git a/controllers/om/deployment/testing_utils.go b/controllers/om/deployment/testing_utils.go index d62a1aca2..79f0ebf82 100644 --- a/controllers/om/deployment/testing_utils.go +++ b/controllers/om/deployment/testing_utils.go @@ -26,7 +26,7 @@ func CreateFromReplicaSet(mongoDBImage string, forceEnterprise bool, rs *mdb.Mon ), zap.S()) d := om.NewDeployment() - lastConfig, err := rs.GetLastAdditionalMongodConfigByType(mdb.ReplicaSetConfig) + lastConfig, err := mdb.GetLastAdditionalMongodConfigByType(nil, mdb.ReplicaSetConfig) if err != nil { panic(err) } diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index bb30016a9..cd433238a 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -71,13 +71,36 @@ type ReconcileMongoDbReplicaSet struct { imageUrls images.ImageUrls forceEnterprise bool enableClusterMongoDBRoles bool + deploymentState *ReplicaSetDeploymentState initDatabaseNonStaticImageVersion string databaseNonStaticImageVersion string } +type ReplicaSetDeploymentState struct { + LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"` +} + +func readState(rs *mdbv1.MongoDB) (*ReplicaSetDeploymentState, error) { + // Try to get the last achieved spec from annotations and store it in state + if lastAchievedSpec, err := rs.GetLastSpec(); err != nil { + return nil, err + } else { + return &ReplicaSetDeploymentState{LastAchievedSpec: lastAchievedSpec}, nil + } +} + var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} +func (r *ReconcileMongoDbReplicaSet) initializeState(rs *mdbv1.MongoDB) error { + if state, err := readState(rs); err == nil { + r.deploymentState = state + return nil + } else { + return err + } +} + func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion 
string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { return &ReconcileMongoDbReplicaSet{ ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), @@ -128,6 +151,10 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco return reconcileResult, err } + if err := r.initializeState(rs); err != nil { + return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to initialize replica set state: %w", err)), log) + } + if !architectures.IsRunningStaticArchitecture(rs.Annotations) { agents.UpgradeAllIfNeeded(ctx, agents.ClientSecret{Client: r.client, SecretClient: r.SecretClient}, r.omConnectionFactory, GetWatchedNamespace(), false) } @@ -238,12 +265,8 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco } // 5. Actual reconciliation execution, Ops Manager and kubernetes resources update - lastSpec, err := rs.GetLastSpec() - if err != nil { - lastSpec = &mdbv1.MongoDbSpec{} - } - publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, lastSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) + publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, r.deploymentState.LastAchievedSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) status := workflow.RunInGivenOrder(publishAutomationConfigFirst, func() workflow.Status { return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") @@ -580,7 +603,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath) // If current operation is to Disable TLS, then we should the current members of the Replica Set, // this is, do not scale them up or down util TLS disabling has completed. - shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, log, caFilePath, tlsCertPath) + shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, caFilePath, tlsCertPath, r.deploymentState.LastAchievedSpec, log) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -611,7 +634,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c return status } - lastRsConfig, err := rs.GetLastAdditionalMongodConfigByType(mdbv1.ReplicaSetConfig) + lastRsConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(r.deploymentState.LastAchievedSpec, mdbv1.ReplicaSetConfig) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -671,7 +694,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c // updateOmDeploymentDisableTLSConfiguration checks if TLS configuration needs // to be disabled. In which case it will disable it and inform to the calling // function. 
-func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage string, forceEnterprise bool, membersNumberBefore int, rs *mdbv1.MongoDB, log *zap.SugaredLogger, caFilePath, tlsCertPath string) (bool, error) { +func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage string, forceEnterprise bool, membersNumberBefore int, rs *mdbv1.MongoDB, caFilePath, tlsCertPath string, lastSpec *mdbv1.MongoDbSpec, log *zap.SugaredLogger) (bool, error) { tlsConfigWasDisabled := false err := conn.ReadUpdateDeployment( @@ -687,7 +710,7 @@ func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, mongoDBImage // there's a scale up change at the same time). replicaSet := replicaset.BuildFromMongoDBWithReplicas(mongoDBImage, forceEnterprise, rs, membersNumberBefore, rs.CalculateFeatureCompatibilityVersion(), tlsCertPath) - lastConfig, err := rs.GetLastAdditionalMongodConfigByType(mdbv1.ReplicaSetConfig) + lastConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(lastSpec, mdbv1.ReplicaSetConfig) if err != nil { return err } diff --git a/controllers/operator/mongodbreplicaset_controller_test.go b/controllers/operator/mongodbreplicaset_controller_test.go index 787883ffb..dfda4efc9 100644 --- a/controllers/operator/mongodbreplicaset_controller_test.go +++ b/controllers/operator/mongodbreplicaset_controller_test.go @@ -398,22 +398,22 @@ func TestUpdateDeploymentTLSConfiguration(t *testing.T) { deploymentNoTLS := deployment.CreateFromReplicaSet("fake-mongoDBImage", false, rsNoTLS) // TLS Disabled -> TLS Disabled - shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsNoTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsNoTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.False(t, shouldLockMembers) // TLS Disabled -> TLS Enabled - shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsWithTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentNoTLS), "fake-mongoDBImage", false, 3, rsWithTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.False(t, shouldLockMembers) // TLS Enabled -> TLS Enabled - shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsWithTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsWithTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.False(t, shouldLockMembers) // TLS Enabled -> TLS Disabled - shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsNoTLS, zap.S(), util.CAFilePathInContainer, "") + shouldLockMembers, err = updateOmDeploymentDisableTLSConfiguration(om.NewMockedOmConnection(deploymentWithTLS), "fake-mongoDBImage", false, 3, rsNoTLS, util.CAFilePathInContainer, "", nil, zap.S()) assert.NoError(t, err) assert.True(t, shouldLockMembers) } From 0b70984e8bf1c2be0d61a22f3b6cadffbf615098 Mon Sep 17 00:00:00 2001 From: Julien 
Benhaim Date: Fri, 17 Oct 2025 21:51:25 +0200 Subject: [PATCH 3/6] Refactor state writing --- .../operator/mongodbreplicaset_controller.go | 82 +++++++++++++------ 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index cd433238a..4e4b010f9 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -81,7 +81,10 @@ type ReplicaSetDeploymentState struct { LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"` } -func readState(rs *mdbv1.MongoDB) (*ReplicaSetDeploymentState, error) { +var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} + +// readState abstracts reading the state of the resource that we store on the cluster between reconciliations. +func (r *ReconcileMongoDbReplicaSet) readState(rs *mdbv1.MongoDB) (*ReplicaSetDeploymentState, error) { // Try to get the last achieved spec from annotations and store it in state if lastAchievedSpec, err := rs.GetLastSpec(); err != nil { return nil, err @@ -90,10 +93,40 @@ func readState(rs *mdbv1.MongoDB) (*ReplicaSetDeploymentState, error) { } } -var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} +// writeState abstracts writing the state of the resource that we store on the cluster between reconciliations. +func (r *ReconcileMongoDbReplicaSet) writeState(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error { + // Serialize the state to annotations + annotationsToAdd, err := getAnnotationsForResource(rs) + if err != nil { + return err + } + + // Add vault annotations if needed + if vault.IsVaultSecretBackend() { + secrets := rs.GetSecretsMountedIntoDBPod() + vaultMap := make(map[string]string) + for _, s := range secrets { + path := fmt.Sprintf("%s/%s/%s", r.VaultClient.DatabaseSecretMetadataPath(), rs.Namespace, s) + vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) + } + path := fmt.Sprintf("%s/%s/%s", r.VaultClient.OperatorScretMetadataPath(), rs.Namespace, rs.Spec.Credentials) + vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) + for k, val := range vaultMap { + annotationsToAdd[k] = val + } + } + + // Write annotations back to the resource + if err := annotations.SetAnnotations(ctx, rs, annotationsToAdd, r.client); err != nil { + return err + } + + log.Debugf("Successfully wrote deployment state for ReplicaSet %s/%s", rs.Namespace, rs.Name) + return nil +} func (r *ReconcileMongoDbReplicaSet) initializeState(rs *mdbv1.MongoDB) error { - if state, err := readState(rs); err == nil { + if state, err := r.readState(rs); err == nil { r.deploymentState = state return nil } else { @@ -101,6 +134,23 @@ func (r *ReconcileMongoDbReplicaSet) initializeState(rs *mdbv1.MongoDB) error { } } +// updateStatus overrides the common controller's updateStatus to ensure that the deployment state +// is written after every status update. This ensures state consistency even on early returns. +// It must be executed only once per reconcile, and its result must be returned. +func (r *ReconcileMongoDbReplicaSet) updateStatus(ctx context.Context, resource *mdbv1.MongoDB, status workflow.Status, log *zap.SugaredLogger, statusOptions ...mdbstatus.Option) (reconcile.Result, error) { + result, err := r.ReconcileCommonController.updateStatus(ctx, resource, status, log, statusOptions...)
+ if err != nil { + return result, err + } + + // Write deployment state after every status update + if err := r.writeState(ctx, resource, log); err != nil { + return r.ReconcileCommonController.updateStatus(ctx, resource, workflow.Failed(xerrors.Errorf("Failed to write deployment state after updating status: %w", err)), log) + } + + return result, nil +} + func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { return &ReconcileMongoDbReplicaSet{ ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), @@ -284,29 +334,6 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco return r.updateStatus(ctx, rs, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), log, mdbstatus.MembersOption(rs)) } - annotationsToAdd, err := getAnnotationsForResource(rs) - if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(err), log) - } - - if vault.IsVaultSecretBackend() { - secrets := rs.GetSecretsMountedIntoDBPod() - vaultMap := make(map[string]string) - for _, s := range secrets { - path := fmt.Sprintf("%s/%s/%s", r.VaultClient.DatabaseSecretMetadataPath(), rs.Namespace, s) - vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) - } - path := fmt.Sprintf("%s/%s/%s", r.VaultClient.OperatorScretMetadataPath(), rs.Namespace, rs.Spec.Credentials) - vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) - for k, val := range vaultMap { - annotationsToAdd[k] = val - } - } - - if err := annotations.SetAnnotations(ctx, rs, annotationsToAdd, r.client); err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(err), log) - } - log.Infof("Finished reconciliation for MongoDbReplicaSet! %s", completionMessage(conn.BaseURL(), conn.GroupID())) return r.updateStatus(ctx, rs, workflow.OK(), log, mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) } @@ -443,6 +470,8 @@ func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, r if !workflowStatus.IsOK() { return workflowStatus } + + // TODO: check if updateStatus usage is correct here if workflow.ContainsPVCOption(workflowStatus.StatusOptions()) { _, _ = r.updateStatus(ctx, rs, workflow.Pending(""), log, workflowStatus.StatusOptions()...)
} @@ -683,6 +712,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c return workflow.Failed(err) } + // TODO: check if updateStatus usage is correct here if status := r.ensureBackupConfigurationAndUpdateStatus(ctx, conn, rs, r.SecretClient, log); !status.IsOK() && !isRecovering { return status } From d03e578af7c0e98021b5303838dc6c75c7fd796e Mon Sep 17 00:00:00 2001 From: Julien Benhaim Date: Fri, 17 Oct 2025 22:27:37 +0200 Subject: [PATCH 4/6] Refactor to helper pattern --- .../operator/mongodbreplicaset_controller.go | 233 +++++++++++------- 1 file changed, 138 insertions(+), 95 deletions(-) diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index 4e4b010f9..929af5fcb 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -64,14 +64,15 @@ import ( "github.com/mongodb/mongodb-kubernetes/pkg/vault/vaultwatcher" ) -// ReconcileMongoDbReplicaSet reconciles a MongoDB with a type of ReplicaSet +// ReconcileMongoDbReplicaSet reconciles a MongoDB with a type of ReplicaSet. +// WARNING: do not put any mutable state into this struct. +// Controller runtime uses and shares a single instance of it. type ReconcileMongoDbReplicaSet struct { *ReconcileCommonController omConnectionFactory om.ConnectionFactory imageUrls images.ImageUrls forceEnterprise bool enableClusterMongoDBRoles bool - deploymentState *ReplicaSetDeploymentState initDatabaseNonStaticImageVersion string databaseNonStaticImageVersion string @@ -83,10 +84,37 @@ type ReplicaSetDeploymentState struct { LastAchievedSpec *mdbv1.MongoDbSpec `json:"lastAchievedSpec"` } var _ reconcile.Reconciler = &ReconcileMongoDbReplicaSet{} +// ReplicaSetReconcilerHelper contains state and logic for a SINGLE reconcile execution. +// This object is NOT shared between reconcile invocations. +type ReplicaSetReconcilerHelper struct { + resource *mdbv1.MongoDB + deploymentState *ReplicaSetDeploymentState + reconciler *ReconcileMongoDbReplicaSet + log *zap.SugaredLogger +} + +func (r *ReconcileMongoDbReplicaSet) newReconcilerHelper( + ctx context.Context, + rs *mdbv1.MongoDB, + log *zap.SugaredLogger, +) (*ReplicaSetReconcilerHelper, error) { + helper := &ReplicaSetReconcilerHelper{ + resource: rs, + reconciler: r, + log: log, + } + + if err := helper.initialize(ctx); err != nil { + return nil, err + } + + return helper, nil +} + // readState abstracts reading the state of the resource that we store on the cluster between reconciliations. -func (r *ReconcileMongoDbReplicaSet) readState(rs *mdbv1.MongoDB) (*ReplicaSetDeploymentState, error) { +func (h *ReplicaSetReconcilerHelper) readState() (*ReplicaSetDeploymentState, error) { // Try to get the last achieved spec from annotations and store it in state - if lastAchievedSpec, err := rs.GetLastSpec(); err != nil { + if lastAchievedSpec, err := h.resource.GetLastSpec(); err != nil { return nil, err } else { return &ReplicaSetDeploymentState{LastAchievedSpec: lastAchievedSpec}, nil @@ -94,116 +122,70 @@ func (r *ReconcileMongoDbReplicaSet) readState(rs *mdbv1.MongoDe } } // writeState abstracts writing the state of the resource that we store on the cluster between reconciliations.
-func (r *ReconcileMongoDbReplicaSet) writeState(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error { +func (h *ReplicaSetReconcilerHelper) writeState(ctx context.Context) error { // Serialize the state to annotations - annotationsToAdd, err := getAnnotationsForResource(rs) + annotationsToAdd, err := getAnnotationsForResource(h.resource) if err != nil { return err } // Add vault annotations if needed if vault.IsVaultSecretBackend() { - secrets := rs.GetSecretsMountedIntoDBPod() + secrets := h.resource.GetSecretsMountedIntoDBPod() vaultMap := make(map[string]string) for _, s := range secrets { - path := fmt.Sprintf("%s/%s/%s", r.VaultClient.DatabaseSecretMetadataPath(), rs.Namespace, s) - vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) + path := fmt.Sprintf("%s/%s/%s", h.reconciler.VaultClient.DatabaseSecretMetadataPath(), h.resource.Namespace, s) + vaultMap = merge.StringToStringMap(vaultMap, h.reconciler.VaultClient.GetSecretAnnotation(path)) } - path := fmt.Sprintf("%s/%s/%s", r.VaultClient.OperatorScretMetadataPath(), rs.Namespace, rs.Spec.Credentials) - vaultMap = merge.StringToStringMap(vaultMap, r.VaultClient.GetSecretAnnotation(path)) + path := fmt.Sprintf("%s/%s/%s", h.reconciler.VaultClient.OperatorScretMetadataPath(), h.resource.Namespace, h.resource.Spec.Credentials) + vaultMap = merge.StringToStringMap(vaultMap, h.reconciler.VaultClient.GetSecretAnnotation(path)) for k, val := range vaultMap { annotationsToAdd[k] = val } } // Write annotations back to the resource - if err := annotations.SetAnnotations(ctx, rs, annotationsToAdd, r.client); err != nil { + if err := annotations.SetAnnotations(ctx, h.resource, annotationsToAdd, h.reconciler.client); err != nil { return err } - log.Debugf("Successfully wrote deployment state for ReplicaSet %s/%s", rs.Namespace, rs.Name) + h.log.Debugf("Successfully wrote deployment state for ReplicaSet %s/%s", h.resource.Namespace, h.resource.Name) return nil } -func (r *ReconcileMongoDbReplicaSet) initializeState(rs *mdbv1.MongoDB) error { - if state, err := r.readState(rs); err == nil { - r.deploymentState = state - return nil - } else { - return err +func (h *ReplicaSetReconcilerHelper) initialize(ctx context.Context) error { + state, err := h.readState() + if err != nil { + return xerrors.Errorf("Failed to initialize replica set state: %w", err) } + h.deploymentState = state + return nil } // updateStatus overrides the common controller's updateStatus to ensure that the deployment state // is written after every status update. This ensures state consistency even on early returns. // It must be executed only once per reconcile, and its result must be returned. -func (r *ReconcileMongoDbReplicaSet) updateStatus(ctx context.Context, resource *mdbv1.MongoDB, status workflow.Status, log *zap.SugaredLogger, statusOptions ...mdbstatus.Option) (reconcile.Result, error) { - result, err := r.ReconcileCommonController.updateStatus(ctx, resource, status, log, statusOptions...) +func (h *ReplicaSetReconcilerHelper) updateStatus(ctx context.Context, status workflow.Status, statusOptions ...mdbstatus.Option) (reconcile.Result, error) { + result, err := h.reconciler.ReconcileCommonController.updateStatus(ctx, h.resource, status, h.log, statusOptions...)
if err != nil { return result, err } // Write deployment state after every status update - if err := r.writeState(ctx, resource, log); err != nil { - return r.ReconcileCommonController.updateStatus(ctx, resource, workflow.Failed(xerrors.Errorf("Failed to write deployment state after updating status: %w", err)), log) + if err := h.writeState(ctx); err != nil { + return h.reconciler.ReconcileCommonController.updateStatus(ctx, h.resource, workflow.Failed(xerrors.Errorf("Failed to write deployment state after updating status: %w", err)), h.log) } return result, nil } -func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { - return &ReconcileMongoDbReplicaSet{ - ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), - omConnectionFactory: omFunc, - imageUrls: imageUrls, - forceEnterprise: forceEnterprise, - enableClusterMongoDBRoles: enableClusterMongoDBRoles, - - initDatabaseNonStaticImageVersion: initDatabaseNonStaticImageVersion, - databaseNonStaticImageVersion: databaseNonStaticImageVersion, - } -} - -type deploymentOptionsRS struct { - agentCertPath string - agentCertHash string - prometheusCertHash string - currentAgentAuthMode string -} - -// Generic Kubernetes Resources -// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=list;watch,namespace=placeholder -// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch,namespace=placeholder -// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update,namespace=placeholder -// +kubebuilder:rbac:groups=core,resources={secrets,configmaps},verbs=get;list;watch;create;delete;update,namespace=placeholder -// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;get;list;watch;delete;update,namespace=placeholder - -// MongoDB Resource -// +kubebuilder:rbac:groups=mongodb.com,resources={mongodb,mongodb/status,mongodb/finalizers},verbs=*,namespace=placeholder - -// Setting up a webhook -// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=get;create;update;delete - -// Certificate generation -// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests,verbs=get;create;list;watch - -// Reconcile reads that state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read -// and what is in the MongoDbReplicaSet.Spec -func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) { - // === 1. Initial Checks and setup - log := zap.S().With("ReplicaSet", request.NamespacedName) - rs := &mdbv1.MongoDB{} - - if reconcileResult, err := r.prepareResourceForReconciliation(ctx, request, rs, log); err != nil { - if errors.IsNotFound(err) { - return workflow.Invalid("Object for reconciliation not found").ReconcileResult() - } - return reconcileResult, err - } - - if err := r.initializeState(rs); err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to initialize replica set state: %w", err)), log) - } +// Reconcile performs the full reconciliation logic for a replica set. +// This is the main entry point for all reconciliation work and contains all +// state and logic specific to a single reconcile execution. 
+func (h *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.Result, error) { + rs := h.resource + log := h.log + r := h.reconciler if !architectures.IsRunningStaticArchitecture(rs.Annotations) { agents.UpgradeAllIfNeeded(ctx, agents.ClientSecret{Client: r.client, SecretClient: r.SecretClient}, r.omConnectionFactory, GetWatchedNamespace(), false) @@ -214,36 +196,36 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco log.Infow("ReplicaSet.Status", "status", rs.Status) if err := rs.ProcessValidationsOnReconcile(nil); err != nil { - return r.updateStatus(ctx, rs, workflow.Invalid("%s", err.Error()), log) + return h.updateStatus(ctx, workflow.Invalid("%s", err.Error())) } projectConfig, credsConfig, err := project.ReadConfigAndCredentials(ctx, r.client, r.SecretClient, rs, log) if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(err), log) + return h.updateStatus(ctx, workflow.Failed(err)) } conn, _, err := connection.PrepareOpsManagerConnection(ctx, r.SecretClient, projectConfig, credsConfig, r.omConnectionFactory, rs.Namespace, log) if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Ops Manager connection: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("Failed to prepare Ops Manager connection: %w", err))) } if status := ensureSupportedOpsManagerVersion(conn); status.Phase() != mdbstatus.PhaseRunning { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } r.SetupCommonWatchers(rs, nil, nil, rs.Name) reconcileResult := checkIfHasExcessProcesses(conn, rs.Name, log) if !reconcileResult.IsOK() { - return r.updateStatus(ctx, rs, reconcileResult, log) + return h.updateStatus(ctx, reconcileResult) } if status := validateMongoDBResource(rs, conn); !status.IsOK() { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } if status := controlledfeature.EnsureFeatureControls(*rs, conn, conn.OpsManagerVersion(), log); !status.IsOK() { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } // === 2. 
Auth and Certificates @@ -271,18 +253,18 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco prometheusCertHash, err := certs.EnsureTLSCertsForPrometheus(ctx, r.SecretClient, rs.GetNamespace(), rs.GetPrometheus(), certs.Database, log) if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("could not generate certificates for Prometheus: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("Could not generate certificates for Prometheus: %w", err))) } currentAgentAuthMode, err := conn.GetAgentAuthMode() if err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("failed to get agent auth mode: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("failed to get agent auth mode: %w", err))) } // Check if we need to prepare for scale-down if scale.ReplicasThisReconciliation(rs) < rs.Status.Members { if err := replicaset.PrepareScaleDownFromMongoDB(conn, rs, log); err != nil { - return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log) + return h.updateStatus(ctx, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err))) } } deploymentOpts := deploymentOptionsRS{ @@ -304,7 +286,7 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco // See CLOUDP-189433 and CLOUDP-229222 for more details. if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) { log.Warnf("Triggering Automatic Recovery. The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition) - automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") + automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, h, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") reconcileStatus := r.reconcileMemberResources(ctx, rs, conn, log, projectConfig, deploymentOpts) if !reconcileStatus.IsOK() { log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus) @@ -316,26 +298,86 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco // 5. 
Actual reconciliation execution, Ops Manager and kubernetes resources update - publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, r.deploymentState.LastAchievedSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) + publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, h.deploymentState.LastAchievedSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) status := workflow.RunInGivenOrder(publishAutomationConfigFirst, func() workflow.Status { - return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, rs, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") + return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, h, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") }, func() workflow.Status { return r.reconcileMemberResources(ctx, rs, conn, log, projectConfig, deploymentOpts) }) if !status.IsOK() { - return r.updateStatus(ctx, rs, status, log) + return h.updateStatus(ctx, status) } // === 6. Final steps if scale.IsStillScaling(rs) { - return r.updateStatus(ctx, rs, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), log, mdbstatus.MembersOption(rs)) + return h.updateStatus(ctx, workflow.Pending("Continuing scaling operation for ReplicaSet %s, desiredMembers=%d, currentMembers=%d", rs.ObjectKey(), rs.DesiredReplicas(), scale.ReplicasThisReconciliation(rs)), mdbstatus.MembersOption(rs)) } log.Infof("Finished reconciliation for MongoDbReplicaSet! 
%s", completionMessage(conn.BaseURL(), conn.GroupID())) - return r.updateStatus(ctx, rs, workflow.OK(), log, mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) + return h.updateStatus(ctx, workflow.OK(), mdbstatus.NewBaseUrlOption(deployment.Link(conn.BaseURL(), conn.GroupID())), mdbstatus.MembersOption(rs), mdbstatus.NewPVCsStatusOptionEmptyStatus()) +} + +func newReplicaSetReconciler(ctx context.Context, kubeClient client.Client, imageUrls images.ImageUrls, initDatabaseNonStaticImageVersion, databaseNonStaticImageVersion string, forceEnterprise bool, enableClusterMongoDBRoles bool, omFunc om.ConnectionFactory) *ReconcileMongoDbReplicaSet { + return &ReconcileMongoDbReplicaSet{ + ReconcileCommonController: NewReconcileCommonController(ctx, kubeClient), + omConnectionFactory: omFunc, + imageUrls: imageUrls, + forceEnterprise: forceEnterprise, + enableClusterMongoDBRoles: enableClusterMongoDBRoles, + + initDatabaseNonStaticImageVersion: initDatabaseNonStaticImageVersion, + databaseNonStaticImageVersion: databaseNonStaticImageVersion, + } +} + +type deploymentOptionsRS struct { + agentCertPath string + agentCertHash string + prometheusCertHash string + currentAgentAuthMode string +} + +// Generic Kubernetes Resources +// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=list;watch,namespace=placeholder +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch,namespace=placeholder +// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update,namespace=placeholder +// +kubebuilder:rbac:groups=core,resources={secrets,configmaps},verbs=get;list;watch;create;delete;update,namespace=placeholder +// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;get;list;watch;delete;update,namespace=placeholder + +// MongoDB Resource +// +kubebuilder:rbac:groups=mongodb.com,resources={mongodb,mongodb/status,mongodb/finalizers},verbs=*,namespace=placeholder + +// Setting up a webhook +// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=get;create;update;delete + +// Certificate generation +// +kubebuilder:rbac:groups=certificates.k8s.io,resources=certificatesigningrequests,verbs=get;create;list;watch + +// Reconcile reads that state of the cluster for a MongoDbReplicaSet object and makes changes based on the state read +// and what is in the MongoDbReplicaSet.Spec +func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, e error) { + // === 1. 
Initial Checks and setup + log := zap.S().With("ReplicaSet", request.NamespacedName) + rs := &mdbv1.MongoDB{} + + if reconcileResult, err := r.prepareResourceForReconciliation(ctx, request, rs, log); err != nil { + if errors.IsNotFound(err) { + return workflow.Invalid("Object for reconciliation not found").ReconcileResult() + } + return reconcileResult, err + } + + // Create helper for THIS reconciliation + helper, err := r.newReconcilerHelper(ctx, rs, log) + if err != nil { + return r.updateStatus(ctx, rs, workflow.Failed(err), log) + } + + // Delegate all reconciliation logic to helper + return helper.Reconcile(ctx) } func publishAutomationConfigFirstRS(ctx context.Context, getter kubernetesClient.Client, mdb mdbv1.MongoDB, lastSpec *mdbv1.MongoDbSpec, currentAgentAuthMode string, sslMMSCAConfigMap string, log *zap.SugaredLogger) bool { @@ -618,7 +660,8 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls // updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated // to automation agents in containers -func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, rs *mdbv1.MongoDB, log *zap.SugaredLogger, tlsCertPath, internalClusterCertPath string, deploymentOptionsRS deploymentOptionsRS, shouldMirrorKeyfileForMongot bool, isRecovering bool) workflow.Status { +func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, helper *ReplicaSetReconcilerHelper, log *zap.SugaredLogger, tlsCertPath, internalClusterCertPath string, deploymentOptionsRS deploymentOptionsRS, shouldMirrorKeyfileForMongot bool, isRecovering bool) workflow.Status { + rs := helper.resource log.Debug("Entering UpdateOMDeployments") // Only "concrete" RS members should be observed // - if scaling down, let's observe only members that will remain after scale-down operation @@ -632,7 +675,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath) // If current operation is to Disable TLS, then we should the current members of the Replica Set, // this is, do not scale them up or down util TLS disabling has completed. 
- shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, caFilePath, tlsCertPath, r.deploymentState.LastAchievedSpec, log) + shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, caFilePath, tlsCertPath, helper.deploymentState.LastAchievedSpec, log) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -663,7 +706,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c return status } - lastRsConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(r.deploymentState.LastAchievedSpec, mdbv1.ReplicaSetConfig) + lastRsConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(helper.deploymentState.LastAchievedSpec, mdbv1.ReplicaSetConfig) if err != nil && !isRecovering { return workflow.Failed(err) } From 063a68c7616f91080922a2267d3a7ef096121759 Mon Sep 17 00:00:00 2001 From: Julien Benhaim Date: Mon, 20 Oct 2025 13:41:23 +0200 Subject: [PATCH 5/6] Moved all methods to helper --- .../operator/mongodbreplicaset_controller.go | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index 929af5fcb..ea415e7ff 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -277,7 +277,7 @@ func (h *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R // 3. Search Overrides // Apply search overrides early so searchCoordinator role is present before ensureRoles runs // This must happen before the ordering logic to ensure roles are synced regardless of order - shouldMirrorKeyfileForMongot := r.applySearchOverrides(ctx, rs, log) + shouldMirrorKeyfile := h.applySearchOverrides(ctx) // 4. Recovery @@ -286,8 +286,8 @@ func (h *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R // See CLOUDP-189433 and CLOUDP-229222 for more details. if recovery.ShouldTriggerRecovery(rs.Status.Phase != mdbstatus.PhaseRunning, rs.Status.LastTransition) { log.Warnf("Triggering Automatic Recovery. 
The MongoDB resource %s/%s is in %s state since %s", rs.Namespace, rs.Name, rs.Status.Phase, rs.Status.LastTransition) - automationConfigStatus := r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, h, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") - reconcileStatus := r.reconcileMemberResources(ctx, rs, conn, log, projectConfig, deploymentOpts) + automationConfigStatus := h.updateOmDeploymentRs(ctx, conn, rs.Status.Members, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfile, true).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") + reconcileStatus := h.reconcileMemberResources(ctx, conn, projectConfig, deploymentOpts) if !reconcileStatus.IsOK() { log.Errorf("Recovery failed because of reconcile errors, %v", reconcileStatus) } @@ -301,10 +301,10 @@ func (h *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R publishAutomationConfigFirst := publishAutomationConfigFirstRS(ctx, r.client, *rs, h.deploymentState.LastAchievedSpec, deploymentOpts.currentAgentAuthMode, projectConfig.SSLMMSCAConfigMap, log) status := workflow.RunInGivenOrder(publishAutomationConfigFirst, func() workflow.Status { - return r.updateOmDeploymentRs(ctx, conn, rs.Status.Members, h, log, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfileForMongot, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") + return h.updateOmDeploymentRs(ctx, conn, rs.Status.Members, tlsCertPath, internalClusterCertPath, deploymentOpts, shouldMirrorKeyfile, false).OnErrorPrepend("Failed to create/update (Ops Manager reconciliation phase):") }, func() workflow.Status { - return r.reconcileMemberResources(ctx, rs, conn, log, projectConfig, deploymentOpts) + return h.reconcileMemberResources(ctx, conn, projectConfig, deploymentOpts) }) if !status.IsOK() { @@ -469,9 +469,11 @@ func (r *ReconcileMongoDbReplicaSet) reconcileHostnameOverrideConfigMap(ctx cont // reconcileMemberResources handles the synchronization of kubernetes resources, which can be statefulsets, services etc. // All the resources required in the k8s cluster (as opposed to the automation config) for creating the replicaset // should be reconciled in this method. 
-func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Context, rs *mdbv1.MongoDB, conn om.Connection, - log *zap.SugaredLogger, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS, -) workflow.Status { +func (h *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { + rs := h.resource + r := h.reconciler + log := h.log + // Reconcile hostname override ConfigMap if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil { return workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)) @@ -482,12 +484,14 @@ func (r *ReconcileMongoDbReplicaSet) reconcileMemberResources(ctx context.Contex return status } - return r.reconcileStatefulSet(ctx, rs, log, conn, projectConfig, deploymentOptions) + return h.reconcileStatefulSet(ctx, conn, projectConfig, deploymentOptions) } -func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, rs *mdbv1.MongoDB, - log *zap.SugaredLogger, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS, -) workflow.Status { +func (h *ReplicaSetReconcilerHelper) reconcileStatefulSet(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { + rs := h.resource + r := h.reconciler + log := h.log + certConfigurator := certs.ReplicaSetX509CertConfigurator{MongoDB: rs, SecretClient: r.SecretClient} status := r.ensureX509SecretAndCheckTLSType(ctx, certConfigurator, deploymentOptions.currentAgentAuthMode, log) if !status.IsOK() { @@ -500,7 +504,7 @@ func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, r } // Build the replica set config - rsConfig, err := r.buildStatefulSetOptions(ctx, rs, conn, projectConfig, deploymentOptions.currentAgentAuthMode, deploymentOptions.prometheusCertHash, deploymentOptions.agentCertHash, log) + rsConfig, err := h.buildStatefulSetOptions(ctx, conn, projectConfig, deploymentOptions) if err != nil { return workflow.Failed(xerrors.Errorf("failed to build StatefulSet options: %w", err)) } @@ -533,7 +537,11 @@ func (r *ReconcileMongoDbReplicaSet) reconcileStatefulSet(ctx context.Context, r } // buildStatefulSetOptions creates the options needed for constructing the StatefulSet -func (r *ReconcileMongoDbReplicaSet) buildStatefulSetOptions(ctx context.Context, rs *mdbv1.MongoDB, conn om.Connection, projectConfig mdbv1.ProjectConfig, currentAgentAuthMode string, prometheusCertHash string, agentCertHash string, log *zap.SugaredLogger) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, error) { +func (h *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) (func(mdb mdbv1.MongoDB) construct.DatabaseStatefulSetOptions, error) { + rs := h.resource + r := h.reconciler + log := h.log + rsCertsConfig := certs.ReplicaSetConfig(*rs) var vaultConfig vault.VaultConfiguration @@ -562,11 +570,11 @@ func (r *ReconcileMongoDbReplicaSet) buildStatefulSetOptions(ctx context.Context rsConfig := construct.ReplicaSetOptions( PodEnvVars(newPodVars(conn, projectConfig, rs.Spec.LogLevel)), - CurrentAgentAuthMechanism(currentAgentAuthMode), + CurrentAgentAuthMechanism(deploymentOptions.currentAgentAuthMode), CertificateHash(tlsCertHash), - AgentCertHash(agentCertHash), + 
AgentCertHash(deploymentOptions.agentCertHash), InternalClusterHash(internalClusterCertHash), - PrometheusTLSCertHash(prometheusCertHash), + PrometheusTLSCertHash(deploymentOptions.prometheusCertHash), WithVaultConfig(vaultConfig), WithLabels(rs.Labels), WithAdditionalMongodConfig(rs.Spec.GetAdditionalMongodConfig()), @@ -660,8 +668,10 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls // updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated // to automation agents in containers -func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, helper *ReplicaSetReconcilerHelper, log *zap.SugaredLogger, tlsCertPath, internalClusterCertPath string, deploymentOptionsRS deploymentOptionsRS, shouldMirrorKeyfileForMongot bool, isRecovering bool) workflow.Status { - rs := helper.resource +func (h *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, conn om.Connection, membersNumberBefore int, tlsCertPath, internalClusterCertPath string, deploymentOpts deploymentOptionsRS, shouldMirrorKeyfile bool, isRecovering bool) workflow.Status { + rs := h.resource + log := h.log + r := h.reconciler log.Debug("Entering UpdateOMDeployments") // Only "concrete" RS members should be observed // - if scaling down, let's observe only members that will remain after scale-down operation @@ -675,7 +685,7 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c caFilePath := fmt.Sprintf("%s/ca-pem", util.TLSCaMountPath) // If current operation is to Disable TLS, then we should the current members of the Replica Set, // this is, do not scale them up or down util TLS disabling has completed. 
- shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, caFilePath, tlsCertPath, helper.deploymentState.LastAchievedSpec, log) + shouldLockMembers, err := updateOmDeploymentDisableTLSConfiguration(conn, r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, membersNumberBefore, rs, caFilePath, tlsCertPath, h.deploymentState.LastAchievedSpec, log) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -701,12 +711,12 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c replicaSet := replicaset.BuildFromMongoDBWithReplicas(r.imageUrls[mcoConstruct.MongodbImageEnv], r.forceEnterprise, rs, updatedMembers, rs.CalculateFeatureCompatibilityVersion(), tlsCertPath) processNames := replicaSet.GetProcessNames() - status, additionalReconciliationRequired := r.updateOmAuthentication(ctx, conn, processNames, rs, deploymentOptionsRS.agentCertPath, caFilePath, internalClusterCertPath, isRecovering, log) + status, additionalReconciliationRequired := r.updateOmAuthentication(ctx, conn, processNames, rs, deploymentOpts.agentCertPath, caFilePath, internalClusterCertPath, isRecovering, log) if !status.IsOK() && !isRecovering { return status } - lastRsConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(helper.deploymentState.LastAchievedSpec, mdbv1.ReplicaSetConfig) + lastRsConfig, err := mdbv1.GetLastAdditionalMongodConfigByType(h.deploymentState.LastAchievedSpec, mdbv1.ReplicaSetConfig) if err != nil && !isRecovering { return workflow.Failed(err) } @@ -716,13 +726,13 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c conn: conn, secretsClient: r.SecretClient, namespace: rs.GetNamespace(), - prometheusCertHash: deploymentOptionsRS.prometheusCertHash, + prometheusCertHash: deploymentOpts.prometheusCertHash, } err = conn.ReadUpdateDeployment( func(d om.Deployment) error { - if shouldMirrorKeyfileForMongot { - if err := r.mirrorKeyfileIntoSecretForMongot(ctx, d, rs, log); err != nil { + if shouldMirrorKeyfile { + if err := h.mirrorKeyfileIntoSecretForMongot(ctx, d); err != nil { return err } } @@ -867,8 +877,11 @@ func getAllHostsForReplicas(rs *mdbv1.MongoDB, membersCount int) []string { return hostnames } -func (r *ReconcileMongoDbReplicaSet) applySearchOverrides(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) bool { - search := r.lookupCorrespondingSearchResource(ctx, rs, log) +func (h *ReplicaSetReconcilerHelper) applySearchOverrides(ctx context.Context) bool { + rs := h.resource + log := h.log + + search := h.lookupCorrespondingSearchResource(ctx) if search == nil { log.Debugf("No MongoDBSearch resource found, skipping search overrides") return false @@ -894,7 +907,11 @@ func (r *ReconcileMongoDbReplicaSet) applySearchOverrides(ctx context.Context, r return true } -func (r *ReconcileMongoDbReplicaSet) mirrorKeyfileIntoSecretForMongot(ctx context.Context, d om.Deployment, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error { +func (h *ReplicaSetReconcilerHelper) mirrorKeyfileIntoSecretForMongot(ctx context.Context, d om.Deployment) error { + rs := h.resource + r := h.reconciler + log := h.log + keyfileContents := maputil.ReadMapValueAsString(d, "auth", "key") keyfileSecret := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%s-%s", rs.Name, searchcontroller.MongotKeyfileFilename), Namespace: rs.Namespace}} @@ -906,12 +923,15 @@ func (r *ReconcileMongoDbReplicaSet) 
mirrorKeyfileIntoSecretForMongot(ctx contex }) if err != nil { return xerrors.Errorf("Failed to mirror the replicaset's keyfile into a secret: %w", err) - } else { - return nil } + return nil } -func (r *ReconcileMongoDbReplicaSet) lookupCorrespondingSearchResource(ctx context.Context, rs *mdbv1.MongoDB, log *zap.SugaredLogger) *searchv1.MongoDBSearch { +func (h *ReplicaSetReconcilerHelper) lookupCorrespondingSearchResource(ctx context.Context) *searchv1.MongoDBSearch { + rs := h.resource + r := h.reconciler + log := h.log + var search *searchv1.MongoDBSearch searchList := &searchv1.MongoDBSearchList{} if err := r.client.List(ctx, searchList, &client.ListOptions{ From 95608cdae27e9b072bc171b8ec76bd3143dbb2e7 Mon Sep 17 00:00:00 2001 From: Julien Benhaim Date: Tue, 21 Oct 2025 10:14:00 +0200 Subject: [PATCH 6/6] Use helper pattern for reconcileHostnameOverrideConfigMap --- controllers/operator/mongodbreplicaset_controller.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index ea415e7ff..378161e5c 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -195,6 +195,7 @@ func (h *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R log.Infow("ReplicaSet.Spec", "spec", rs.Spec, "desiredReplicas", scale.ReplicasThisReconciliation(rs), "isScaling", scale.IsStillScaling(rs)) log.Infow("ReplicaSet.Status", "status", rs.Status) + // TODO: adapt validations to multi cluster if err := rs.ProcessValidationsOnReconcile(nil); err != nil { return h.updateStatus(ctx, workflow.Invalid("%s", err.Error())) } @@ -431,7 +432,7 @@ func publishAutomationConfigFirstRS(ctx context.Context, getter kubernetesClient return false } -func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigMap { +func getHostnameOverrideConfigMapForReplicaset(mdb *mdbv1.MongoDB) corev1.ConfigMap { data := make(map[string]string) if mdb.Spec.DbCommonSpec.GetExternalDomain() != nil { @@ -451,12 +452,12 @@ func getHostnameOverrideConfigMapForReplicaset(mdb mdbv1.MongoDB) corev1.ConfigM return cm } -func (r *ReconcileMongoDbReplicaSet) reconcileHostnameOverrideConfigMap(ctx context.Context, log *zap.SugaredLogger, getUpdateCreator configmap.GetUpdateCreator, mdb mdbv1.MongoDB) error { - if mdb.Spec.DbCommonSpec.GetExternalDomain() == nil { +func (h *ReplicaSetReconcilerHelper) reconcileHostnameOverrideConfigMap(ctx context.Context, log *zap.SugaredLogger, getUpdateCreator configmap.GetUpdateCreator) error { + if h.resource.Spec.DbCommonSpec.GetExternalDomain() == nil { return nil } - cm := getHostnameOverrideConfigMapForReplicaset(mdb) + cm := getHostnameOverrideConfigMapForReplicaset(h.resource) err := configmap.CreateOrUpdate(ctx, getUpdateCreator, cm) if err != nil && !errors.IsAlreadyExists(err) { return xerrors.Errorf("failed to create configmap: %s, err: %w", cm.Name, err) @@ -475,7 +476,7 @@ func (h *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Contex log := h.log // Reconcile hostname override ConfigMap - if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.client, *rs); err != nil { + if err := h.reconcileHostnameOverrideConfigMap(ctx, log, h.reconciler.client); err != nil { return workflow.Failed(xerrors.Errorf("Failed to reconcileHostnameOverrideConfigMap: %w", err)) } @@ -808,6 +809,7 @@ func updateOmDeploymentDisableTLSConfiguration(conn om.Connection, 
mongoDBImage return tlsConfigWasDisabled, err } +// TODO: split into subfunctions, follow helper pattern func (r *ReconcileMongoDbReplicaSet) OnDelete(ctx context.Context, obj runtime.Object, log *zap.SugaredLogger) error { rs := obj.(*mdbv1.MongoDB)
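
A note on the series as a whole: the sketch below distills the pattern that patches 2 through 4 converge on — a stateless shared reconciler, a short-lived helper that owns the deployment state for exactly one reconcile, and an updateStatus override that persists that state on every exit path. It is a minimal, self-contained illustration: Reconciler, State and Helper are simplified stand-ins for ReconcileMongoDbReplicaSet, ReplicaSetDeploymentState and ReplicaSetReconcilerHelper, and a plain string stands in for *mdbv1.MongoDB; none of this is code from the patches.

package main

import "fmt"

// Reconciler is shared across reconciliations by controller-runtime,
// so it holds configuration only and no per-invocation mutable state.
type Reconciler struct{}

// State stands in for ReplicaSetDeploymentState: whatever must survive
// between reconciliations (persisted as annotations in the real code).
type State struct{ LastAchievedSpec string }

// Helper carries the state and logic of a single reconcile execution
// and is created fresh on every invocation.
type Helper struct {
	reconciler *Reconciler
	resource   string // stand-in for *mdbv1.MongoDB
	state      *State
}

// newHelper mirrors newReconcilerHelper: build the helper, then load the
// persisted state before any reconciliation logic runs.
func (r *Reconciler) newHelper(resource string) (*Helper, error) {
	h := &Helper{reconciler: r, resource: resource}
	h.state = &State{LastAchievedSpec: "previous-spec"} // readState()
	return h, nil
}

// writeState persists h.state; the real code serializes it back to
// annotations on the resource.
func (h *Helper) writeState() error { return nil }

// updateStatus mirrors the override introduced in patch 3: every status
// update also writes the deployment state back, so early returns stay
// consistent.
func (h *Helper) updateStatus(status string) (string, error) {
	// 1. update the resource status (the common controller in the real code)
	// 2. persist the deployment state
	if err := h.writeState(); err != nil {
		return "", fmt.Errorf("failed to write deployment state after updating status: %w", err)
	}
	return status, nil
}

// Reconcile is the single entry point; exactly one updateStatus call
// terminates every path through it.
func (h *Helper) Reconcile() (string, error) {
	return h.updateStatus("OK")
}

func main() {
	r := &Reconciler{}
	h, err := r.newHelper("my-replica-set")
	if err != nil {
		panic(err)
	}
	result, _ := h.Reconcile()
	fmt.Println(result)
}

The design choice this encodes is the one the WARNING comment in patch 4 spells out: controller-runtime shares one reconciler instance across concurrent reconciles, so per-invocation state must live on a throwaway object, never on the reconciler itself.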
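A second sketch, for the ordering decision the refactor keeps intact: publishAutomationConfigFirstRS computes a boolean from the last achieved spec and the TLS/auth transitions, and workflow.RunInGivenOrder then executes the Ops Manager update and the Kubernetes resource update in that order. runInGivenOrder below is a simplified analog with a plain error-based signature, not the real workflow API.

package main

import "fmt"

// runInGivenOrder runs two steps, choosing which goes first, and stops at
// the first failure — a simplified analog of workflow.RunInGivenOrder.
func runInGivenOrder(firstGoesFirst bool, first, second func() error) error {
	steps := []func() error{first, second}
	if !firstGoesFirst {
		steps[0], steps[1] = steps[1], steps[0]
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Stand-in for publishAutomationConfigFirstRS: in the controller this is
	// derived from the last achieved spec and TLS/auth state, not hardcoded.
	publishAutomationConfigFirst := true

	err := runInGivenOrder(publishAutomationConfigFirst,
		func() error { fmt.Println("update Ops Manager deployment"); return nil },
		func() error { fmt.Println("reconcile member resources"); return nil },
	)
	if err != nil {
		fmt.Println("reconcile failed:", err)
	}
}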