From 2a8a5394850fe6e73d4e12d2da23eb2e991cf36c Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Tue, 17 Dec 2024 13:56:43 +0100 Subject: [PATCH 1/4] fix private endpoint conflict between sub-resource and new CRD --- pkg/api/v1/atlasproject_types.go | 19 ++ pkg/api/v1/atlasproject_types_test.go | 40 ++++- .../atlasproject/private_endpoint.go | 119 ++++++++---- .../atlasproject/private_endpoint_test.go | 170 +++++++++++++++--- pkg/controller/atlasproject/project.go | 2 +- test/e2e/privateendpoint_test.go | 129 ++++++++++++- 6 files changed, 406 insertions(+), 73 deletions(-) diff --git a/pkg/api/v1/atlasproject_types.go b/pkg/api/v1/atlasproject_types.go index 1cec26bff9..950df055df 100644 --- a/pkg/api/v1/atlasproject_types.go +++ b/pkg/api/v1/atlasproject_types.go @@ -17,6 +17,9 @@ limitations under the License. package v1 import ( + "encoding/json" + "fmt" + "go.uber.org/zap/zapcore" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -295,6 +298,22 @@ func (p *AtlasProject) WithMaintenanceWindow(window project.MaintenanceWindow) * return p } +func (p *AtlasProject) LastSpecFrom(annotation string) (*AtlasProjectSpec, error) { + var lastApplied AtlasProject + ann, ok := p.GetAnnotations()[annotation] + + if !ok { + return nil, nil + } + + err := json.Unmarshal([]byte(ann), &lastApplied.Spec) + if err != nil { + return nil, fmt.Errorf("error reading AtlasProject Spec from annotation [%s]: %w", annotation, err) + } + + return &lastApplied.Spec, nil +} + func DefaultProject(namespace, connectionSecretName string) *AtlasProject { return NewProject(namespace, "test-project", namespace).WithConnectionSecret(connectionSecretName) } diff --git a/pkg/api/v1/atlasproject_types_test.go b/pkg/api/v1/atlasproject_types_test.go index 479c372cf4..031fa0cf88 100644 --- a/pkg/api/v1/atlasproject_types_test.go +++ b/pkg/api/v1/atlasproject_types_test.go @@ -6,14 +6,13 @@ import ( "testing" "time" - "go.mongodb.org/atlas-sdk/v20231115004/admin" - - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" - "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "go.mongodb.org/atlas-sdk/v20231115004/admin" "sigs.k8s.io/yaml" internalcmp "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/cmp" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" ) func TestSpecEquality(t *testing.T) { @@ -126,3 +125,36 @@ func mustMarshal(t *testing.T, what any) string { } return string(result) } + +func TestLastSpecFrom(t *testing.T) { + tests := map[string]struct { + annotations map[string]string + expectedLastSpec *AtlasProjectSpec + expectedError string + }{ + + "should return nil when there is no last spec": {}, + "should return error when last spec annotation is wrong": { + annotations: map[string]string{"mongodb.com/last-applied-configuration": "{wrong}"}, + expectedError: "error reading AtlasProject Spec from annotation [mongodb.com/last-applied-configuration]:" + + " invalid character 'w' looking for beginning of object key string", + }, + "should return last spec": { + annotations: map[string]string{"mongodb.com/last-applied-configuration": "{\"name\": \"my-project\"}"}, + expectedLastSpec: &AtlasProjectSpec{ + Name: "my-project", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + p := &AtlasProject{} + p.WithAnnotations(tt.annotations) + lastSpec, err := p.LastSpecFrom("mongodb.com/last-applied-configuration") + if err != nil { + assert.ErrorContains(t, err, tt.expectedError) + } + assert.Equal(t, tt.expectedLastSpec, lastSpec) + }) + } +} diff --git a/pkg/controller/atlasproject/private_endpoint.go b/pkg/controller/atlasproject/private_endpoint.go index 800fad76ca..3838a347b9 100644 --- a/pkg/controller/atlasproject/private_endpoint.go +++ b/pkg/controller/atlasproject/private_endpoint.go @@ -36,12 +36,17 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr specPEs := project.Spec.DeepCopy().PrivateEndpoints + lastAppliedPEs, err := mapLastAppliedPrivateEndpoint(project) + if err != nil { + return workflow.Terminate(workflow.Internal, err.Error()) + } + atlasPEs, err := getAllPrivateEndpoints(workflowCtx.Context, workflowCtx.SdkClient, project.ID()) if err != nil { return workflow.Terminate(workflow.Internal, err.Error()) } - result, conditionType := syncPrivateEndpointsWithAtlas(workflowCtx, project.ID(), specPEs, atlasPEs) + result, conditionType := syncPrivateEndpointsWithAtlas(workflowCtx, project.ID(), specPEs, atlasPEs, lastAppliedPEs) if !result.IsOk() { if conditionType == api.PrivateEndpointServiceReadyType { workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) @@ -50,13 +55,13 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr return result } - if len(specPEs) == 0 && len(atlasPEs) == 0 { + if (len(specPEs) == 0 && len(atlasPEs) == 0) || !hasManagedPrivateEndpoints(atlasPEs, lastAppliedPEs) { workflowCtx.UnsetCondition(api.PrivateEndpointServiceReadyType) workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) return workflow.OK() } - serviceStatus := getStatusForServices(workflowCtx, atlasPEs) + serviceStatus := getStatusForServices(atlasPEs, lastAppliedPEs) if !serviceStatus.IsOk() { workflowCtx.SetConditionFromResult(api.PrivateEndpointServiceReadyType, serviceStatus) return serviceStatus @@ -75,17 +80,23 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr } } - interfaceStatus := getStatusForInterfaces(workflowCtx, project.ID(), specPEs, atlasPEs) + interfaceStatus := getStatusForInterfaces(workflowCtx, project.ID(), specPEs, atlasPEs, lastAppliedPEs) workflowCtx.SetConditionFromResult(api.PrivateEndpointReadyType, interfaceStatus) return interfaceStatus } -func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) (workflow.Result, api.ConditionType) { +func syncPrivateEndpointsWithAtlas( + ctx *workflow.Context, + projectID string, + specPEs []akov2.PrivateEndpoint, + atlasPEs []atlasPE, + lastAppliedPEs map[string]akov2.PrivateEndpoint, +) (workflow.Result, api.ConditionType) { log := ctx.Log log.Debugw("PE Connections", "atlasPEs", atlasPEs, "specPEs", specPEs) - endpointsToDelete := getEndpointsNotInSpec(specPEs, atlasPEs) + endpointsToDelete := getEndpointsNotInSpec(specPEs, atlasPEs, lastAppliedPEs) log.Debugf("Number of Private Endpoints to delete: %d", len(endpointsToDelete)) if result := deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete); !result.IsOk() { return result, api.PrivateEndpointServiceReadyType @@ -115,12 +126,22 @@ func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, spec return workflow.OK(), api.PrivateEndpointReadyType } -func getStatusForServices(ctx *workflow.Context, atlasPEs []atlasPE) workflow.Result { - allAvailable, failureMessage := areServicesAvailableOrFailed(atlasPEs) - ctx.Log.Debugw("Get Status for Services", "allAvailable", allAvailable, "failureMessage", failureMessage) - if failureMessage != "" { - return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, failureMessage) +func getStatusForServices(atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) workflow.Result { + allAvailable := true + for _, conn := range atlasPEs { + if _, ok := lastAppliedPEs[conn.Identifier().(string)]; !ok { + continue + } + + if isFailed(conn.GetStatus()) { + return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, conn.GetErrorMessage()) + } + + if !isAvailable(conn.GetStatus()) { + allAvailable = false + } } + if !allAvailable { return notReadyServiceResult } @@ -128,10 +149,14 @@ func getStatusForServices(ctx *workflow.Context, atlasPEs []atlasPE) workflow.Re return workflow.OK() } -func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) workflow.Result { +func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) workflow.Result { totalInterfaceCount := 0 for _, atlasPeService := range atlasPEs { + if _, ok := lastAppliedPEs[atlasPeService.Identifier().(string)]; !ok { + continue + } + interfaceEndpointIDs := atlasPeService.InterfaceEndpointIDs() totalInterfaceCount += len(interfaceEndpointIDs) @@ -167,22 +192,6 @@ func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []a return workflow.OK() } -func areServicesAvailableOrFailed(atlasPeConnections []atlasPE) (allAvailable bool, failureMessage string) { - allAvailable = true - - for _, conn := range atlasPeConnections { - if isFailed(conn.GetStatus()) { - failureMessage = conn.GetErrorMessage() - return - } - if !isAvailable(conn.GetStatus()) { - allAvailable = false - } - } - - return -} - func updatePEStatusOption(ctx *workflow.Context, projectID string, newConnections, syncedConnections []atlasPE) { setPEStatusOption(ctx, projectID, syncedConnections) addPEStatusOption(ctx, projectID, newConnections) @@ -332,14 +341,19 @@ func endpointDefinedInSpec(specEndpoint akov2.PrivateEndpoint) bool { return specEndpoint.ID != "" || specEndpoint.EndpointGroupName != "" } -func DeleteAllPrivateEndpoints(ctx *workflow.Context, projectID string) workflow.Result { - atlasPEs, err := getAllPrivateEndpoints(ctx.Context, ctx.SdkClient, projectID) +func DeleteAllPrivateEndpoints(ctx *workflow.Context, atlasProject *akov2.AtlasProject) workflow.Result { + atlasPEs, err := getAllPrivateEndpoints(ctx.Context, ctx.SdkClient, atlasProject.ID()) + if err != nil { + return workflow.Terminate(workflow.Internal, err.Error()) + } + + lastAppliedSpecPEs, err := mapLastAppliedPrivateEndpoint(atlasProject) if err != nil { return workflow.Terminate(workflow.Internal, err.Error()) } - endpointsToDelete := getEndpointsNotInSpec([]akov2.PrivateEndpoint{}, atlasPEs) - return deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete) + endpointsToDelete := getEndpointsNotInSpec([]akov2.PrivateEndpoint{}, atlasPEs, lastAppliedSpecPEs) + return deletePrivateEndpointsFromAtlas(ctx, atlasProject.ID(), endpointsToDelete) } func deletePrivateEndpointsFromAtlas(ctx *workflow.Context, projectID string, listsToRemove []atlasPE) workflow.Result { @@ -516,9 +530,16 @@ func terminateWithError(ctx *workflow.Context, conditionType api.ConditionType, var notReadyServiceResult = workflow.InProgress(workflow.ProjectPEServiceIsNotReadyInAtlas, "Private Endpoint Service is not ready") var notReadyInterfaceResult = workflow.InProgress(workflow.ProjectPEInterfaceIsNotReadyInAtlas, "Interface Private Endpoint is not ready") -func getEndpointsNotInSpec(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) []atlasPE { - uniqueItems, _ := getUniqueDifference(atlasPEs, specPEs) - return uniqueItems +func getEndpointsNotInSpec(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) []atlasPE { + notInSpecItems, _ := getUniqueDifference(atlasPEs, specPEs) + toDelete := make([]atlasPE, 0, len(notInSpecItems)) + for _, item := range notInSpecItems { + if _, ok := lastAppliedPEs[item.Identifier().(string)]; ok { + toDelete = append(toDelete, item) + } + } + + return toDelete } func getEndpointsNotInAtlas(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) (toCreate []akov2.PrivateEndpoint, counts []int) { @@ -585,3 +606,31 @@ func hasSkippedPrivateEndpointConfiguration(atlasProject *akov2.AtlasProject) (b return false, nil } + +func mapLastAppliedPrivateEndpoint(atlasProject *akov2.AtlasProject) (map[string]akov2.PrivateEndpoint, error) { + lastApplied, err := atlasProject.LastSpecFrom(customresource.AnnotationLastAppliedConfiguration) + if err != nil { + return nil, err + } + + if lastApplied == nil || len(lastApplied.PrivateEndpoints) == 0 { + return nil, nil + } + + result := map[string]akov2.PrivateEndpoint{} + for _, pe := range lastApplied.PrivateEndpoints { + result[pe.Identifier().(string)] = pe + } + + return result, nil +} + +func hasManagedPrivateEndpoints(atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) bool { + for _, pe := range atlasPEs { + if _, ok := lastAppliedPEs[pe.Identifier().(string)]; ok { + return true + } + } + + return false +} diff --git a/pkg/controller/atlasproject/private_endpoint_test.go b/pkg/controller/atlasproject/private_endpoint_test.go index 202fd20fcf..521e6ac1f0 100644 --- a/pkg/controller/atlasproject/private_endpoint_test.go +++ b/pkg/controller/atlasproject/private_endpoint_test.go @@ -8,6 +8,7 @@ import ( akov2 "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" ) func TestGetEndpointsNotInAtlas(t *testing.T) { @@ -46,42 +47,153 @@ func TestGetEndpointsNotInAtlas(t *testing.T) { } func TestGetEndpointsNotInSpec(t *testing.T) { - const region1 = "SOME_REGION" - const region2 = "OTHER_REGION" - specPEs := []akov2.PrivateEndpoint{ - { - Provider: provider.ProviderAWS, - Region: region1, - }, - { - Provider: provider.ProviderAWS, - Region: region1, + tests := map[string]struct { + specPEs []akov2.PrivateEndpoint + atlasPEs []atlasPE + lastPEs map[string]akov2.PrivateEndpoint + expectedItems []atlasPE + }{ + "should return no items when spec and atlas are the same": { + specPEs: []akov2.PrivateEndpoint{ + { + Provider: provider.ProviderAWS, + Region: "us_east1", + }, + { + Provider: provider.ProviderAWS, + Region: "us_east2", + }, + }, + atlasPEs: []atlasPE{ + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_east1"), + }, + }, + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_east2"), + }, + }, + }, + expectedItems: []atlasPE{}, }, - } - atlasPEs := []atlasPE{ - { - EndpointService: admin.EndpointService{ - CloudProvider: string(provider.ProviderAWS), - RegionName: admin.PtrString(region1), + "should return no items when spec and atlas are different but not previously managed by the operator": { + specPEs: []akov2.PrivateEndpoint{ + { + Provider: provider.ProviderAWS, + Region: "us_east1", + }, + { + Provider: provider.ProviderAWS, + Region: "us_east2", + }, + }, + atlasPEs: []atlasPE{ + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_east1"), + }, + }, + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_west1"), + }, + }, }, + expectedItems: []atlasPE{}, }, - { - EndpointService: admin.EndpointService{ - CloudProvider: string(provider.ProviderAWS), - RegionName: admin.PtrString(region1), + "should return items when spec and atlas are different but previously managed by the operator": { + specPEs: []akov2.PrivateEndpoint{ + { + Provider: provider.ProviderAWS, + Region: "us_east1", + }, + }, + atlasPEs: []atlasPE{ + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_east1"), + }, + }, + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_east2"), + }, + }, + }, + lastPEs: map[string]akov2.PrivateEndpoint{ + "AWSaesstu": { + Provider: provider.ProviderAWS, + Region: "us_east1", + }, + "AWS2aesstu": { + Provider: provider.ProviderAWS, + Region: "us_east2", + }, + }, + expectedItems: []atlasPE{ + { + EndpointService: admin.EndpointService{ + CloudProvider: string(provider.ProviderAWS), + RegionName: admin.PtrString("us_east2"), + }, + }, }, }, } - uniqueItems := getEndpointsNotInSpec(specPEs, atlasPEs) - assert.Equalf(t, 0, len(uniqueItems), "getEndpointsNotInSpec should not return anything if PEs are in spec") + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + uniqueItems := getEndpointsNotInSpec(tt.specPEs, tt.atlasPEs, tt.lastPEs) + assert.Equal(t, tt.expectedItems, uniqueItems) + }) + } +} - atlasPEs = append(atlasPEs, atlasPE{ - EndpointService: admin.EndpointService{ - CloudProvider: string(provider.ProviderAWS), - RegionName: admin.PtrString(region2), +func TestMapLastAppliedPrivateEndpoint(t *testing.T) { + tests := map[string]struct { + annotations map[string]string + expectedPEs map[string]akov2.PrivateEndpoint + expectedError string + }{ + "should return error when last spec annotation is wrong": { + annotations: map[string]string{customresource.AnnotationLastAppliedConfiguration: "{wrong}"}, + expectedError: "error reading AtlasProject Spec from annotation [mongodb.com/last-applied-configuration]:" + + " invalid character 'w' looking for beginning of object key string", }, - }) - uniqueItems = getEndpointsNotInSpec(specPEs, atlasPEs) - assert.Equalf(t, 1, len(uniqueItems), "getEndpointsNotInSpec should get a spec item") + "should return nil when there is no last spec": {}, + "should return map of last private endpoints": { + annotations: map[string]string{ + customresource.AnnotationLastAppliedConfiguration: "{\"privateEndpoints\": [{\"provider\":\"AWS\",\"region\":\"us_east1\"},{\"provider\":\"AWS\",\"region\":\"us_east2\"}]}"}, + expectedPEs: map[string]akov2.PrivateEndpoint{ + "AWSaesstu": { + Provider: provider.ProviderAWS, + Region: "us_east1", + }, + "AWS2aesstu": { + Provider: provider.ProviderAWS, + Region: "us_east2", + }, + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + p := &akov2.AtlasProject{} + p.WithAnnotations(tt.annotations) + + result, err := mapLastAppliedPrivateEndpoint(p) + if err != nil { + assert.ErrorContains(t, err, tt.expectedError) + } + assert.Equal(t, tt.expectedPEs, result) + }) + } } diff --git a/pkg/controller/atlasproject/project.go b/pkg/controller/atlasproject/project.go index 11262851ac..d47639d3c8 100644 --- a/pkg/controller/atlasproject/project.go +++ b/pkg/controller/atlasproject/project.go @@ -99,7 +99,7 @@ func (r *AtlasProjectReconciler) delete(ctx *workflow.Context, orgID string, atl if customresource.IsResourcePolicyKeepOrDefault(atlasProject, r.ObjectDeletionProtection) { r.Log.Info("Not removing Project from Atlas as per configuration") } else { - if result := DeleteAllPrivateEndpoints(ctx, atlasProject.ID()); !result.IsOk() { + if result := DeleteAllPrivateEndpoints(ctx, atlasProject); !result.IsOk() { return r.terminate(ctx, workflow.ServerlessPrivateEndpointReady, errors.New(result.GetMessage())) } if result := DeleteAllNetworkPeers(ctx.Context, atlasProject.ID(), ctx.SdkClient.NetworkPeeringApi, ctx.Log); !result.IsOk() { diff --git a/test/e2e/privateendpoint_test.go b/test/e2e/privateendpoint_test.go index e17f725f4b..12e88b43de 100644 --- a/test/e2e/privateendpoint_test.go +++ b/test/e2e/privateendpoint_test.go @@ -16,6 +16,7 @@ import ( "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/provider" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/status" "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/customresource" + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/workflow" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/conditions" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/actions" "github.com/mongodb/mongodb-atlas-kubernetes/v2/test/helper/e2e/actions/cloud" @@ -417,6 +418,7 @@ var _ = Describe("Migrate private endpoints from sub-resources to separate custo }) By("Migrate private endpoint as separate custom resource", func() { + //nolint:dupl By("Migrating AWS private endpoint", func() { awsPE = &akov2.AtlasPrivateEndpoint{ ObjectMeta: metav1.ObjectMeta{ @@ -446,7 +448,7 @@ var _ = Describe("Migrate private endpoints from sub-resources to separate custo api.TrueCondition(api.ReadyType), ) g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).To(Succeed()) - g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(awsPE.Status.Conditions).To(ContainElements(expectedConditions)) g.Expect(awsPE.Status.ServiceStatus).To(Equal("AVAILABLE")) for _, eStatus := range awsPE.Status.Endpoints { g.Expect(eStatus.Status).To(Equal("AVAILABLE")) @@ -484,7 +486,7 @@ var _ = Describe("Migrate private endpoints from sub-resources to separate custo api.TrueCondition(api.ReadyType), ) g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(azurePE), azurePE)).To(Succeed()) - g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(azurePE.Status.Conditions).To(ContainElements(expectedConditions)) g.Expect(azurePE.Status.ServiceStatus).To(Equal("AVAILABLE")) for _, eStatus := range azurePE.Status.Endpoints { g.Expect(eStatus.Status).To(Equal("AVAILABLE")) @@ -534,7 +536,7 @@ var _ = Describe("Migrate private endpoints from sub-resources to separate custo api.TrueCondition(api.ReadyType), ) g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(gcpPE), gcpPE)).To(Succeed()) - g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(gcpPE.Status.Conditions).To(ContainElements(expectedConditions)) g.Expect(gcpPE.Status.ServiceStatus).To(Equal("AVAILABLE")) for _, eStatus := range gcpPE.Status.Endpoints { g.Expect(eStatus.Status).To(Equal("AVAILABLE")) @@ -584,7 +586,7 @@ var _ = Describe("Migrate private endpoints from sub-resources to separate custo ) for _, pe := range []*akov2.AtlasPrivateEndpoint{awsPE, azurePE, gcpPE} { g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(pe), pe)).To(Succeed()) - g.Expect(testData.Project.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(pe.Status.Conditions).To(ContainElements(expectedConditions)) g.Expect(pe.Status.ServiceStatus).To(Equal("AVAILABLE")) for _, eStatus := range pe.Status.Endpoints { g.Expect(eStatus.Status).To(Equal("AVAILABLE")) @@ -607,6 +609,125 @@ var _ = Describe("Migrate private endpoints from sub-resources to separate custo }) }) +var _ = Describe("Independent resource should no conflict with sub-resource", Label("private-endpoint"), func() { + var testData *model.TestDataProvider + var awsPE *akov2.AtlasPrivateEndpoint + + _ = BeforeEach(func() { + checkUpAWSEnvironment() + checkUpAzureEnvironment() + checkNSetUpGCPEnvironment() + }) + + _ = AfterEach(func() { + GinkgoWriter.Println() + GinkgoWriter.Println("===============================================") + GinkgoWriter.Println("Operator namespace: " + testData.Resources.Namespace) + GinkgoWriter.Println("===============================================") + if CurrentSpecReport().Failed() { + Expect(actions.SaveProjectsToFile(testData.Context, testData.K8SClient, testData.Resources.Namespace)).Should(Succeed()) + } + By("Delete Project and cluster resources", func() { + actions.DeleteTestDataProject(testData) + actions.AfterEachFinalCleanup([]model.TestDataProvider{*testData}) + }) + }) + + It("Should migrate a private endpoint configured in a project as sub-resource to a separate custom resource", func() { + By("Setting up project", func() { + testData = model.DataProvider( + "migrate-private-endpoint", + model.NewEmptyAtlasKeyType().UseDefaultFullAccess(), + 40000, + []func(*model.TestDataProvider){}, + ).WithProject(data.DefaultProject()) + + actions.ProjectCreationFlow(testData) + }) + + //nolint:dupl + By("Creating AWS private endpoint", func() { + awsRegion, err := cloud.GetAtlasRegionByProvider("AWS") + Expect(err).ToNot(HaveOccurred()) + + awsPE = &akov2.AtlasPrivateEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pe-aws-" + testData.Resources.TestID, + Namespace: testData.Resources.Namespace, + }, + Spec: akov2.AtlasPrivateEndpointSpec{ + Project: &common.ResourceRefNamespaced{ + Name: testData.Project.Name, + Namespace: testData.Project.Namespace, + }, + Provider: "AWS", + Region: awsRegion, + }, + } + + Expect(testData.K8SClient.Create(testData.Context, awsPE)).To(Succeed()) + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointConfigurationPending)). + WithMessageRegexp("waiting for private endpoint configuration from customer side"), + api.FalseCondition(api.ReadyType), + ) + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).To(Succeed()) + g.Expect(awsPE.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(awsPE.Status.ServiceStatus).To(Equal("AVAILABLE")) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Updating project doesn't affect private endpoint", func() { + Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + testData.Project.Spec.Settings = &akov2.ProjectSettings{ + IsSchemaAdvisorEnabled: pointer.MakePtr(true), + IsRealtimePerformancePanelEnabled: pointer.MakePtr(true), + } + + Expect(testData.K8SClient.Update(testData.Context, testData.Project)).To(Succeed()) + Eventually(func(g Gomega) { + notExpectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.TrueCondition(api.PrivateEndpointReady), + api.FalseCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.PrivateEndpointReady), + ) + + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(testData.Project), testData.Project)).To(Succeed()) + g.Expect(testData.Project.Status.Conditions).ToNot(ContainElements(notExpectedConditions)) + g.Expect(testData.Project.Status.Conditions).To(ContainElement(conditions.MatchCondition(api.TrueCondition(api.ReadyType)))) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Private endpoint are still ready", func() { + Eventually(func(g Gomega) { //nolint:dupl + expectedConditions := conditions.MatchConditions( + api.TrueCondition(api.PrivateEndpointServiceReady), + api.FalseCondition(api.PrivateEndpointReady). + WithReason(string(workflow.PrivateEndpointConfigurationPending)). + WithMessageRegexp("waiting for private endpoint configuration from customer side"), + api.FalseCondition(api.ReadyType), + ) + + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).To(Succeed()) + g.Expect(awsPE.Status.Conditions).To(ContainElements(expectedConditions)) + g.Expect(awsPE.Status.ServiceStatus).To(Equal("AVAILABLE")) + }).WithTimeout(15 * time.Minute).WithPolling(10 * time.Second).Should(Succeed()) + }) + + By("Removing private endpoints", func() { + Expect(testData.K8SClient.Delete(testData.Context, awsPE)).To(Succeed()) + + Eventually(func(g Gomega) { + g.Expect(testData.K8SClient.Get(testData.Context, client.ObjectKeyFromObject(awsPE), awsPE)).ShouldNot(Succeed()) + }).WithTimeout(15 * time.Minute).WithPolling(20 * time.Second).Should(Succeed()) + }) + }) +}) + func statusForProvider(peStatus []status.ProjectPrivateEndpoint, providerName provider.ProviderName) *status.ProjectPrivateEndpoint { for _, s := range peStatus { if s.Provider == providerName { From b213be57548e1c55eddd20f9a7f5671e055dc17b Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Tue, 17 Dec 2024 21:42:51 +0100 Subject: [PATCH 2/4] move behavior out of public API --- pkg/api/v1/atlasproject_types.go | 19 --------- pkg/api/v1/atlasproject_types_test.go | 40 ++----------------- .../atlasproject/atlasproject_controller.go | 17 ++++++++ .../atlasproject_controller_test.go | 33 +++++++++++++++ 4 files changed, 54 insertions(+), 55 deletions(-) diff --git a/pkg/api/v1/atlasproject_types.go b/pkg/api/v1/atlasproject_types.go index 950df055df..1cec26bff9 100644 --- a/pkg/api/v1/atlasproject_types.go +++ b/pkg/api/v1/atlasproject_types.go @@ -17,9 +17,6 @@ limitations under the License. package v1 import ( - "encoding/json" - "fmt" - "go.uber.org/zap/zapcore" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -298,22 +295,6 @@ func (p *AtlasProject) WithMaintenanceWindow(window project.MaintenanceWindow) * return p } -func (p *AtlasProject) LastSpecFrom(annotation string) (*AtlasProjectSpec, error) { - var lastApplied AtlasProject - ann, ok := p.GetAnnotations()[annotation] - - if !ok { - return nil, nil - } - - err := json.Unmarshal([]byte(ann), &lastApplied.Spec) - if err != nil { - return nil, fmt.Errorf("error reading AtlasProject Spec from annotation [%s]: %w", annotation, err) - } - - return &lastApplied.Spec, nil -} - func DefaultProject(namespace, connectionSecretName string) *AtlasProject { return NewProject(namespace, "test-project", namespace).WithConnectionSecret(connectionSecretName) } diff --git a/pkg/api/v1/atlasproject_types_test.go b/pkg/api/v1/atlasproject_types_test.go index 031fa0cf88..479c372cf4 100644 --- a/pkg/api/v1/atlasproject_types_test.go +++ b/pkg/api/v1/atlasproject_types_test.go @@ -6,13 +6,14 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/assert" "go.mongodb.org/atlas-sdk/v20231115004/admin" + + "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" + + "github.com/google/go-cmp/cmp" "sigs.k8s.io/yaml" internalcmp "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/cmp" - "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/api/v1/common" ) func TestSpecEquality(t *testing.T) { @@ -125,36 +126,3 @@ func mustMarshal(t *testing.T, what any) string { } return string(result) } - -func TestLastSpecFrom(t *testing.T) { - tests := map[string]struct { - annotations map[string]string - expectedLastSpec *AtlasProjectSpec - expectedError string - }{ - - "should return nil when there is no last spec": {}, - "should return error when last spec annotation is wrong": { - annotations: map[string]string{"mongodb.com/last-applied-configuration": "{wrong}"}, - expectedError: "error reading AtlasProject Spec from annotation [mongodb.com/last-applied-configuration]:" + - " invalid character 'w' looking for beginning of object key string", - }, - "should return last spec": { - annotations: map[string]string{"mongodb.com/last-applied-configuration": "{\"name\": \"my-project\"}"}, - expectedLastSpec: &AtlasProjectSpec{ - Name: "my-project", - }, - }, - } - for name, tt := range tests { - t.Run(name, func(t *testing.T) { - p := &AtlasProject{} - p.WithAnnotations(tt.annotations) - lastSpec, err := p.LastSpecFrom("mongodb.com/last-applied-configuration") - if err != nil { - assert.ErrorContains(t, err, tt.expectedError) - } - assert.Equal(t, tt.expectedLastSpec, lastSpec) - }) - } -} diff --git a/pkg/controller/atlasproject/atlasproject_controller.go b/pkg/controller/atlasproject/atlasproject_controller.go index b5995cd081..a62ec406b1 100644 --- a/pkg/controller/atlasproject/atlasproject_controller.go +++ b/pkg/controller/atlasproject/atlasproject_controller.go @@ -18,6 +18,7 @@ package atlasproject import ( "context" + "encoding/json" "fmt" "go.uber.org/zap" @@ -333,3 +334,19 @@ func logIfWarning(ctx *workflow.Context, result workflow.Result) { ctx.Log.Warnw(result.GetMessage()) } } + +func lastSpecFrom(atlasProject *akov2.AtlasProject, annotation string) (*akov2.AtlasProjectSpec, error) { + var lastApplied akov2.AtlasProject + ann, ok := atlasProject.GetAnnotations()[annotation] + + if !ok { + return nil, nil + } + + err := json.Unmarshal([]byte(ann), &lastApplied.Spec) + if err != nil { + return nil, fmt.Errorf("error reading AtlasProject Spec from annotation [%s]: %w", annotation, err) + } + + return &lastApplied.Spec, nil +} diff --git a/pkg/controller/atlasproject/atlasproject_controller_test.go b/pkg/controller/atlasproject/atlasproject_controller_test.go index 6c2588173b..b6bfafbdd4 100644 --- a/pkg/controller/atlasproject/atlasproject_controller_test.go +++ b/pkg/controller/atlasproject/atlasproject_controller_test.go @@ -388,3 +388,36 @@ func TestFindProjectsForTeams(t *testing.T) { }) } } + +func TestLastSpecFrom(t *testing.T) { + tests := map[string]struct { + annotations map[string]string + expectedLastSpec *akov2.AtlasProjectSpec + expectedError string + }{ + + "should return nil when there is no last spec": {}, + "should return error when last spec annotation is wrong": { + annotations: map[string]string{"mongodb.com/last-applied-configuration": "{wrong}"}, + expectedError: "error reading AtlasProject Spec from annotation [mongodb.com/last-applied-configuration]:" + + " invalid character 'w' looking for beginning of object key string", + }, + "should return last spec": { + annotations: map[string]string{"mongodb.com/last-applied-configuration": "{\"name\": \"my-project\"}"}, + expectedLastSpec: &akov2.AtlasProjectSpec{ + Name: "my-project", + }, + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + p := &akov2.AtlasProject{} + p.WithAnnotations(tt.annotations) + lastSpec, err := lastSpecFrom(p, "mongodb.com/last-applied-configuration") + if err != nil { + assert.ErrorContains(t, err, tt.expectedError) + } + assert.Equal(t, tt.expectedLastSpec, lastSpec) + }) + } +} From 047dd92828713f7fef4bafb05d9f39681d65d7e0 Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Tue, 17 Dec 2024 21:43:33 +0100 Subject: [PATCH 3/4] Fix reconciliation --- pkg/controller/atlasproject/private_endpoint.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/atlasproject/private_endpoint.go b/pkg/controller/atlasproject/private_endpoint.go index 3838a347b9..a6cc005819 100644 --- a/pkg/controller/atlasproject/private_endpoint.go +++ b/pkg/controller/atlasproject/private_endpoint.go @@ -55,7 +55,7 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr return result } - if (len(specPEs) == 0 && len(atlasPEs) == 0) || !hasManagedPrivateEndpoints(atlasPEs, lastAppliedPEs) { + if (len(specPEs) == 0 && len(atlasPEs) == 0) || !hasManagedPrivateEndpoints(specPEs, atlasPEs, lastAppliedPEs) { workflowCtx.UnsetCondition(api.PrivateEndpointServiceReadyType) workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) return workflow.OK() @@ -608,7 +608,7 @@ func hasSkippedPrivateEndpointConfiguration(atlasProject *akov2.AtlasProject) (b } func mapLastAppliedPrivateEndpoint(atlasProject *akov2.AtlasProject) (map[string]akov2.PrivateEndpoint, error) { - lastApplied, err := atlasProject.LastSpecFrom(customresource.AnnotationLastAppliedConfiguration) + lastApplied, err := lastSpecFrom(atlasProject, customresource.AnnotationLastAppliedConfiguration) if err != nil { return nil, err } @@ -625,12 +625,12 @@ func mapLastAppliedPrivateEndpoint(atlasProject *akov2.AtlasProject) (map[string return result, nil } -func hasManagedPrivateEndpoints(atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) bool { +func hasManagedPrivateEndpoints(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) bool { for _, pe := range atlasPEs { if _, ok := lastAppliedPEs[pe.Identifier().(string)]; ok { return true } } - return false + return len(set.DeprecatedDifference(specPEs, atlasPEs)) == 0 } From dfc78b635eea104e62f0a3e5b5bfb2d9c094170f Mon Sep 17 00:00:00 2001 From: Helder Santana Date: Tue, 17 Dec 2024 23:19:39 +0100 Subject: [PATCH 4/4] Fix private endpoint reconciliation --- pkg/controller/atlasproject/private_endpoint.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pkg/controller/atlasproject/private_endpoint.go b/pkg/controller/atlasproject/private_endpoint.go index a6cc005819..a1fa325a6b 100644 --- a/pkg/controller/atlasproject/private_endpoint.go +++ b/pkg/controller/atlasproject/private_endpoint.go @@ -61,7 +61,7 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr return workflow.OK() } - serviceStatus := getStatusForServices(atlasPEs, lastAppliedPEs) + serviceStatus := getStatusForServices(atlasPEs) if !serviceStatus.IsOk() { workflowCtx.SetConditionFromResult(api.PrivateEndpointServiceReadyType, serviceStatus) return serviceStatus @@ -80,7 +80,7 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr } } - interfaceStatus := getStatusForInterfaces(workflowCtx, project.ID(), specPEs, atlasPEs, lastAppliedPEs) + interfaceStatus := getStatusForInterfaces(workflowCtx, project.ID(), specPEs, atlasPEs) workflowCtx.SetConditionFromResult(api.PrivateEndpointReadyType, interfaceStatus) return interfaceStatus @@ -126,13 +126,9 @@ func syncPrivateEndpointsWithAtlas( return workflow.OK(), api.PrivateEndpointReadyType } -func getStatusForServices(atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) workflow.Result { +func getStatusForServices(atlasPEs []atlasPE) workflow.Result { allAvailable := true for _, conn := range atlasPEs { - if _, ok := lastAppliedPEs[conn.Identifier().(string)]; !ok { - continue - } - if isFailed(conn.GetStatus()) { return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, conn.GetErrorMessage()) } @@ -149,14 +145,10 @@ func getStatusForServices(atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.Pr return workflow.OK() } -func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) workflow.Result { +func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) workflow.Result { totalInterfaceCount := 0 for _, atlasPeService := range atlasPEs { - if _, ok := lastAppliedPEs[atlasPeService.Identifier().(string)]; !ok { - continue - } - interfaceEndpointIDs := atlasPeService.InterfaceEndpointIDs() totalInterfaceCount += len(interfaceEndpointIDs)