Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions api/v1/mdb/mongodb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,16 @@ func (m *MongoDbSpec) GetExternalDomain() *string {
return nil
}

func (m *MongoDbSpec) GetExternalAccessConfigurationForMemberCluster(clusterName string) *ExternalAccessConfiguration {
for _, csl := range m.ClusterSpecList {
if csl.ClusterName == clusterName && csl.ExternalAccessConfiguration != nil {
return csl.ExternalAccessConfiguration
}
}

return m.ExternalAccessConfiguration
}

// GetExternalDomainForMemberCluster returns the external domain for a specific member cluster. Falls back to the global
// external domain if not found.
func (m *MongoDbSpec) GetExternalDomainForMemberCluster(clusterName string) *string {
Expand Down
172 changes: 171 additions & 1 deletion controllers/operator/mongodbreplicaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"go.uber.org/zap"
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/api/errors"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -25,7 +28,9 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
omv1 "github.com/mongodb/mongodb-kubernetes/api/v1/om"
rolev1 "github.com/mongodb/mongodb-kubernetes/api/v1/role"
searchv1 "github.com/mongodb/mongodb-kubernetes/api/v1/search"
mdbstatus "github.com/mongodb/mongodb-kubernetes/api/v1/status"
Expand All @@ -38,6 +43,7 @@ import (
"github.com/mongodb/mongodb-kubernetes/controllers/operator/certs"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/connection"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/construct"
mconstruct "github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/multicluster"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/scalers"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/construct/scalers/interfaces"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/controlledfeature"
Expand All @@ -54,11 +60,13 @@ import (
kubernetesClient "github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/client"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/configmap"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/container"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/kube/service"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/merge"
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/util/scale"
"github.com/mongodb/mongodb-kubernetes/pkg/dns"
"github.com/mongodb/mongodb-kubernetes/pkg/images"
"github.com/mongodb/mongodb-kubernetes/pkg/kube"
mekoService "github.com/mongodb/mongodb-kubernetes/pkg/kube/service"
"github.com/mongodb/mongodb-kubernetes/pkg/multicluster"
"github.com/mongodb/mongodb-kubernetes/pkg/statefulset"
"github.com/mongodb/mongodb-kubernetes/pkg/util"
Expand Down Expand Up @@ -711,6 +719,13 @@ func (r *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Contex
reconciler := r.reconciler
log := r.log

if r.resource.Spec.IsMultiCluster() {
err := r.reconcileServices(ctx)
if err != nil {
return workflow.Failed(err)
}
}

// Reconcile hostname override ConfigMap
if err := r.reconcileHostnameOverrideConfigMap(ctx, log, r.reconciler.client); err != nil {
return workflow.Failed(xerrors.Errorf("failed to reconcile hostname override ConfigMap: %w", err))
Expand All @@ -729,6 +744,160 @@ func (r *ReplicaSetReconcilerHelper) reconcileMemberResources(ctx context.Contex
return r.reconcileStatefulSets(ctx, conn, projectConfig, deploymentOptions)
}

// reconcileServices makes sure that we have a service object corresponding to each statefulset pod
// in the member clusters
func (r *ReplicaSetReconcilerHelper) reconcileServices(ctx context.Context) error {
for _, memberCluster := range r.MemberClusters {
if memberCluster.Replicas == 0 {
r.log.Warnf("skipping services creation: no members assigned to cluster %s", memberCluster.Name)
continue
}

// ensure SRV service
srvService := mdbSRVService(r.resource)
if err := ensureSRVService(ctx, memberCluster.Client, srvService, memberCluster.Name); err != nil {
return err
}
r.log.Infof("Successfully created SRV service %s in cluster %s", srvService.Name, memberCluster.Name)

// ensure Headless service
headlessServiceName := dns.GetMultiStatefulSetName(r.resource.Name, memberCluster.Index)
nameSpacedName := kube.ObjectKey(r.resource.Namespace, headlessServiceName)
headlessService := create.BuildService(nameSpacedName, r.resource, ptr.To(headlessServiceName), nil, r.resource.Spec.AdditionalMongodConfig.GetPortOrDefault(), omv1.MongoDBOpsManagerServiceDefinition{Type: corev1.ServiceTypeClusterIP})
if err := ensureHeadlessService(ctx, memberCluster.Client, headlessService, memberCluster.Name); err != nil {
return err
}
r.log.Infof("Successfully created headless service %s in cluster: %s", headlessServiceName, memberCluster.Name)
}

// by default, we would create the duplicate services
shouldCreateDuplicates := r.resource.Spec.DuplicateServiceObjects == nil || *r.resource.Spec.DuplicateServiceObjects
for _, memberCluster := range r.MemberClusters {
for _, clusterSpecItem := range r.getClusterSpecList() {
if !shouldCreateDuplicates && clusterSpecItem.ClusterName != memberCluster.Name {
// skip creating of other cluster's services (duplicates) in the current cluster
continue
}

if err := mdbEnsureServices(ctx, r.resource, &memberCluster, clusterSpecItem, r.log); err != nil {
return err
}
}
}

return nil
}

// mdbEnsureServices creates pod services and/or external services.
// If externalAccess is defined (at spec or clusterSpecItem level) then we always create an external service.
// If externalDomain is defined then we DO NOT create pod services (service created for each pod selecting only 1 pod).
// When there are external domains used, we don't use internal pod-service FQDNs as hostnames at all,
// so there is no point in creating pod services.
// But when external domains are not used, then mongod process hostnames use pod service FQDN, and
// at the same time user might want to expose externally using external services.
func mdbEnsureServices(ctx context.Context, mdb *mdb.MongoDB, memberCluster *multicluster.MemberCluster, clusterSpecItem mdb.ClusterSpecItem, log *zap.SugaredLogger) error {
for podNum := 0; podNum < clusterSpecItem.Members; podNum++ {
var svc corev1.Service
if mdb.Spec.GetExternalAccessConfigurationForMemberCluster(clusterSpecItem.ClusterName) != nil {
svc = mdbExternalService(mdb, clusterSpecItem.ClusterName, memberCluster.Index, podNum)
externalDomain := mdb.Spec.GetExternalDomainForMemberCluster(clusterSpecItem.ClusterName)
placeholderReplacer := create.GetMultiClusterMongoDBPlaceholderReplacer(mdb.Name, mdb.Name, mdb.Namespace, clusterSpecItem.ClusterName, memberCluster.Index, externalDomain, mdb.Spec.ClusterDomain, podNum)
if processedAnnotations, replacedFlag, err := placeholderReplacer.ProcessMap(svc.Annotations); err != nil {
return xerrors.Errorf("failed to process annotations in external service %s in cluster %s: %w", svc.Name, memberCluster.Name, err)
} else if replacedFlag {
log.Debugf("Replaced placeholders in annotations in external service %s in cluster: %s. Annotations before: %+v, annotations after: %+v", svc.Name, memberCluster.Name, svc.Annotations, processedAnnotations)
svc.Annotations = processedAnnotations
}

err := mekoService.CreateOrUpdateService(ctx, memberCluster.Client, svc)
if err != nil && !apiErrors.IsAlreadyExists(err) {
return xerrors.Errorf("failed to create external service %s in cluster: %s, err: %w", svc.Name, memberCluster.Name, err)
}
}

// we create regular pod-services only if we don't use external domains
if mdb.Spec.GetExternalDomainForMemberCluster(clusterSpecItem.ClusterName) == nil {
svc = mdbService(mdb, clusterSpecItem.ClusterName, memberCluster.Index, podNum)
err := mekoService.CreateOrUpdateService(ctx, memberCluster.Client, svc)
if err != nil && !apiErrors.IsAlreadyExists(err) {
return xerrors.Errorf("failed to create pod service %s in cluster: %s, err: %w", svc.Name, memberCluster.Name, err)
}
}
}
return nil
}

func mdbSRVService(mdb *mdbv1.MongoDB) corev1.Service {
additionalConfig := mdb.Spec.GetAdditionalMongodConfig()
port := additionalConfig.GetPortOrDefault()

svc := service.Builder().
SetName(fmt.Sprintf("%s-svc", mdb.Name)).
SetNamespace(mdb.Namespace).
SetSelector(mconstruct.PodLabel(mdb.Name)).
SetLabels(mdb.GetOwnerLabels()).
SetPublishNotReadyAddresses(true).
AddPort(&corev1.ServicePort{Port: port, Name: "mongodb"}).
AddPort(&corev1.ServicePort{Port: create.GetNonEphemeralBackupPort(port), Name: "backup", TargetPort: intstr.IntOrString{IntVal: create.GetNonEphemeralBackupPort(port)}}).
Build()

return svc
}

func mdbExternalService(mdb *mdb.MongoDB, clusterName string, clusterNum, podNum int) corev1.Service {
svc := mdbService(mdb, clusterName, clusterNum, podNum)
svc.Name = dns.GetMultiExternalServiceName(mdb.GetName(), clusterNum, podNum)
svc.Spec.Type = corev1.ServiceTypeLoadBalancer

externalAccessConfig := mdb.Spec.GetExternalAccessConfigurationForMemberCluster(clusterName)
if externalAccessConfig != nil {
// first we override with the Service spec from the root and then from a specific cluster.
if mdb.Spec.ExternalAccessConfiguration != nil {
globalOverrideSpecWrapper := mdb.Spec.ExternalAccessConfiguration.ExternalService.SpecWrapper
if globalOverrideSpecWrapper != nil {
svc.Spec = merge.ServiceSpec(svc.Spec, globalOverrideSpecWrapper.Spec)
}
}
clusterLevelOverrideSpec := externalAccessConfig.ExternalService.SpecWrapper
additionalAnnotations := externalAccessConfig.ExternalService.Annotations
if clusterLevelOverrideSpec != nil {
svc.Spec = merge.ServiceSpec(svc.Spec, clusterLevelOverrideSpec.Spec)
}
svc.Annotations = merge.StringToStringMap(svc.Annotations, additionalAnnotations)
}

return svc
}

func mdbService(mdb *mdb.MongoDB, clusterName string, clusterNum, podNum int) corev1.Service {
svcLabels := map[string]string{
appsv1.StatefulSetPodNameLabel: dns.GetMultiPodName(mdb.Name, clusterNum, podNum),
}
svcLabels = merge.StringToStringMap(svcLabels, mdb.GetOwnerLabels())

labelSelectors := map[string]string{
appsv1.StatefulSetPodNameLabel: dns.GetMultiPodName(mdb.Name, clusterNum, podNum),
util.OperatorLabelName: util.OperatorLabelValue,
}

additionalConfig := mdb.Spec.GetAdditionalMongodConfig()
port := additionalConfig.GetPortOrDefault()

svc := service.Builder().
SetName(dns.GetMultiServiceName(mdb.Name, clusterNum, podNum)).
SetNamespace(mdb.Namespace).
SetSelector(labelSelectors).
SetLabels(svcLabels).
SetPublishNotReadyAddresses(true).
AddPort(&corev1.ServicePort{Port: port, Name: "mongodb"}).
// Note: in the agent-launcher.sh We explicitly pass an offset of 1. When port N is exposed
// the agent would use port N+1 for the spinning up of the ephemeral mongod process, which is used for backup
AddPort(&corev1.ServicePort{Port: create.GetNonEphemeralBackupPort(port), Name: "backup", TargetPort: intstr.IntOrString{IntVal: create.GetNonEphemeralBackupPort(port)}}).
Build()

return svc
}

func (r *ReplicaSetReconcilerHelper) reconcileStatefulSets(ctx context.Context, conn om.Connection, projectConfig mdbv1.ProjectConfig, deploymentOptions deploymentOptionsRS) workflow.Status {
for _, memberCluster := range r.MemberClusters {
if status := r.reconcileStatefulSet(ctx, conn, memberCluster.Client, memberCluster.SecretClient, projectConfig, deploymentOptions, memberCluster); !status.IsOK() {
Expand Down Expand Up @@ -872,6 +1041,7 @@ func (r *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context
StatefulSetNameOverride(r.GetReplicaSetStsName(memberCluster)),
ServiceName(r.GetReplicaSetServiceName(memberCluster)),
Replicas(scale.ReplicasThisReconciliation(r.GetReplicaSetScaler(memberCluster))),
// TODO: Add owner labels and ensure proper clean up. Note that there are differences in labels between existing MongoDB and MongoDBMulticluster.
)

return rsConfig, nil
Expand Down Expand Up @@ -913,7 +1083,7 @@ func (r *ReplicaSetReconcilerHelper) GetReplicaSetServiceName(memberCluster mult
if memberCluster.Legacy {
return r.resource.ServiceName()
}
return dns.GetMultiHeadlessServiceName(r.resource.Name, memberCluster.Index)
return fmt.Sprintf("%s-svc", r.GetReplicaSetStsName(memberCluster))
}

// GetReplicaSetScaler returns a scaler for calculating replicas in a member cluster.
Expand Down
16 changes: 0 additions & 16 deletions controllers/operator/mongodbreplicaset_controller_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,6 @@ func checkReplicaSetReconcileSuccessful(
assert.NoError(t, err)
}

// getReplicaSetMultiClusterMap simulates multiple K8s clusters using fake clients
func getReplicaSetMultiClusterMap(omConnectionFactory *om.CachedOMConnectionFactory) map[string]client.Client {
clientMap := make(map[string]client.Client)

for _, clusterName := range multiClusters {
fakeClientBuilder := mock.NewEmptyFakeClientBuilder()
fakeClientBuilder.WithInterceptorFuncs(interceptor.Funcs{
Get: mock.GetFakeClientInterceptorGetFunc(omConnectionFactory, true, true),
})

clientMap[clusterName] = kubernetesClient.NewClient(fakeClientBuilder.Build())
}

return clientMap
}

// TestReplicaSetMultiClusterScaling tests that multi-cluster ReplicaSets scale one member at a time
// across all clusters, similar to single-cluster behavior.
//
Expand Down
5 changes: 1 addition & 4 deletions pkg/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ func GetMultiServiceFQDN(stsName string, namespace string, clusterNum int, podNu
domain = strings.TrimPrefix(clusterDomain, ".")
}

// For StatefulSet pods, DNS format is: <pod-name>.<headless-service>.<namespace>.svc.<domain>
podName := GetMultiPodName(stsName, clusterNum, podNum)
headlessService := GetMultiHeadlessServiceName(stsName, clusterNum)
return fmt.Sprintf("%s.%s.%s.svc.%s", podName, headlessService, namespace, domain)
return fmt.Sprintf("%s.%s.svc.%s", GetMultiServiceName(stsName, clusterNum, podNum), namespace, domain)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverting back to this (original, as seen in master) code fixes most of the tests, but breaks MongoDB-based multi-cluster.

Controller for MongoDBMultiCluster generates hostnames like this:

multi-replica-set-0-0-svc.mongodb-test.svc.cluster.local
multi-replica-set-1-0-svc.mongodb-test.svc.cluster.local
multi-replica-set-2-0-svc.mongodb-test.svc.cluster.local

But the controller for MongoDB multi-cluster replica sets creates hostnames like this even with the revert:

multi-replica-set-0-0.multi-replica-set-0-svc.mongodb-test.svc.cluster.local
multi-replica-set-1-0.multi-replica-set-1-svc.mongodb-test.svc.cluster.local
multi-replica-set-2-0.multi-replica-set-2-svc.mongodb-test.svc.cluster.local

Still trying to understand why.

}

func GetMultiServiceExternalDomain(stsName, externalDomain string, clusterNum, podNum int) string {
Expand Down