feat: new mongodbatlas_stream_connection and mongodbatlas_stream_connections data sources #1757

Merged · 3 commits · Dec 15, 2023
2 changes: 1 addition & 1 deletion examples/atlas-streams/README.md
@@ -7,7 +7,7 @@ Atlas Stream Processing is composed of multiple components, and users can levera
- `mongodbatlas_stream_instance`: Enables creating, modifying, and deleting Stream Instances. As part of this resource, a computed `hostnames` attribute is available for connecting to the created instance.
- `mongodbatlas_stream_connection`: Enables creating, modifying, and deleting Stream Instance Connections, which serve as data sources and sinks for your instance.

- **NOTE**: To leverage these resources you'll need to set the environment variable `MONGODB_ATLAS_ENABLE_BETA=true` as this functionality is currently in preview. Also see [Limitations](https://www.mongodb.com/docs/atlas/atlas-sp/limitations/#std-label-atlas-sp-limitations) of Atlas Streams during this preview period.
+ **NOTE**: To leverage these resources you'll need to set the environment variable `MONGODB_ATLAS_ENABLE_BETA=true` as this functionality is currently in preview. Also see [Limitations](https://www.mongodb.com/docs/atlas/atlas-sp/limitations/#std-label-atlas-sp-limitations) of Atlas Stream Processing during this preview period.

### Managing Stream Processors

20 changes: 20 additions & 0 deletions examples/atlas-streams/stream-connection/main.tf
@@ -52,4 +52,24 @@ resource "mongodbatlas_stream_connection" "example-kafka-ssl" {
broker_public_certificate = var.kafka_ssl_cert
protocol = "SSL"
}
}

data "mongodbatlas_stream_connection" "example-kafka-ssl" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
connection_name = mongodbatlas_stream_connection.example-kafka-ssl.connection_name
}

data "mongodbatlas_stream_connections" "example" {
project_id = var.project_id
instance_name = mongodbatlas_stream_instance.example.instance_name
}

# example making use of data sources
output "stream_connection_bootstrap_servers" {
value = data.mongodbatlas_stream_connection.example-kafka-ssl.bootstrap_servers
}

output "stream_connection_total_count" {
value = data.mongodbatlas_stream_connections.example.total_count
}
2 changes: 2 additions & 0 deletions internal/provider/provider.go
@@ -418,6 +418,8 @@ func (p *MongodbtlasProvider) DataSources(context.Context) []func() datasource.D
betaDataSources := []func() datasource.DataSource{
streaminstance.DataSource,
streaminstance.PluralDataSource,
streamconnection.DataSource,
streamconnection.PluralDataSource,
}
if ProviderEnableBeta {
dataSources = append(dataSources, betaDataSources...)
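For context: `ProviderEnableBeta` is the switch that keeps these data sources behind the `MONGODB_ATLAS_ENABLE_BETA` flag mentioned in the README. A minimal sketch of how that gate can be derived from the environment variable follows; the exact wiring in the provider's initialization is not shown in this diff, so treat the parsing below as an assumption:

```go
package provider

import (
	"os"
	"strconv"
)

// ProviderEnableBeta gates registration of preview data sources such as
// streamconnection.DataSource and streamconnection.PluralDataSource.
// Deriving it via strconv.ParseBool on MONGODB_ATLAS_ENABLE_BETA is an
// assumption; the provider's actual initialization may differ.
var ProviderEnableBeta, _ = strconv.ParseBool(os.Getenv("MONGODB_ATLAS_ENABLE_BETA"))
```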
121 changes: 121 additions & 0 deletions internal/service/streamconnection/data_source_stream_connection.go
@@ -0,0 +1,121 @@
package streamconnection

import (
"context"

"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
"github.com/hashicorp/terraform-plugin-framework/types"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
)

var _ datasource.DataSource = &streamConnectionDS{}
var _ datasource.DataSourceWithConfigure = &streamConnectionDS{}

func DataSource() datasource.DataSource {
return &streamConnectionDS{
DSCommon: config.DSCommon{
DataSourceName: streamConnectionName,
},
}
}

type streamConnectionDS struct {
config.DSCommon
}

func (d *streamConnectionDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
resp.Schema = schema.Schema{
Attributes: DSAttributes(true),
}
}

// DSAttributes returns the attribute definitions for a single stream connection.
// `withArguments` marks the identifying attributes as required (for the singular data source) or as computed (for the plural data source).
func DSAttributes(withArguments bool) map[string]schema.Attribute {
return map[string]schema.Attribute{
"id": schema.StringAttribute{
Computed: true,
},
"project_id": schema.StringAttribute{
Required: withArguments,
Computed: !withArguments,
},
"instance_name": schema.StringAttribute{
Required: withArguments,
Computed: !withArguments,
},
"connection_name": schema.StringAttribute{
Required: withArguments,
Computed: !withArguments,
},
"type": schema.StringAttribute{
Computed: true,
},

// cluster type specific
"cluster_name": schema.StringAttribute{
Computed: true,
},

// kafka type specific
"authentication": schema.SingleNestedAttribute{
Computed: true,
Attributes: map[string]schema.Attribute{
"mechanism": schema.StringAttribute{
Computed: true,
},
"password": schema.StringAttribute{
Computed: true,
Sensitive: true,
},
"username": schema.StringAttribute{
Computed: true,
},
},
},
"bootstrap_servers": schema.StringAttribute{
Computed: true,
},
"config": schema.MapAttribute{
ElementType: types.StringType,
Computed: true,
},
"security": schema.SingleNestedAttribute{
Computed: true,
Attributes: map[string]schema.Attribute{
"broker_public_certificate": schema.StringAttribute{
Computed: true,
},
"protocol": schema.StringAttribute{
Computed: true,
},
},
},
}
}

func (d *streamConnectionDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) {
var streamConnectionConfig TFStreamConnectionModel
resp.Diagnostics.Append(req.Config.Get(ctx, &streamConnectionConfig)...)
if resp.Diagnostics.HasError() {
return
}

connV2 := d.Client.AtlasV2
projectID := streamConnectionConfig.ProjectID.ValueString()
instanceName := streamConnectionConfig.InstanceName.ValueString()
connectionName := streamConnectionConfig.ConnectionName.ValueString()
apiResp, _, err := connV2.StreamsApi.GetStreamConnection(ctx, projectID, instanceName, connectionName).Execute()
if err != nil {
resp.Diagnostics.AddError("error fetching resource", err.Error())
return
}

newStreamConnectionModel, diags := NewTFStreamConnection(ctx, projectID, instanceName, nil, apiResp)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
}
resp.Diagnostics.Append(resp.State.Set(ctx, newStreamConnectionModel)...)
}
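A note on the `DSAttributes(withArguments)` pattern above: the singular data source passes `true` so that `project_id`, `instance_name`, and `connection_name` become required arguments, while the plural data source passes `false` so the same attributes are purely computed inside each entry of `results`. A hypothetical unit test, assuming the import paths shown in this diff, makes the flip concrete:

```go
package streamconnection_test

import (
	"testing"

	"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
	"github.com/mongodb/terraform-provider-mongodbatlas/internal/service/streamconnection"
)

// TestDSAttributesFlip is illustrative only: it checks that an identifying
// attribute is required for the singular data source and computed for the
// entries of the plural data source's results list.
func TestDSAttributesFlip(t *testing.T) {
	singular := streamconnection.DSAttributes(true)
	plural := streamconnection.DSAttributes(false)

	if !singular["connection_name"].(schema.StringAttribute).Required {
		t.Error("expected connection_name to be required in the singular data source")
	}
	if !plural["connection_name"].(schema.StringAttribute).Computed {
		t.Error("expected connection_name to be computed in the plural data source")
	}
}
```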
@@ -0,0 +1,83 @@
package streamconnection_test

import (
"fmt"
"os"
"testing"

"github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/testutil/acc"
)

func TestAccStreamDSStreamConnection_kafkaPlaintext(t *testing.T) {
var (
orgID = os.Getenv("MONGODB_ATLAS_ORG_ID")
Collaborator comment: [nit] No need to address it here, but I am wondering if we should define MONGODB_ATLAS_ORG_ID and other env vars in a common file and then import them. That way, if we change an env var we only need to update it in one place.

Author reply: It could remove some boilerplate for sure; on the other hand, it does help to see which env variables are being used by each test, so I don't have a strong opinion.
projectName = acctest.RandomWithPrefix("test-acc-stream")
instanceName = acctest.RandomWithPrefix("test-acc-instance")
dataSourceName = "data.mongodbatlas_stream_connection.test"
)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.PreCheckBetaFlag(t); acc.PreCheckBasic(t) },
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
CheckDestroy: CheckDestroyStreamConnection,
Steps: []resource.TestStep{
{
Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false)),
Check: kafkaStreamConnectionAttributeChecks(dataSourceName, orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092,localhost:9092", "earliest", false, false),
},
},
})
}

func TestAccStreamDSStreamConnection_kafkaSSL(t *testing.T) {
var (
orgID = os.Getenv("MONGODB_ATLAS_ORG_ID")
projectName = acctest.RandomWithPrefix("test-acc-stream")
instanceName = acctest.RandomWithPrefix("test-acc-instance")
dataSourceName = "data.mongodbatlas_stream_connection.test"
)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.PreCheckBetaFlag(t); acc.PreCheckBasic(t) },
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
CheckDestroy: CheckDestroyStreamConnection,
Steps: []resource.TestStep{
{
Config: streamConnectionDataSourceConfig(kafkaStreamConnectionConfig(orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true)),
Check: kafkaStreamConnectionAttributeChecks(dataSourceName, orgID, projectName, instanceName, "user", "rawpassword", "localhost:9092", "earliest", true, false),
},
},
})
}

func TestAccStreamDSStreamConnection_cluster(t *testing.T) {
var (
orgID = os.Getenv("MONGODB_ATLAS_ORG_ID")
clusterInfo = acc.GetClusterInfo(orgID)
instanceName = acctest.RandomWithPrefix("test-acc-name")
dataSourceName = "data.mongodbatlas_stream_connection.test"
)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.PreCheckBetaFlag(t); acc.PreCheckBasic(t) },
ProtoV6ProviderFactories: acc.TestAccProviderV6Factories,
CheckDestroy: CheckDestroyStreamConnection,
Steps: []resource.TestStep{
{
Config: streamConnectionDataSourceConfig(clusterStreamConnectionConfig(clusterInfo.ProjectIDStr, instanceName, clusterInfo.ClusterNameStr, clusterInfo.ClusterTerraformStr)),
Check: clusterStreamConnectionAttributeChecks(dataSourceName, clusterInfo.ClusterName),
},
},
})
}

func streamConnectionDataSourceConfig(streamConnectionConfig string) string {
return fmt.Sprintf(`
%s

data "mongodbatlas_stream_connection" "test" {
project_id = mongodbatlas_stream_connection.test.project_id
instance_name = mongodbatlas_stream_connection.test.instance_name
connection_name = mongodbatlas_stream_connection.test.connection_name
}
`, streamConnectionConfig)
}
@@ -0,0 +1,85 @@
package streamconnection

import (
"context"
"fmt"

"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/datasource/schema"
"github.com/hashicorp/terraform-plugin-framework/types"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/conversion"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/common/dsschema"
"github.com/mongodb/terraform-provider-mongodbatlas/internal/config"
"go.mongodb.org/atlas-sdk/v20231115002/admin"
)

var _ datasource.DataSource = &streamConnectionsDS{}
var _ datasource.DataSourceWithConfigure = &streamConnectionsDS{}

func PluralDataSource() datasource.DataSource {
return &streamConnectionsDS{
DSCommon: config.DSCommon{
DataSourceName: fmt.Sprintf("%ss", streamConnectionName),
},
}
}

type streamConnectionsDS struct {
config.DSCommon
}

type TFStreamConnectionsDSModel struct {
ID types.String `tfsdk:"id"`
ProjectID types.String `tfsdk:"project_id"`
InstanceName types.String `tfsdk:"instance_name"`
Results []TFStreamConnectionModel `tfsdk:"results"`
PageNum types.Int64 `tfsdk:"page_num"`
ItemsPerPage types.Int64 `tfsdk:"items_per_page"`
TotalCount types.Int64 `tfsdk:"total_count"`
}

func (d *streamConnectionsDS) Schema(ctx context.Context, req datasource.SchemaRequest, resp *datasource.SchemaResponse) {
resp.Schema = dsschema.PaginatedDSSchema(
map[string]schema.Attribute{
"project_id": schema.StringAttribute{
Required: true,
},
"instance_name": schema.StringAttribute{
Required: true,
},
},
DSAttributes(false))
}

func (d *streamConnectionsDS) Read(ctx context.Context, req datasource.ReadRequest, resp *datasource.ReadResponse) {
var streamConnectionsConfig TFStreamConnectionsDSModel
resp.Diagnostics.Append(req.Config.Get(ctx, &streamConnectionsConfig)...)
if resp.Diagnostics.HasError() {
return
}

connV2 := d.Client.AtlasV2
projectID := streamConnectionsConfig.ProjectID.ValueString()
instanceName := streamConnectionsConfig.InstanceName.ValueString()
itemsPerPage := streamConnectionsConfig.ItemsPerPage.ValueInt64Pointer()
pageNum := streamConnectionsConfig.PageNum.ValueInt64Pointer()

apiResp, _, err := connV2.StreamsApi.ListStreamConnectionsWithParams(ctx, &admin.ListStreamConnectionsApiParams{
GroupId: projectID,
TenantName: instanceName,
ItemsPerPage: conversion.Int64PtrToIntPtr(itemsPerPage),
PageNum: conversion.Int64PtrToIntPtr(pageNum),
}).Execute()

if err != nil {
resp.Diagnostics.AddError("error fetching results", err.Error())
return
}

newStreamConnectionsModel, diags := NewTFStreamConnections(ctx, &streamConnectionsConfig, apiResp)
if diags.HasError() {
resp.Diagnostics.Append(diags...)
return
}
resp.Diagnostics.Append(resp.State.Set(ctx, newStreamConnectionsModel)...)
}
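The `conversion.Int64PtrToIntPtr` calls above adapt the framework's `*int64` values (from `ValueInt64Pointer()`) to the `*int` pagination fields the Atlas SDK expects, preserving `nil` when the practitioner left `page_num` or `items_per_page` unset. The helper's body is not part of this diff, so the following is only a plausible sketch:

```go
package conversion

// Int64PtrToIntPtr narrows a *int64 to a *int, keeping nil as nil so that
// unset Terraform attributes translate to omitted API pagination parameters.
// Sketch only; the real helper in internal/common/conversion may differ.
func Int64PtrToIntPtr(i64 *int64) *int {
	if i64 == nil {
		return nil
	}
	i := int(*i64)
	return &i
}
```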