chore: Upgrades datalake_pipeline resource to auto-generated SDK (#1911)
* rename

* change region to avoid out of capacity errors

* plural data source doesn't depend on the resources, so we ensure the 2 resources are created before the ds is executed

* Revert "change region to avoid out of capacity errors"

This reverts commit 7c78823.

* connv2 in tests

* data sources, read, import

* data source runs

* delete

* create

* migration test
lantoli committed Feb 8, 2024
1 parent 34c8eb1 commit c93207c
Showing 12 changed files with 328 additions and 315 deletions.
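
The core of the change is swapping the hand-written `go.mongodb.org/atlas` client for the auto-generated `go.mongodb.org/atlas-sdk/v20231115005/admin` SDK. A minimal sketch of the call-site pattern, using the client fields and API names that appear in the diffs below (the surrounding function is illustrative scaffolding, not code from the commit):

```go
package datalakepipeline

import (
	"context"

	"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
)

// readPipelineSketch contrasts the two SDK styles migrated in this commit.
func readPipelineSketch(ctx context.Context, meta any, projectID, name string) error {
	// Old hand-written client: plain method calls on service structs.
	//   conn := meta.(*config.MongoDBClient).Atlas
	//   pipeline, _, err := conn.DataLakePipeline.Get(ctx, projectID, name)

	// New auto-generated SDK: fluent request builders finished with Execute().
	connV2 := meta.(*config.MongoDBClient).AtlasV2
	pipeline, _, err := connV2.DataLakePipelinesApi.GetPipeline(ctx, projectID, name).Execute()
	if err != nil {
		return err
	}
	_ = pipeline.GetName() // generated getters replace direct field access
	return nil
}
```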
28 changes: 27 additions & 1 deletion .github/workflows/migration-tests.yml
@@ -65,6 +65,7 @@ jobs:
network: ${{ steps.filter.outputs.network == 'true' || env.mustTrigger == 'true' }}
encryption: ${{ steps.filter.outputs.encryption == 'true' || env.mustTrigger == 'true' }}
serverless: ${{ steps.filter.outputs.serverless == 'true' || env.mustTrigger == 'true' }}
+data_lake: ${{ steps.filter.outputs.data_lake == 'true' || env.mustTrigger == 'true' }}
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- uses: dorny/paths-filter@0bc4621a3135347011ad047f9ecf449bf72ce2bd
@@ -129,6 +130,8 @@ jobs:
- 'internal/service/serverlessinstance/*.go'
- 'internal/service/privatelinkendpointserverless/*.go'
- 'internal/service/privatelinkendpointserviceserverless/*.go'
+data_lake:
+- 'internal/service/datalakepipeline/*.go'
project:
needs: [ change-detection, get-provider-version ]
@@ -435,4 +438,27 @@ jobs:
MONGODB_ATLAS_LAST_VERSION: ${{ needs.get-provider-version.outputs.provider_version }}
TEST_REGEX: "^TestAccMigrationServerless"
run: make testacc

+data_lake:
+needs: [ change-detection, get-provider-version ]
+if: ${{ needs.change-detection.outputs.data_lake == 'true' || inputs.test_group == 'data_lake' }}
+runs-on: ubuntu-latest
+steps:
+- name: Checkout
+uses: actions/checkout@v4
+- name: Set up Go
+uses: actions/setup-go@v5
+with:
+go-version-file: 'go.mod'
+- uses: hashicorp/setup-terraform@v3
+with:
+terraform_version: ${{ env.terraform_version }}
+terraform_wrapper: false
+- name: Migration Tests
+env:
+MONGODB_ATLAS_PUBLIC_KEY: ${{ secrets.MONGODB_ATLAS_PUBLIC_KEY_CLOUD_DEV }}
+MONGODB_ATLAS_PRIVATE_KEY: ${{ secrets.MONGODB_ATLAS_PRIVATE_KEY_CLOUD_DEV }}
+MONGODB_ATLAS_ORG_ID: ${{ vars.MONGODB_ATLAS_ORG_ID_CLOUD_DEV }}
+MONGODB_ATLAS_BASE_URL: ${{ vars.MONGODB_ATLAS_BASE_URL }}
+MONGODB_ATLAS_LAST_VERSION: ${{ needs.get-provider-version.outputs.provider_version }}
+TEST_REGEX: "^TestAccMigrationDataLake"
+run: make testacc
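
The new job reuses the TEST_REGEX mechanism of the other migration jobs: `make testacc` runs only Go tests whose names match the prefix. A hypothetical skeleton of a test this regex would select — the actual migration test added by this commit lives under internal/service/datalakepipeline, and the name and body here are illustrative:

```go
package datalakepipeline_test

import "testing"

// The job's TEST_REGEX "^TestAccMigrationDataLake" matches this name, so
// `make testacc` runs it and skips tests without the prefix.
func TestAccMigrationDataLakePipeline_basic(t *testing.T) {
	// Typically: provision with the provider version pinned by
	// MONGODB_ATLAS_LAST_VERSION, then re-plan against the local build
	// to prove an empty diff after the SDK upgrade.
	t.Log("migration test skeleton; see the resource's migration test for the real steps")
}
```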
69 changes: 31 additions & 38 deletions internal/service/datalakepipeline/data_source_data_lake_pipeline.go
@@ -8,12 +8,11 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
-matlas "go.mongodb.org/atlas/mongodbatlas"
)

func DataSource() *schema.Resource {
return &schema.Resource{
-ReadContext: dataSourceMongoDBAtlasDataLakePipelineRead,
+ReadContext: dataSourceRead,
Schema: map[string]*schema.Schema{
"project_id": {
Type: schema.TypeString,
@@ -119,13 +118,13 @@ func DataSource() *schema.Resource {
},
},
},
-"snapshots": dataSourceSchemaDataLakePipelineSnapshots(),
-"ingestion_schedules": dataSourceSchemaDataLakePipelineIngestionSchedules(),
+"snapshots": dataSourceSchemaSnapshots(),
+"ingestion_schedules": dataSourceSchemaIngestionSchedules(),
},
}
}

-func dataSourceSchemaDataLakePipelineIngestionSchedules() *schema.Schema {
+func dataSourceSchemaIngestionSchedules() *schema.Schema {
return &schema.Schema{
Type: schema.TypeSet,
Computed: true,
@@ -156,7 +155,7 @@ func dataSourceSchemaDataLakePipelineIngestionSchedules() *schema.Schema {
}
}

-func dataSourceSchemaDataLakePipelineSnapshots() *schema.Schema {
+func dataSourceSchemaSnapshots() *schema.Schema {
return &schema.Schema{
Type: schema.TypeSet,
Computed: true,
@@ -222,73 +221,67 @@ func dataSourceSchemaDataLakePipelineSnapshots() *schema.Schema {
}
}

-func dataSourceMongoDBAtlasDataLakePipelineRead(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
-conn := meta.(*config.MongoDBClient).Atlas
+func dataSourceRead(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
+connV2 := meta.(*config.MongoDBClient).AtlasV2
projectID := d.Get("project_id").(string)
name := d.Get("name").(string)

-dataLakePipeline, _, err := conn.DataLakePipeline.Get(ctx, projectID, name)
+pipeline, _, err := connV2.DataLakePipelinesApi.GetPipeline(ctx, projectID, name).Execute()
if err != nil {
return diag.FromErr(fmt.Errorf(errorDataLakePipelineRead, name, err))
}

-snapshots, _, err := conn.DataLakePipeline.ListSnapshots(ctx, projectID, name, nil)
+snapshots, _, err := connV2.DataLakePipelinesApi.ListPipelineSnapshots(ctx, projectID, name).Execute()
if err != nil {
return diag.FromErr(fmt.Errorf(errorDataLakePipelineRead, name, err))
}

-ingestionSchedules, _, err := conn.DataLakePipeline.ListIngestionSchedules(ctx, projectID, name)
+ingestionSchedules, _, err := connV2.DataLakePipelinesApi.ListPipelineSchedules(ctx, projectID, name).Execute()
if err != nil {
return diag.FromErr(fmt.Errorf(errorDataLakePipelineRead, name, err))
}

-return setDataLakeResourceData(d, dataLakePipeline, snapshots, ingestionSchedules)
-}
+pipelineName := pipeline.GetName()

-func setDataLakeResourceData(
-d *schema.ResourceData,
-pipeline *matlas.DataLakePipeline,
-snapshots *matlas.DataLakePipelineSnapshotsResponse,
-ingestionSchedules []*matlas.DataLakePipelineIngestionSchedule) diag.Diagnostics {
-if err := d.Set("id", pipeline.ID); err != nil {
-return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "id", pipeline.Name, err))
+if err := d.Set("id", pipeline.GetId()); err != nil {
+return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "id", pipelineName, err))
}

-if err := d.Set("state", pipeline.State); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "state", pipeline.Name, err))
+if err := d.Set("state", pipeline.GetState()); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "state", pipelineName, err))
}

-if err := d.Set("created_date", pipeline.CreatedDate); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "created_date", pipeline.Name, err))
+if err := d.Set("created_date", conversion.TimePtrToStringPtr(pipeline.CreatedDate)); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "created_date", pipelineName, err))
}

-if err := d.Set("last_updated_date", pipeline.LastUpdatedDate); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "last_updated_date", pipeline.Name, err))
+if err := d.Set("last_updated_date", conversion.TimePtrToStringPtr(pipeline.LastUpdatedDate)); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "last_updated_date", pipelineName, err))
}

-if err := d.Set("sink", flattenDataLakePipelineSink(pipeline.Sink)); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "sink", pipeline.Name, err))
+if err := d.Set("sink", flattenSink(pipeline.Sink)); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "sink", pipelineName, err))
}

-if err := d.Set("source", flattenDataLakePipelineSource(pipeline.Source)); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "source", pipeline.Name, err))
+if err := d.Set("source", flattenSource(pipeline.Source)); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "source", pipelineName, err))
}

-if err := d.Set("transformations", flattenDataLakePipelineTransformations(pipeline.Transformations)); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "transformations", pipeline.Name, err))
+if err := d.Set("transformations", flattenTransformations(pipeline.GetTransformations())); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "transformations", pipelineName, err))
}

-if err := d.Set("snapshots", flattenDataLakePipelineSnapshots(snapshots.Results)); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "snapshots", pipeline.Name, err))
+if err := d.Set("snapshots", flattenSnapshots(snapshots.GetResults())); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "snapshots", pipelineName, err))
}

-if err := d.Set("ingestion_schedules", flattenDataLakePipelineIngestionSchedules(ingestionSchedules)); err != nil {
-return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "ingestion_schedules", pipeline.Name, err))
+if err := d.Set("ingestion_schedules", flattenIngestionSchedules(ingestionSchedules)); err != nil {
+return diag.FromErr(fmt.Errorf(errorDataLakePipelineSetting, "ingestion_schedules", pipelineName, err))
}

d.SetId(conversion.EncodeStateID(map[string]string{
-"project_id": pipeline.GroupID,
-"name": pipeline.Name,
+"project_id": pipeline.GetGroupId(),
+"name": pipelineName,
}))

return nil
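The rewritten read path above converts the new SDK's `*time.Time` fields with `conversion.TimePtrToStringPtr` before writing them to state. A plausible shape for that helper, assuming RFC 3339 formatting; the real implementation lives in `internal/common/conversion` and may differ in detail:

```go
package conversion

import "time"

// TimePtrToStringPtr (sketch): nil stays nil so unset API timestamps don't
// write empty strings into state; non-nil values render as RFC 3339.
func TimePtrToStringPtr(t *time.Time) *string {
	if t == nil {
		return nil
	}
	s := t.UTC().Format(time.RFC3339)
	return &s
}
```
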
@@ -9,14 +9,14 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
-matlas "go.mongodb.org/atlas/mongodbatlas"
+"go.mongodb.org/atlas-sdk/v20231115005/admin"
)

const errorDataLakePipelineRunRead = "error reading MongoDB Atlas DataLake Run (%s): %s"

func DataSourceRun() *schema.Resource {
return &schema.Resource{
-ReadContext: dataSourceMongoDBAtlasDataLakeRunRead,
+ReadContext: dataSourceRunRead,
Schema: map[string]*schema.Schema{
"project_id": {
Type: schema.TypeString,
Expand Down Expand Up @@ -87,13 +87,13 @@ func DataSourceRun() *schema.Resource {
}
}

-func dataSourceMongoDBAtlasDataLakeRunRead(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
-conn := meta.(*config.MongoDBClient).Atlas
+func dataSourceRunRead(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
+connV2 := meta.(*config.MongoDBClient).AtlasV2
projectID := d.Get("project_id").(string)
name := d.Get("pipeline_name").(string)
pipelineRunID := d.Get("pipeline_run_id").(string)

-dataLakeRun, resp, err := conn.DataLakePipeline.GetRun(ctx, projectID, name, pipelineRunID)
+run, resp, err := connV2.DataLakePipelinesApi.GetPipelineRun(ctx, projectID, name, pipelineRunID).Execute()
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
d.SetId("")
@@ -103,47 +103,47 @@ func dataSourceMongoDBAtlasDataLakeRunRead(ctx context.Context, d *schema.Resour
return diag.FromErr(fmt.Errorf(errorDataLakePipelineRunRead, name, err))
}

-if err := d.Set("id", dataLakeRun.ID); err != nil {
+if err := d.Set("id", run.GetId()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "hostnames", name, err))
}

-if err := d.Set("project_id", dataLakeRun.GroupID); err != nil {
+if err := d.Set("project_id", run.GetGroupId()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "state", name, err))
}

-if err := d.Set("created_date", dataLakeRun.CreatedDate); err != nil {
+if err := d.Set("created_date", conversion.TimePtrToStringPtr(run.CreatedDate)); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_databases", name, err))
}

-if err := d.Set("last_updated_date", dataLakeRun.LastUpdatedDate); err != nil {
+if err := d.Set("last_updated_date", conversion.TimePtrToStringPtr(run.LastUpdatedDate)); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_databases", name, err))
}

-if err := d.Set("state", dataLakeRun.State); err != nil {
+if err := d.Set("state", run.GetState()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_databases", name, err))
}

-if err := d.Set("phase", dataLakeRun.Phase); err != nil {
+if err := d.Set("phase", run.GetPhase()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_databases", name, err))
}

-if err := d.Set("pipeline_id", dataLakeRun.PipelineID); err != nil {
+if err := d.Set("pipeline_id", run.GetPipelineId()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_stores", name, err))
}

-if err := d.Set("dataset_name", dataLakeRun.DatasetName); err != nil {
+if err := d.Set("dataset_name", run.GetDatasetName()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_stores", name, err))
}

-if err := d.Set("snapshot_id", dataLakeRun.SnapshotID); err != nil {
+if err := d.Set("snapshot_id", run.GetSnapshotId()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_stores", name, err))
}

-if err := d.Set("backup_frequency_type", dataLakeRun.BackupFrequencyType); err != nil {
+if err := d.Set("backup_frequency_type", run.GetBackupFrequencyType()); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_stores", name, err))
}

-if err := d.Set("stats", flattenDataLakePipelineRunStats(dataLakeRun.Stats)); err != nil {
+if err := d.Set("stats", flattenRunStats(run.Stats)); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "storage_stores", name, err))
}

@@ -156,15 +156,14 @@ func dataSourceMongoDBAtlasDataLakeRunRead(ctx context.Context, d *schema.Resour
return nil
}

-func flattenDataLakePipelineRunStats(datalakeRunStats *matlas.DataLakePipelineRunStats) []map[string]any {
-if datalakeRunStats == nil {
+func flattenRunStats(stats *admin.PipelineRunStats) []map[string]any {
+if stats == nil {
return nil
}

maps := make([]map[string]any, 1)
maps[0] = map[string]any{
-"bytes_exported": datalakeRunStats.BytesExported,
-"num_docs": datalakeRunStats.NumDocs,
+"bytes_exported": stats.GetBytesExported(),
+"num_docs": stats.GetNumDocs(),
}
return maps
}
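
`flattenRunStats` keeps only a top-level nil check and otherwise relies on the generated getters. A small sketch of why that works, assuming the SDK follows the usual generated-getter convention of returning zero values for nil receivers and unset optionals:

```go
package main

import (
	"fmt"

	"go.mongodb.org/atlas-sdk/v20231115005/admin"
)

func main() {
	var stats *admin.PipelineRunStats // e.g. stats absent from the API response
	// Assumed generator convention: getters guard nil and return zero values
	// instead of panicking on a nil receiver.
	fmt.Println(stats.GetBytesExported(), stats.GetNumDocs()) // 0 0
}
```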
@@ -23,7 +23,7 @@ func TestAccDataLakeRunDS_basic(t *testing.T) {
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
Steps: []resource.TestStep{
{
-Config: testAccMongoDBAtlasDataLakeDataSourcePipelineRunConfig(projectID, pipelineName, runID),
+Config: configRunDS(projectID, pipelineName, runID),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrSet(dataSourceName, "project_id"),
resource.TestCheckResourceAttr(dataSourceName, "pipeline_name", pipelineName),
@@ -38,7 +38,7 @@ func TestAccDataLakeRunDS_basic(t *testing.T) {
})
}

-func testAccMongoDBAtlasDataLakeDataSourcePipelineRunConfig(projectID, pipelineName, runID string) string {
+func configRunDS(projectID, pipelineName, runID string) string {
return fmt.Sprintf(`
data "mongodbatlas_data_lake_pipeline_run" "test" {
@@ -7,15 +7,16 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/id"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
-matlas "go.mongodb.org/atlas/mongodbatlas"
+"go.mongodb.org/atlas-sdk/v20231115005/admin"
)

const errorDataLakePipelineRunList = "error reading MongoDB Atlas DataLake Runs (%s): %s"

func PluralDataSourceRun() *schema.Resource {
return &schema.Resource{
-ReadContext: dataSourceMongoDBAtlasDataLakeRunsRead,
+ReadContext: dataSourcePluralRunRead,
Schema: map[string]*schema.Schema{
"project_id": {
Type: schema.TypeString,
@@ -90,46 +91,38 @@
}
}

-func dataSourceMongoDBAtlasDataLakeRunsRead(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
-conn := meta.(*config.MongoDBClient).Atlas
+func dataSourcePluralRunRead(ctx context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
+connV2 := meta.(*config.MongoDBClient).AtlasV2
projectID := d.Get("project_id").(string)
name := d.Get("pipeline_name").(string)

-dataLakeRuns, _, err := conn.DataLakePipeline.ListRuns(ctx, projectID, name)
+runs, _, err := connV2.DataLakePipelinesApi.ListPipelineRuns(ctx, projectID, name).Execute()
if err != nil {
return diag.FromErr(fmt.Errorf(errorDataLakePipelineRunList, projectID, err))
}

-if err := d.Set("results", flattenDataLakePipelineRunResult(dataLakeRuns.Results)); err != nil {
+if err := d.Set("results", flattenRunResults(runs.GetResults())); err != nil {
return diag.FromErr(fmt.Errorf(ErrorDataLakeSetting, "results", projectID, err))
}

d.SetId(id.UniqueId())

return nil
}

-func flattenDataLakePipelineRunResult(datalakePipelineRuns []*matlas.DataLakePipelineRun) []map[string]any {
-var results []map[string]any
-
+func flattenRunResults(datalakePipelineRuns []admin.IngestionPipelineRun) []map[string]any {
if len(datalakePipelineRuns) == 0 {
-return results
+return nil
}

-results = make([]map[string]any, len(datalakePipelineRuns))
+results := make([]map[string]any, len(datalakePipelineRuns))

for k, run := range datalakePipelineRuns {
results[k] = map[string]any{
-"id": run.ID,
-"created_date": run.CreatedDate,
-"last_updated_date": run.LastUpdatedDate,
-"state": run.State,
-"pipeline_id": run.PipelineID,
-"snapshot_id": run.SnapshotID,
-"backup_frequency_type": run.BackupFrequencyType,
-"stats": flattenDataLakePipelineRunStats(run.Stats),
+"id": run.GetId(),
+"created_date": conversion.TimePtrToStringPtr(run.CreatedDate),
+"last_updated_date": conversion.TimePtrToStringPtr(run.LastUpdatedDate),
+"state": run.GetState(),
+"pipeline_id": run.GetPipelineId(),
+"snapshot_id": run.GetSnapshotId(),
+"backup_frequency_type": run.GetBackupFrequencyType(),
+"stats": flattenRunStats(run.Stats),
}
}

return results
}
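
`flattenRunResults` now receives the runs as a value slice (`[]admin.IngestionPipelineRun`) instead of a pointer slice. Ranging over a value slice copies each element, which is harmless here because the loop only reads fields; a tiny sketch with a hypothetical stand-in type:

```go
package main

import "fmt"

// runLike is a hypothetical stand-in for admin.IngestionPipelineRun.
type runLike struct{ State string }

func main() {
	runs := []runLike{{State: "DONE"}}
	for _, r := range runs {
		r.State = "PENDING" // mutates the copy, not the slice element
	}
	fmt.Println(runs[0].State) // "DONE": value-range copies each element
}
```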
