From db8a84b6cb6316d40eb7fd3e7ef53b9cf2fd84ec Mon Sep 17 00:00:00 2001 From: Mikalai Radchuk <509198+m1kola@users.noreply.github.com> Date: Fri, 7 Nov 2025 11:25:18 +0100 Subject: [PATCH 1/3] Fix linting --- .../mongodbreplicaset_controller_multi_test.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/controllers/operator/mongodbreplicaset_controller_multi_test.go b/controllers/operator/mongodbreplicaset_controller_multi_test.go index f97b6a861..795ff0782 100644 --- a/controllers/operator/mongodbreplicaset_controller_multi_test.go +++ b/controllers/operator/mongodbreplicaset_controller_multi_test.go @@ -97,22 +97,6 @@ func checkReplicaSetReconcileSuccessful( assert.NoError(t, err) } -// getReplicaSetMultiClusterMap simulates multiple K8s clusters using fake clients -func getReplicaSetMultiClusterMap(omConnectionFactory *om.CachedOMConnectionFactory) map[string]client.Client { - clientMap := make(map[string]client.Client) - - for _, clusterName := range multiClusters { - fakeClientBuilder := mock.NewEmptyFakeClientBuilder() - fakeClientBuilder.WithInterceptorFuncs(interceptor.Funcs{ - Get: mock.GetFakeClientInterceptorGetFunc(omConnectionFactory, true, true), - }) - - clientMap[clusterName] = kubernetesClient.NewClient(fakeClientBuilder.Build()) - } - - return clientMap -} - // TestReplicaSetMultiClusterScaling tests that multi-cluster ReplicaSets scale one member at a time // across all clusters, similar to single-cluster behavior. // From 6c9a36f5bdda1b1bcac5d6062cd937a20bae3252 Mon Sep 17 00:00:00 2001 From: Mikalai Radchuk <509198+m1kola@users.noreply.github.com> Date: Fri, 7 Nov 2025 16:52:29 +0100 Subject: [PATCH 2/3] Revert `GetMultiServiceFQDN` It was breaking a lot of unit end e2e tests. --- pkg/dns/dns.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 8872eff11..189a5a983 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -37,10 +37,7 @@ func GetMultiServiceFQDN(stsName string, namespace string, clusterNum int, podNu domain = strings.TrimPrefix(clusterDomain, ".") } - // For StatefulSet pods, DNS format is: ...svc. - podName := GetMultiPodName(stsName, clusterNum, podNum) - headlessService := GetMultiHeadlessServiceName(stsName, clusterNum) - return fmt.Sprintf("%s.%s.%s.svc.%s", podName, headlessService, namespace, domain) + return fmt.Sprintf("%s.%s.svc.%s", GetMultiServiceName(stsName, clusterNum, podNum), namespace, domain) } func GetMultiServiceExternalDomain(stsName, externalDomain string, clusterNum, podNum int) string { From ae73cbed7366eb918b2db6951affa6d032e9deb2 Mon Sep 17 00:00:00 2001 From: Mikalai Radchuk <509198+m1kola@users.noreply.github.com> Date: Tue, 18 Nov 2025 14:39:52 +0100 Subject: [PATCH 3/3] Create services for multi-cluster RS --- api/v1/mdb/mongodb_types.go | 10 + .../operator/mongodbreplicaset_controller.go | 172 +++++++++++++++++- 2 files changed, 181 insertions(+), 1 deletion(-) diff --git a/api/v1/mdb/mongodb_types.go b/api/v1/mdb/mongodb_types.go index 3ae61d881..be341896e 100644 --- a/api/v1/mdb/mongodb_types.go +++ b/api/v1/mdb/mongodb_types.go @@ -454,6 +454,16 @@ func (m *MongoDbSpec) GetExternalDomain() *string { return nil } +func (m *MongoDbSpec) GetExternalAccessConfigurationForMemberCluster(clusterName string) *ExternalAccessConfiguration { + for _, csl := range m.ClusterSpecList { + if csl.ClusterName == clusterName && csl.ExternalAccessConfiguration != nil { + return csl.ExternalAccessConfiguration + } + } + + return m.ExternalAccessConfiguration +} + // GetExternalDomainForMemberCluster returns the external domain for a specific member cluster. Falls back to the global // external domain if not found. func (m *MongoDbSpec) GetExternalDomainForMemberCluster(clusterName string) *string { diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go index a5777f942..95d255be1 100644 --- a/controllers/operator/mongodbreplicaset_controller.go +++ b/controllers/operator/mongodbreplicaset_controller.go @@ -8,9 +8,12 @@ import ( "go.uber.org/zap" "golang.org/x/xerrors" "k8s.io/apimachinery/pkg/api/errors" + apiErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -25,7 +28,9 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/mongodb/mongodb-kubernetes/api/v1/mdb" mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb" + omv1 "github.com/mongodb/mongodb-kubernetes/api/v1/om" rolev1 "github.com/mongodb/mongodb-kubernetes/api/v1/role" searchv1 "github.com/mongodb/mongodb-kubernetes/api/v1/search" mdbstatus "github.com/mongodb/mongodb-kubernetes/api/v1/status" @@ -38,6 +43,7 @@ import ( "github.com/mongodb/mongodb-kubernetes/controllers/operator/certs" "github.com/mongodb/mongodb-kubernetes/controllers/operator/connection" "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct" + mconstruct "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/multicluster" "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/scalers" "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/scalers/interfaces" "github.com/mongodb/mongodb-kubernetes/controllers/operator/controlledfeature" @@ -54,11 +60,13 @@ import ( kubernetesClient "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/client" "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/configmap" "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/container" + "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/service" "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/merge" "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/scale" "github.com/mongodb/mongodb-kubernetes/pkg/dns" "github.com/mongodb/mongodb-kubernetes/pkg/images" "github.com/mongodb/mongodb-kubernetes/pkg/kube" + mekoService "github.com/mongodb/mongodb-kubernetes/pkg/kube/service" "github.com/mongodb/mongodb-kubernetes/pkg/multicluster" "github.com/mongodb/mongodb-kubernetes/pkg/statefulset" "github.com/mongodb/mongodb-kubernetes/pkg/util" @@ -711,6 +719,13 @@ func (r *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Contex reconciler := r.reconciler log := r.log + if r.resource.Spec.IsMultiCluster() { + err := r.reconcileServices(ctx) + if err != nil { + return workflow.Failed(err) + } + } + // Reconcile hostname override ConfigMap if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.reconciler.client); err != nil { return workflow.Failed(xerrors.Errorf("failed to reconcile hostname override ConfigMap: %w", err)) @@ -729,6 +744,160 @@ func (r *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Contex return r.reconcileStatefulSets(ctx, conn, projectConfig, deploymentOptions) } +// reconcileServices makes sure that we have a service object corresponding to each statefulset pod +// in the member clusters +func (r *ReplicaSetReconcilerHelper) reconcileServices(ctx context.Context) error { + for _, memberCluster := range r.MemberClusters { + if memberCluster.Replicas == 0 { + r.log.Warnf("skipping services creation: no members assigned to cluster %s", memberCluster.Name) + continue + } + + // ensure SRV service + srvService := mdbSRVService(r.resource) + if err := ensureSRVService(ctx, memberCluster.Client, srvService, memberCluster.Name); err != nil { + return err + } + r.log.Infof("Successfully created SRV service %s in cluster %s", srvService.Name, memberCluster.Name) + + // ensure Headless service + headlessServiceName := dns.GetMultiStatefulSetName(r.resource.Name, memberCluster.Index) + nameSpacedName := kube.ObjectKey(r.resource.Namespace, headlessServiceName) + headlessService := create.BuildService(nameSpacedName, r.resource, ptr.To(headlessServiceName), nil, r.resource.Spec.AdditionalMongodConfig.GetPortOrDefault(), omv1.MongoDBOpsManagerServiceDefinition{Type: corev1.ServiceTypeClusterIP}) + if err := ensureHeadlessService(ctx, memberCluster.Client, headlessService, memberCluster.Name); err != nil { + return err + } + r.log.Infof("Successfully created headless service %s in cluster: %s", headlessServiceName, memberCluster.Name) + } + + // by default, we would create the duplicate services + shouldCreateDuplicates := r.resource.Spec.DuplicateServiceObjects == nil || *r.resource.Spec.DuplicateServiceObjects + for _, memberCluster := range r.MemberClusters { + for _, clusterSpecItem := range r.getClusterSpecList() { + if !shouldCreateDuplicates && clusterSpecItem.ClusterName != memberCluster.Name { + // skip creating of other cluster's services (duplicates) in the current cluster + continue + } + + if err := mdbEnsureServices(ctx, r.resource, &memberCluster, clusterSpecItem, r.log); err != nil { + return err + } + } + } + + return nil +} + +// mdbEnsureServices creates pod services and/or external services. +// If externalAccess is defined (at spec or clusterSpecItem level) then we always create an external service. +// If externalDomain is defined then we DO NOT create pod services (service created for each pod selecting only 1 pod). +// When there are external domains used, we don't use internal pod-service FQDNs as hostnames at all, +// so there is no point in creating pod services. +// But when external domains are not used, then mongod process hostnames use pod service FQDN, and +// at the same time user might want to expose externally using external services. +func mdbEnsureServices(ctx context.Context, mdb *mdb.MongoDB, memberCluster *multicluster.MemberCluster, clusterSpecItem mdb.ClusterSpecItem, log *zap.SugaredLogger) error { + for podNum := 0; podNum < clusterSpecItem.Members; podNum++ { + var svc corev1.Service + if mdb.Spec.GetExternalAccessConfigurationForMemberCluster(clusterSpecItem.ClusterName) != nil { + svc = mdbExternalService(mdb, clusterSpecItem.ClusterName, memberCluster.Index, podNum) + externalDomain := mdb.Spec.GetExternalDomainForMemberCluster(clusterSpecItem.ClusterName) + placeholderReplacer := create.GetMultiClusterMongoDBPlaceholderReplacer(mdb.Name, mdb.Name, mdb.Namespace, clusterSpecItem.ClusterName, memberCluster.Index, externalDomain, mdb.Spec.ClusterDomain, podNum) + if processedAnnotations, replacedFlag, err := placeholderReplacer.ProcessMap(svc.Annotations); err != nil { + return xerrors.Errorf("failed to process annotations in external service %s in cluster %s: %w", svc.Name, memberCluster.Name, err) + } else if replacedFlag { + log.Debugf("Replaced placeholders in annotations in external service %s in cluster: %s. Annotations before: %+v, annotations after: %+v", svc.Name, memberCluster.Name, svc.Annotations, processedAnnotations) + svc.Annotations = processedAnnotations + } + + err := mekoService.CreateOrUpdateService(ctx, memberCluster.Client, svc) + if err != nil && !apiErrors.IsAlreadyExists(err) { + return xerrors.Errorf("failed to create external service %s in cluster: %s, err: %w", svc.Name, memberCluster.Name, err) + } + } + + // we create regular pod-services only if we don't use external domains + if mdb.Spec.GetExternalDomainForMemberCluster(clusterSpecItem.ClusterName) == nil { + svc = mdbService(mdb, clusterSpecItem.ClusterName, memberCluster.Index, podNum) + err := mekoService.CreateOrUpdateService(ctx, memberCluster.Client, svc) + if err != nil && !apiErrors.IsAlreadyExists(err) { + return xerrors.Errorf("failed to create pod service %s in cluster: %s, err: %w", svc.Name, memberCluster.Name, err) + } + } + } + return nil +} + +func mdbSRVService(mdb *mdbv1.MongoDB) corev1.Service { + additionalConfig := mdb.Spec.GetAdditionalMongodConfig() + port := additionalConfig.GetPortOrDefault() + + svc := service.Builder(). + SetName(fmt.Sprintf("%s-svc", mdb.Name)). + SetNamespace(mdb.Namespace). + SetSelector(mconstruct.PodLabel(mdb.Name)). + SetLabels(mdb.GetOwnerLabels()). + SetPublishNotReadyAddresses(true). + AddPort(&corev1.ServicePort{Port: port, Name: "mongodb"}). + AddPort(&corev1.ServicePort{Port: create.GetNonEphemeralBackupPort(port), Name: "backup", TargetPort: intstr.IntOrString{IntVal: create.GetNonEphemeralBackupPort(port)}}). + Build() + + return svc +} + +func mdbExternalService(mdb *mdb.MongoDB, clusterName string, clusterNum, podNum int) corev1.Service { + svc := mdbService(mdb, clusterName, clusterNum, podNum) + svc.Name = dns.GetMultiExternalServiceName(mdb.GetName(), clusterNum, podNum) + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + + externalAccessConfig := mdb.Spec.GetExternalAccessConfigurationForMemberCluster(clusterName) + if externalAccessConfig != nil { + // first we override with the Service spec from the root and then from a specific cluster. + if mdb.Spec.ExternalAccessConfiguration != nil { + globalOverrideSpecWrapper := mdb.Spec.ExternalAccessConfiguration.ExternalService.SpecWrapper + if globalOverrideSpecWrapper != nil { + svc.Spec = merge.ServiceSpec(svc.Spec, globalOverrideSpecWrapper.Spec) + } + } + clusterLevelOverrideSpec := externalAccessConfig.ExternalService.SpecWrapper + additionalAnnotations := externalAccessConfig.ExternalService.Annotations + if clusterLevelOverrideSpec != nil { + svc.Spec = merge.ServiceSpec(svc.Spec, clusterLevelOverrideSpec.Spec) + } + svc.Annotations = merge.StringToStringMap(svc.Annotations, additionalAnnotations) + } + + return svc +} + +func mdbService(mdb *mdb.MongoDB, clusterName string, clusterNum, podNum int) corev1.Service { + svcLabels := map[string]string{ + appsv1.StatefulSetPodNameLabel: dns.GetMultiPodName(mdb.Name, clusterNum, podNum), + } + svcLabels = merge.StringToStringMap(svcLabels, mdb.GetOwnerLabels()) + + labelSelectors := map[string]string{ + appsv1.StatefulSetPodNameLabel: dns.GetMultiPodName(mdb.Name, clusterNum, podNum), + util.OperatorLabelName: util.OperatorLabelValue, + } + + additionalConfig := mdb.Spec.GetAdditionalMongodConfig() + port := additionalConfig.GetPortOrDefault() + + svc := service.Builder(). + SetName(dns.GetMultiServiceName(mdb.Name, clusterNum, podNum)). + SetNamespace(mdb.Namespace). + SetSelector(labelSelectors). + SetLabels(svcLabels). + SetPublishNotReadyAddresses(true). + AddPort(&corev1.ServicePort{Port: port, Name: "mongodb"}). + // Note: in the agent-launcher.sh We explicitly pass an offset of 1. When port N is exposed + // the agent would use port N+1 for the spinning up of the ephemeral mongod process, which is used for backup + AddPort(&corev1.ServicePort{Port: create.GetNonEphemeralBackupPort(port), Name: "backup", TargetPort: intstr.IntOrString{IntVal: create.GetNonEphemeralBackupPort(port)}}). + Build() + + return svc +} + func (r *ReplicaSetReconcilerHelper) reconcileStatefulSets(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status { for _, memberCluster := range r.MemberClusters { if status := r.reconcileStatefulSet(ctx, conn, memberCluster.Client, memberCluster.SecretClient, projectConfig, deploymentOptions, memberCluster); !status.IsOK() { @@ -872,6 +1041,7 @@ func (r *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context StatefulSetNameOverride(r.GetReplicaSetStsName(memberCluster)), ServiceName(r.GetReplicaSetServiceName(memberCluster)), Replicas(scale.ReplicasThisReconciliation(r.GetReplicaSetScaler(memberCluster))), + // TODO: Add owner labels and ensure proper clean up. Note that there are differences in labels between existing MongoDB and MongoDBMulticluster. ) return rsConfig, nil @@ -913,7 +1083,7 @@ func (r *ReplicaSetReconcilerHelper) GetReplicaSetServiceName(memberCluster mult if memberCluster.Legacy { return r.resource.ServiceName() } - return dns.GetMultiHeadlessServiceName(r.resource.Name, memberCluster.Index) + return fmt.Sprintf("%s-svc", r.GetReplicaSetStsName(memberCluster)) } // GetReplicaSetScaler returns a scaler for calculating replicas in a member cluster.