-
Notifications
You must be signed in to change notification settings - Fork 107
CLOUDP-289150: Fix private endpoint conflict between sub-resource and new CRD #1998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,12 +36,17 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr | |
|
|
||
| specPEs := project.Spec.DeepCopy().PrivateEndpoints | ||
|
|
||
| lastAppliedPEs, err := mapLastAppliedPrivateEndpoint(project) | ||
| if err != nil { | ||
| return workflow.Terminate(workflow.Internal, err.Error()) | ||
| } | ||
|
|
||
| atlasPEs, err := getAllPrivateEndpoints(workflowCtx.Context, workflowCtx.SdkClient, project.ID()) | ||
| if err != nil { | ||
| return workflow.Terminate(workflow.Internal, err.Error()) | ||
| } | ||
|
|
||
| result, conditionType := syncPrivateEndpointsWithAtlas(workflowCtx, project.ID(), specPEs, atlasPEs) | ||
| result, conditionType := syncPrivateEndpointsWithAtlas(workflowCtx, project.ID(), specPEs, atlasPEs, lastAppliedPEs) | ||
| if !result.IsOk() { | ||
| if conditionType == api.PrivateEndpointServiceReadyType { | ||
| workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) | ||
|
|
@@ -50,13 +55,13 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr | |
| return result | ||
| } | ||
|
|
||
| if len(specPEs) == 0 && len(atlasPEs) == 0 { | ||
| if (len(specPEs) == 0 && len(atlasPEs) == 0) || !hasManagedPrivateEndpoints(specPEs, atlasPEs, lastAppliedPEs) { | ||
| workflowCtx.UnsetCondition(api.PrivateEndpointServiceReadyType) | ||
| workflowCtx.UnsetCondition(api.PrivateEndpointReadyType) | ||
| return workflow.OK() | ||
| } | ||
|
|
||
| serviceStatus := getStatusForServices(workflowCtx, atlasPEs) | ||
| serviceStatus := getStatusForServices(atlasPEs) | ||
| if !serviceStatus.IsOk() { | ||
| workflowCtx.SetConditionFromResult(api.PrivateEndpointServiceReadyType, serviceStatus) | ||
| return serviceStatus | ||
|
|
@@ -81,11 +86,17 @@ func ensurePrivateEndpoint(workflowCtx *workflow.Context, project *akov2.AtlasPr | |
| return interfaceStatus | ||
| } | ||
|
|
||
| func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) (workflow.Result, api.ConditionType) { | ||
| func syncPrivateEndpointsWithAtlas( | ||
| ctx *workflow.Context, | ||
| projectID string, | ||
| specPEs []akov2.PrivateEndpoint, | ||
| atlasPEs []atlasPE, | ||
| lastAppliedPEs map[string]akov2.PrivateEndpoint, | ||
| ) (workflow.Result, api.ConditionType) { | ||
| log := ctx.Log | ||
|
|
||
| log.Debugw("PE Connections", "atlasPEs", atlasPEs, "specPEs", specPEs) | ||
| endpointsToDelete := getEndpointsNotInSpec(specPEs, atlasPEs) | ||
| endpointsToDelete := getEndpointsNotInSpec(specPEs, atlasPEs, lastAppliedPEs) | ||
| log.Debugf("Number of Private Endpoints to delete: %d", len(endpointsToDelete)) | ||
| if result := deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete); !result.IsOk() { | ||
| return result, api.PrivateEndpointServiceReadyType | ||
|
|
@@ -115,12 +126,18 @@ func syncPrivateEndpointsWithAtlas(ctx *workflow.Context, projectID string, spec | |
| return workflow.OK(), api.PrivateEndpointReadyType | ||
| } | ||
|
|
||
| func getStatusForServices(ctx *workflow.Context, atlasPEs []atlasPE) workflow.Result { | ||
| allAvailable, failureMessage := areServicesAvailableOrFailed(atlasPEs) | ||
| ctx.Log.Debugw("Get Status for Services", "allAvailable", allAvailable, "failureMessage", failureMessage) | ||
| if failureMessage != "" { | ||
| return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, failureMessage) | ||
| func getStatusForServices(atlasPEs []atlasPE) workflow.Result { | ||
| allAvailable := true | ||
| for _, conn := range atlasPEs { | ||
| if isFailed(conn.GetStatus()) { | ||
| return workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, conn.GetErrorMessage()) | ||
| } | ||
|
|
||
| if !isAvailable(conn.GetStatus()) { | ||
| allAvailable = false | ||
| } | ||
| } | ||
|
|
||
| if !allAvailable { | ||
| return notReadyServiceResult | ||
| } | ||
|
|
@@ -167,22 +184,6 @@ func getStatusForInterfaces(ctx *workflow.Context, projectID string, specPEs []a | |
| return workflow.OK() | ||
| } | ||
|
|
||
| func areServicesAvailableOrFailed(atlasPeConnections []atlasPE) (allAvailable bool, failureMessage string) { | ||
| allAvailable = true | ||
|
|
||
| for _, conn := range atlasPeConnections { | ||
| if isFailed(conn.GetStatus()) { | ||
| failureMessage = conn.GetErrorMessage() | ||
| return | ||
| } | ||
| if !isAvailable(conn.GetStatus()) { | ||
| allAvailable = false | ||
| } | ||
| } | ||
|
|
||
| return | ||
| } | ||
|
|
||
| func updatePEStatusOption(ctx *workflow.Context, projectID string, newConnections, syncedConnections []atlasPE) { | ||
| setPEStatusOption(ctx, projectID, syncedConnections) | ||
| addPEStatusOption(ctx, projectID, newConnections) | ||
|
|
@@ -332,14 +333,19 @@ func endpointDefinedInSpec(specEndpoint akov2.PrivateEndpoint) bool { | |
| return specEndpoint.ID != "" || specEndpoint.EndpointGroupName != "" | ||
| } | ||
|
|
||
| func DeleteAllPrivateEndpoints(ctx *workflow.Context, projectID string) workflow.Result { | ||
| atlasPEs, err := getAllPrivateEndpoints(ctx.Context, ctx.SdkClient, projectID) | ||
| func DeleteAllPrivateEndpoints(ctx *workflow.Context, atlasProject *akov2.AtlasProject) workflow.Result { | ||
| atlasPEs, err := getAllPrivateEndpoints(ctx.Context, ctx.SdkClient, atlasProject.ID()) | ||
| if err != nil { | ||
| return workflow.Terminate(workflow.Internal, err.Error()) | ||
| } | ||
|
|
||
| endpointsToDelete := getEndpointsNotInSpec([]akov2.PrivateEndpoint{}, atlasPEs) | ||
| return deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete) | ||
| lastAppliedSpecPEs, err := mapLastAppliedPrivateEndpoint(atlasProject) | ||
| if err != nil { | ||
| return workflow.Terminate(workflow.Internal, err.Error()) | ||
| } | ||
|
|
||
| endpointsToDelete := getEndpointsNotInSpec([]akov2.PrivateEndpoint{}, atlasPEs, lastAppliedSpecPEs) | ||
| return deletePrivateEndpointsFromAtlas(ctx, atlasProject.ID(), endpointsToDelete) | ||
| } | ||
|
|
||
| func deletePrivateEndpointsFromAtlas(ctx *workflow.Context, projectID string, listsToRemove []atlasPE) workflow.Result { | ||
|
|
@@ -516,9 +522,16 @@ func terminateWithError(ctx *workflow.Context, conditionType api.ConditionType, | |
| var notReadyServiceResult = workflow.InProgress(workflow.ProjectPEServiceIsNotReadyInAtlas, "Private Endpoint Service is not ready") | ||
| var notReadyInterfaceResult = workflow.InProgress(workflow.ProjectPEInterfaceIsNotReadyInAtlas, "Interface Private Endpoint is not ready") | ||
|
|
||
| func getEndpointsNotInSpec(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) []atlasPE { | ||
| uniqueItems, _ := getUniqueDifference(atlasPEs, specPEs) | ||
| return uniqueItems | ||
| func getEndpointsNotInSpec(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) []atlasPE { | ||
| notInSpecItems, _ := getUniqueDifference(atlasPEs, specPEs) | ||
| toDelete := make([]atlasPE, 0, len(notInSpecItems)) | ||
| for _, item := range notInSpecItems { | ||
| if _, ok := lastAppliedPEs[item.Identifier().(string)]; ok { | ||
| toDelete = append(toDelete, item) | ||
| } | ||
| } | ||
|
|
||
| return toDelete | ||
| } | ||
|
|
||
| func getEndpointsNotInAtlas(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE) (toCreate []akov2.PrivateEndpoint, counts []int) { | ||
|
|
@@ -585,3 +598,31 @@ func hasSkippedPrivateEndpointConfiguration(atlasProject *akov2.AtlasProject) (b | |
|
|
||
| return false, nil | ||
| } | ||
|
|
||
| func mapLastAppliedPrivateEndpoint(atlasProject *akov2.AtlasProject) (map[string]akov2.PrivateEndpoint, error) { | ||
| lastApplied, err := lastSpecFrom(atlasProject, customresource.AnnotationLastAppliedConfiguration) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if lastApplied == nil || len(lastApplied.PrivateEndpoints) == 0 { | ||
| return nil, nil | ||
| } | ||
|
|
||
| result := map[string]akov2.PrivateEndpoint{} | ||
| for _, pe := range lastApplied.PrivateEndpoints { | ||
| result[pe.Identifier().(string)] = pe | ||
| } | ||
|
|
||
| return result, nil | ||
| } | ||
|
|
||
| func hasManagedPrivateEndpoints(specPEs []akov2.PrivateEndpoint, atlasPEs []atlasPE, lastAppliedPEs map[string]akov2.PrivateEndpoint) bool { | ||
| for _, pe := range atlasPEs { | ||
| if _, ok := lastAppliedPEs[pe.Identifier().(string)]; ok { | ||
| return true | ||
| } | ||
| } | ||
|
|
||
| return len(set.DeprecatedDifference(specPEs, atlasPEs)) == 0 | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are we using a deprecated diff on new code?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned in previous comments, while there's new addition to this part of the code, it still depends on deprecated types/interfaces unless fully refactor to use translation layer, which is not a goal. |
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.