Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DIS] new resource/opentelekomcloud_dis_checkpoint_v2 #2211

Merged
merged 3 commits into from Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/resources/dis_app_v2.md
Expand Up @@ -13,15 +13,15 @@ Manages a DIS Apps in the OpenTelekomCloud DIS Service.

```hcl
resource "opentelekomcloud_dis_app_v2" "app_1" {
app_name = "app_name"
name = "app_name"
}
```

## Argument Reference

The following arguments are supported:

* `app_name` - (Required) Name of the consumer application to be created
* `name` - (Required) Name of the consumer application to be created
The application name contains 1 to 200 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed.

## Attributes Reference
Expand All @@ -30,7 +30,7 @@ Above argument parameter can be exported as attribute parameters.

* `created` - Time when the app is created. The value is a timestamp.

* `app_id` - Unique identifier of the app.
* `id` - Unique identifier of the app.

* `commit_checkpoint_stream_names` - List of associated streams.

Expand Down
73 changes: 73 additions & 0 deletions docs/resources/dis_checkpoint_v2.md
@@ -0,0 +1,73 @@
---
subcategory: "Data Ingestion Service (DIS)"
---

Up-to-date reference of API arguments for DIS checkpoint you can get at
`https://docs.otc.t-systems.com/data-ingestion-service/api-ref/api_description/checkpoint_management/index.html`.

# opentelekomcloud_dis_checkpoint_v2

Manages a DIS Checkpoints in the OpenTelekomCloud DIS Service.

## Example Usage

```hcl
resource "opentelekomcloud_dis_stream_v2" "stream_1" {
name = "my_stream"
partition_count = 3
stream_type = "COMMON"
retention_period = 24
auto_scale_min_partition_count = 1
auto_scale_max_partition_count = 4
compression_format = "zip"

data_type = "BLOB"

tags = {
foo = "bar"
}
}

resource "opentelekomcloud_dis_app_v2" "app_1" {
name = "my_app"
}

resource "opentelekomcloud_dis_checkpoint_v2" "checkpoint_1" {
app_name = opentelekomcloud_dis_app_v2.app_1.name
stream_name = opentelekomcloud_dis_stream_v2.stream_1.name
partition_id = "0"
sequence_number = "0"
metadata = "my_first_checkpoint"
}
```

## Argument Reference

The following arguments are supported:

* `app_name` - (Required) Name of the consumer application to be created
The application name contains 1 to 200 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed.

* `checkpoint_type` - (Required) Type of the checkpoint. `LAST_READ`: Only sequence numbers are recorded in databases.
Default value: `LAST_READ`

* `stream_name` - (Required) Name of the stream. The stream name can contain 1 to 64 characters,
including letters, digits, underscores (_), and hyphens (-).

* `partition_id` - (Required) Partition ID of the stream The value can be in either of the following formats:
* `shardId-0000000000`
* `0`

* `sequence_number` - (Required) Sequence number to be submitted, which is used to record the consumption
checkpoint of the stream. Ensure that the sequence number is within the valid range.

* `metadata` - (Optional) Metadata information of the consumer application.
Maximum length: 1000

## Attributes Reference

The following attributes are exported:

* `sequence_number` - Sequence number used to record the consumption checkpoint of the stream.

* `metadata` - Metadata information of the consumer application.
4 changes: 2 additions & 2 deletions docs/resources/dis_stream_v2.md
Expand Up @@ -13,7 +13,7 @@ Manages a DIS Stream in the OpenTelekomCloud DIS Service.

```hcl
resource "opentelekomcloud_dis_stream_v2" "stream_1" {
stream_name = "MyStream"
name = "MyStream"
partition_count = 3
stream_type = "COMMON"
retention_period = 24
Expand All @@ -33,7 +33,7 @@ resource "opentelekomcloud_dis_stream_v2" "stream_1" {

The following arguments are supported:

* `stream_name` - (Required) Name of the stream. The stream name can contain 1 to 64 characters,
* `name` - (Required) Name of the stream. The stream name can contain 1 to 64 characters,
including letters, digits, underscores (_), and hyphens (-).

* `partition_count` - (Required) Number of partitions. Partitions are the base throughput unit of a DIS stream.
Expand Down
Expand Up @@ -28,7 +28,7 @@ func TestAccDisAppV2_basic(t *testing.T) {
Config: testAccDisV2AppBasic(appName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDisV2AppExists(resourceAppName, &cls),
resource.TestCheckResourceAttr(resourceAppName, "app_name", appName),
resource.TestCheckResourceAttr(resourceAppName, "name", appName),
),
},
{
Expand Down Expand Up @@ -93,7 +93,7 @@ func testAccCheckDisV2AppExists(n string, cls *apps.GetAppResponse) resource.Tes
func testAccDisV2AppBasic(appName string) string {
return fmt.Sprintf(`
resource "opentelekomcloud_dis_app_v2" "app_1" {
app_name = "%s"
name = "%s"
}
`, appName)
}
@@ -0,0 +1,128 @@
package acceptance

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
"github.com/opentelekomcloud/gophertelekomcloud/openstack/dis/v2/checkpoints"
"github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/acceptance/common"
"github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/acceptance/env"
"github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/common/cfg"
)

const resourceCheckpointName = "opentelekomcloud_dis_checkpoint_v2.checkpoint_1"

func TestAccDisCheckpointV2_basic(t *testing.T) {
var streamName = fmt.Sprintf("dis_stream_%s", acctest.RandString(5))
var appName = fmt.Sprintf("dis_app_%s", acctest.RandString(5))

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { common.TestAccPreCheck(t) },
ProviderFactories: common.TestAccProviderFactories,
CheckDestroy: testAccCheckDisV2CheckpointDestroy,
Steps: []resource.TestStep{
{
Config: testAccDisV2CheckpointBasic(streamName, appName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDisV2CheckpointExists(resourceCheckpointName),
resource.TestCheckResourceAttr(resourceCheckpointName, "metadata", "my_first_checkpoint"),
),
},
},
})
}

func testAccCheckDisV2CheckpointDestroy(s *terraform.State) error {
config := common.TestAccProvider.Meta().(*cfg.Config)
client, err := config.DisV2Client(env.OS_REGION_NAME)
if err != nil {
return fmt.Errorf("error creating DISv2 client: %w", err)
}

for _, rs := range s.RootModule().Resources {
if rs.Type != "opentelekomcloud_dis_checkpoint_v2" {
continue
}

_, err := checkpoints.GetCheckpoint(client, checkpoints.GetCheckpointOpts{
StreamName: rs.Primary.ID,
AppName: rs.Primary.Attributes["app_name"],
PartitionId: rs.Primary.Attributes["partition_id"],
CheckpointType: rs.Primary.Attributes["checkpoint_type"],
})
if err == nil {
return fmt.Errorf("DIS checkpoint still exists")
}
}
return nil
}

func testAccCheckDisV2CheckpointExists(n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("not found: %s", n)
}

if rs.Primary.ID == "" {
return fmt.Errorf("no ID is set")
}

config := common.TestAccProvider.Meta().(*cfg.Config)
client, err := config.DisV2Client(env.OS_REGION_NAME)
if err != nil {
return fmt.Errorf("error creating DISv2 client: %w", err)
}

v, err := checkpoints.GetCheckpoint(client, checkpoints.GetCheckpointOpts{
StreamName: rs.Primary.ID,
AppName: rs.Primary.Attributes["app_name"],
PartitionId: rs.Primary.Attributes["partition_id"],
CheckpointType: rs.Primary.Attributes["checkpoint_type"],
})
if err != nil {
return fmt.Errorf("error getting checkpoint (%s): %w", rs.Primary.ID, err)
}

if v.SequenceNumber == "" {
return fmt.Errorf("DIS checkpoint not found")
}
return nil
}
}

func testAccDisV2CheckpointBasic(streamName string, appName string) string {
return fmt.Sprintf(`

resource "opentelekomcloud_dis_stream_v2" "stream_1" {
name = "%s"
partition_count = 3
stream_type = "COMMON"
retention_period = 24
auto_scale_min_partition_count = 1
auto_scale_max_partition_count = 4
compression_format = "zip"

data_type = "BLOB"

tags = {
foo = "bar"
}
}

resource "opentelekomcloud_dis_app_v2" "app_1" {
name = "%s"
}

resource "opentelekomcloud_dis_checkpoint_v2" "checkpoint_1" {
app_name = opentelekomcloud_dis_app_v2.app_1.name
stream_name = opentelekomcloud_dis_stream_v2.stream_1.name
partition_id = "0"
sequence_number = "0"
metadata = "my_first_checkpoint"
}
`, streamName, appName)
}
Expand Up @@ -28,15 +28,15 @@ func TestAccDisStreamV2_basic(t *testing.T) {
Config: testAccDisV2StreamBasic(streamName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDisV2StreamExists(resourceName, &cls),
resource.TestCheckResourceAttr(resourceName, "stream_name", streamName),
resource.TestCheckResourceAttr(resourceName, "name", streamName),
resource.TestCheckResourceAttr(resourceName, "partitions.#", "3"),
),
},
{
Config: testAccDisV2StreamBasicUpdated(streamName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDisV2StreamExists(resourceName, &cls),
resource.TestCheckResourceAttr(resourceName, "stream_name", streamName),
resource.TestCheckResourceAttr(resourceName, "name", streamName),
resource.TestCheckResourceAttr(resourceName, "partitions.#", "4"),
),
},
Expand Down Expand Up @@ -102,7 +102,7 @@ func testAccCheckDisV2StreamExists(n string, cls *streams.DescribeStreamResponse
func testAccDisV2StreamBasic(streamName string) string {
return fmt.Sprintf(`
resource "opentelekomcloud_dis_stream_v2" "stream_1" {
stream_name = "%s"
name = "%s"
partition_count = 3
stream_type = "COMMON"
retention_period = 24
Expand All @@ -122,7 +122,7 @@ resource "opentelekomcloud_dis_stream_v2" "stream_1" {
func testAccDisV2StreamBasicUpdated(streamName string) string {
return fmt.Sprintf(`
resource "opentelekomcloud_dis_stream_v2" "stream_1" {
stream_name = "%s"
name = "%s"
partition_count = 4
stream_type = "COMMON"
retention_period = 24
Expand Down
1 change: 1 addition & 0 deletions opentelekomcloud/provider.go
Expand Up @@ -354,6 +354,7 @@ func Provider() *schema.Provider {
"opentelekomcloud_deh_host_v1": deh.ResourceDeHHostV1(),
"opentelekomcloud_dis_stream_v2": dis.ResourceDisStreamV2(),
"opentelekomcloud_dis_app_v2": dis.ResourceDisAppV2(),
"opentelekomcloud_dis_checkpoint_v2": dis.ResourceDisCheckpointV2(),
"opentelekomcloud_dns_ptrrecord_v2": dns.ResourceDNSPtrRecordV2(),
"opentelekomcloud_dns_recordset_v2": dns.ResourceDNSRecordSetV2(),
"opentelekomcloud_dns_zone_v2": dns.ResourceDNSZoneV2(),
Expand Down
Expand Up @@ -30,7 +30,7 @@ func ResourceDisAppV2() *schema.Resource {
Update: schema.DefaultTimeout(2 * time.Minute),
},
Schema: map[string]*schema.Schema{
"app_name": {
"name": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Expand All @@ -46,7 +46,7 @@ func ResourceDisAppV2() *schema.Resource {
Type: schema.TypeInt,
Computed: true,
},
"app_id": {
"id": {
Type: schema.TypeString,
Computed: true,
},
Expand Down Expand Up @@ -100,13 +100,13 @@ func resourceDisAppV2Create(ctx context.Context, d *schema.ResourceData, meta in
return fmterr.Errorf(errCreationV2Client, err)
}
opts := apps.CreateAppOpts{
AppName: d.Get("app_name").(string),
AppName: d.Get("name").(string),
}

log.Printf("[DEBUG] Creating new App: %s", opts.AppName)
err = apps.CreateApp(client, opts)
if err != nil {
return fmterr.Errorf("error creating DIS streams: %s", err)
return fmterr.Errorf("error creating DIS app: %s", err)
}

d.SetId(opts.AppName)
Expand All @@ -130,8 +130,8 @@ func resourceDisAppV2Read(ctx context.Context, d *schema.ResourceData, meta inte
}

mErr := multierror.Append(
d.Set("app_name", app.AppName),
d.Set("app_id", app.AppId),
d.Set("name", app.AppName),
d.Set("id", app.AppId),
d.Set("created", app.CreateTime),
d.Set("commit_checkpoint_stream_names", app.CommitCheckPointStreamNames),
)
Expand Down