From a9dca4c205900e87d9077e539c92f0f190316d99 Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Wed, 14 Aug 2024 17:08:55 -0400 Subject: [PATCH 1/8] enabling schema evolution for oxbow controlled tables --- main.tf | 4 +++- variables.tf | 6 ++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/main.tf b/main.tf index 6b445b3..30a7193 100644 --- a/main.tf +++ b/main.tf @@ -82,6 +82,8 @@ resource "aws_kinesis_firehose_delivery_stream" "this_kinesis" { locals { oxbow_lambda_unwrap_sns_event = var.enable_group_events == true ? {} : var.sns_topic_arn == "" ? {} : { UNWRAP_SNS_ENVELOPE = true } group_eventlambda_unwrap_sns_event = var.sns_topic_arn == "" ? {} : { UNWRAP_SNS_ENVELOPE = true } + oxbow_lambda_schema_evolution = var.enable_schema_evolution == false ? {} : { SCHEMA_EVOLUTION = true } + } resource "aws_lambda_function" "this_lambda" { @@ -101,7 +103,7 @@ resource "aws_lambda_function" "this_lambda" { variables = merge({ AWS_S3_LOCKING_PROVIDER = var.aws_s3_locking_provider RUST_LOG = "deltalake=${var.rust_log_deltalake_debug_level},oxbow=${var.rust_log_oxbow_debug_level}" - DYNAMO_LOCK_TABLE_NAME = var.dynamodb_table_name }, local.oxbow_lambda_unwrap_sns_event) + DYNAMO_LOCK_TABLE_NAME = var.dynamodb_table_name }, local.oxbow_lambda_unwrap_sns_event, local.oxbow_lambda_schema_evolution) } tags = var.tags } diff --git a/variables.tf b/variables.tf index ab9c711..4ede998 100644 --- a/variables.tf +++ b/variables.tf @@ -304,6 +304,12 @@ variable "enable_glue_create" { default = false } +variable "enable_schema_evolution" { + type = bool + description = "Whether to turn on schema evolution" + default = true +} + variable "glue_create_config" { type = object({ athena_workgroup_name = string // Name of AWS Athena workgroup From b5f42e7455b1daab1abfa2fd94decfa68870bd6a Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 13:14:59 -0400 Subject: [PATCH 2/8] Enabling Glue sync --- glue_sync.tf | 199 
+++++++++++++++++++++++++++++++++++++++++++++++++++ variables.tf | 6 ++ 2 files changed, 205 insertions(+) create mode 100644 glue_sync.tf diff --git a/glue_sync.tf b/glue_sync.tf new file mode 100644 index 0000000..ab38813 --- /dev/null +++ b/glue_sync.tf @@ -0,0 +1,199 @@ +data "aws_iam_policy_document" "glue_sync_sqs" { + count = local.enable_glue_sync ? 1 : 0 + + statement { + effect = "Allow" + principals { + type = "*" + identifiers = ["*"] + } + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] + resources = ["arn:aws:sqs:*:*:${var.glue_sync_config.sqs_queue_name}"] + condition { + test = "ArnEquals" + variable = "aws:SourceArn" + values = [var.glue_sync_config.sns_topic_arn] + } + } +} + +data "aws_iam_policy_document" "glue_sync_sqs_dl" { + count = local.enable_glue_sync ? 1 : 0 + + statement { + effect = "Allow" + principals { + type = "AWS" + identifiers = ["*"] + } + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] + resources = ["arn:aws:sqs:*:*:${var.glue_sync_config.sqs_queue_name_dl}"] + condition { + test = "ForAllValues:StringEquals" + variable = "aws:SourceArn" + values = ["arn:aws:sqs:*:*:${var.glue_sync_config.sqs_queue_name}"] + } + } +} + +resource "aws_sqs_queue" "glue_sync" { + count = local.enable_glue_sync ? 1 : 0 + + name = var.glue_sync_config.sqs_queue_name + policy = data.aws_iam_policy_document.glue_sync_sqs[0].json + visibility_timeout_seconds = var.sqs_visibility_timeout_seconds + delay_seconds = var.sqs_delay_seconds + redrive_policy = jsonencode({ + deadLetterTargetArn = aws_sqs_queue.glue_sync_dl[0].arn + maxReceiveCount = var.sqs_redrive_policy_maxReceiveCount + }) + tags = var.tags +} + +resource "aws_sqs_queue" "glue_sync_dl" { + count = local.enable_glue_sync ? 
1 : 0 + + name = var.glue_sync_config.sqs_queue_name_dl + policy = data.aws_iam_policy_document.glue_sync_sqs_dl[0].json + tags = var.tags +} + +resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" { + count = local.enable_glue_sync ? 1 : 0 + + queue_url = aws_sqs_queue.glue_sync_dl[0].id + redrive_allow_policy = jsonencode({ + redrivePermission = "byQueue", + sourceQueueArns = [aws_sqs_queue.glue_sync[0].arn] + }) +} + +resource "aws_sns_topic_subscription" "glue_sync_sns_sub" { + count = local.enable_glue_sync ? 1 : 0 + + topic_arn = var.glue_sync_config.sns_topic_arn + protocol = "sqs" + endpoint = aws_sqs_queue.glue_sync[0].arn +} + +data "aws_iam_policy_document" "glue_sync_assume" { + count = local.enable_glue_sync ? 1 : 0 + + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + actions = [ + "sts:AssumeRole", + ] + } +} + +data "aws_iam_policy_document" "glue_sync" { + count = local.enable_glue_sync ? 
1 : 0 + statement { + sid = "GlueAllowTables" + effect = "Allow" + actions = [ + "glue:GetTable", + "glue:GetTables", + "glue:GetPartitions", + "glue:CreateTable", + "glue:UpdateTable" + ] + resources = [ + "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog", + "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:database/*", + "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/*" + ] + } + statement { + sid = "GlueCatalogAllowDatabases" + effect = "Allow" + actions = [ + "glue:GetDatabase", + "glue:GetDatabases", + "glue:CreateDatabase" + ] + resources = [ + "*" + ] + } + statement { + sid = "TableExtLocS3RO" + effect = "Allow" + actions = [ + "s3:GetObject", + "s3:GetObjectTagging", + "s3:GetObjectVersion", + "s3:GetBucketLocation", + "s3:ListBucket", + "s3:ListBucketVersions" + ] + resources = [ + var.warehouse_bucket_arn, + "${var.warehouse_bucket_arn}/${var.s3_path}/*" + ] + } + statement { + effect = "Allow" + actions = ["sqs:*"] + resources = [aws_sqs_queue.glue_sync[0].arn] + } + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["*"] + } +} + +resource "aws_iam_policy" "glue_sync_managed" { + count = local.enable_glue_sync ? 1 : 0 + + name = var.glue_sync_config.iam_policy_name + description = "Glue create policy allows access to Athena and S3" + policy = data.aws_iam_policy_document.glue_sync[0].json + tags = var.tags +} + +resource "aws_iam_role" "glue_sync" { + count = local.enable_glue_sync ? 1 : 0 + + name = var.glue_sync_config.iam_role_name + assume_role_policy = data.aws_iam_policy_document.glue_sync_assume[0].json + managed_policy_arns = [aws_iam_policy.glue_sync_managed[0].arn] + tags = var.tags +} + +resource "aws_lambda_function" "glue_sync_lambda" { + count = local.enable_glue_sync ? 
1 : 0 + + description = "Greate tables in AWS Glue catalog based on the table prefix" + s3_key = var.glue_sync_config.lambda_s3_key + s3_bucket = var.glue_sync_config.lambda_s3_bucket + function_name = var.glue_sync_config.lambda_function_name + role = aws_iam_role.glue_sync[0].arn + handler = "provided" + runtime = "provided.al2" + + environment { + variables = { + RUST_LOG = var.rust_log_oxbow_debug_level + GLUE_PATH_REGEX = var.glue_sync_config.path_regex + UNWRAP_SNS_ENVELOPE = true + } + } +} + +resource "aws_lambda_event_source_mapping" "glue_sync" { + count = local.enable_glue_sync ? 1 : 0 + + event_source_arn = aws_sqs_queue.glue_sync[0].arn + function_name = aws_lambda_function.glue_sync_lambda[0].arn +} diff --git a/variables.tf b/variables.tf index 4ede998..956cdc2 100644 --- a/variables.tf +++ b/variables.tf @@ -327,3 +327,9 @@ variable "glue_create_config" { }) description = "Configuration of glue-create lambda" } + +variable "enable_glue_sync" { + type = bool + description = "Whether to turn on Glue create Lambda" + default = false +} From f15f5d0381b01873f67512bcd4536eae17d414aa Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 13:15:22 -0400 Subject: [PATCH 3/8] simple refactpr to take the glue create lambda its own function --- glue_create.tf | 272 +++++++++++++++++++++++++++++++++++++++++++++++++ main.tf | 272 ------------------------------------------------- 2 files changed, 272 insertions(+), 272 deletions(-) create mode 100644 glue_create.tf diff --git a/glue_create.tf b/glue_create.tf new file mode 100644 index 0000000..1f2c96b --- /dev/null +++ b/glue_create.tf @@ -0,0 +1,272 @@ + +# glue-create lambda resource +module "glue_create_athena_workgroup_bucket" { + count = local.enable_glue_create ? 
1 : 0 + + source = "terraform-aws-modules/s3-bucket/aws" + version = "4.1.2" + bucket = var.glue_create_config.athena_bucket_name + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true + control_object_ownership = true + object_ownership = "BucketOwnerEnforced" + tags = var.tags + versioning = { + enabled = false + } +} + +resource "aws_athena_workgroup" "glue_create" { + count = local.enable_glue_create ? 1 : 0 + + name = var.glue_create_config.athena_workgroup_name + tags = var.tags + configuration { + enforce_workgroup_configuration = true + publish_cloudwatch_metrics_enabled = false + + result_configuration { + output_location = "s3://${module.glue_create_athena_workgroup_bucket[0].s3_bucket_id}/" + } + } + depends_on = [module.glue_create_athena_workgroup_bucket] +} + +data "aws_iam_policy_document" "glue_create_sqs" { + count = local.enable_glue_create ? 1 : 0 + + statement { + effect = "Allow" + principals { + type = "*" + identifiers = ["*"] + } + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] + resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"] + condition { + test = "ArnEquals" + variable = "aws:SourceArn" + values = [var.glue_create_config.sns_topic_arn] + } + } +} + +data "aws_iam_policy_document" "glue_create_sqs_dl" { + count = local.enable_glue_create ? 1 : 0 + + statement { + effect = "Allow" + principals { + type = "AWS" + identifiers = ["*"] + } + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] + resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name_dl}"] + condition { + test = "ForAllValues:StringEquals" + variable = "aws:SourceArn" + values = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"] + } + } +} + +resource "aws_sqs_queue" "glue_create" { + count = local.enable_glue_create ? 
1 : 0 + + name = var.glue_create_config.sqs_queue_name + policy = data.aws_iam_policy_document.glue_create_sqs[0].json + visibility_timeout_seconds = var.sqs_visibility_timeout_seconds + delay_seconds = var.sqs_delay_seconds + redrive_policy = jsonencode({ + deadLetterTargetArn = aws_sqs_queue.glue_create_dl[0].arn + maxReceiveCount = var.sqs_redrive_policy_maxReceiveCount + }) + tags = var.tags +} + +resource "aws_sqs_queue" "glue_create_dl" { + count = local.enable_glue_create ? 1 : 0 + + name = var.glue_create_config.sqs_queue_name_dl + policy = data.aws_iam_policy_document.glue_create_sqs_dl[0].json + tags = var.tags +} + +resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" { + count = local.enable_glue_create ? 1 : 0 + + queue_url = aws_sqs_queue.glue_create_dl[0].id + redrive_allow_policy = jsonencode({ + redrivePermission = "byQueue", + sourceQueueArns = [aws_sqs_queue.glue_create[0].arn] + }) +} + +resource "aws_sns_topic_subscription" "glue_create_sns_sub" { + count = local.enable_glue_create ? 1 : 0 + + topic_arn = var.glue_create_config.sns_topic_arn + protocol = "sqs" + endpoint = aws_sqs_queue.glue_create[0].arn +} + +data "aws_iam_policy_document" "glue_create_assume" { + count = local.enable_glue_create ? 1 : 0 + + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + actions = [ + "sts:AssumeRole", + ] + } +} + +data "aws_iam_policy_document" "glue_create" { + count = local.enable_glue_create ? 
1 : 0 + + statement { + sid = "AthenaWorkgroupAthenaRW" + actions = [ + "athena:StartQueryExecution", + "athena:GetQueryResults", + "athena:GetWorkGroup", + "athena:StopQueryExecution", + "athena:GetQueryExecution", + ] + resources = [ + aws_athena_workgroup.glue_create[0].arn + ] + effect = "Allow" + } + statement { + sid = "AthenaWorkgroupS3RW" + effect = "Allow" + actions = [ + "s3:PutObject", + "s3:GetObject", + "s3:AbortMultipartUpload", + "s3:GetBucketLocation" + ] + resources = [ + "${module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn}/*", + module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn + ] + } + statement { + sid = "AthenaWorkgroupList1" + effect = "Allow" + actions = ["athena:ListWorkGroups"] + resources = ["*"] + } + statement { + sid = "GlueAllowTables" + effect = "Allow" + actions = [ + "glue:GetTable", + "glue:GetTables", + "glue:GetPartitions", + "glue:CreateTable", + "glue:UpdateTable" + ] + resources = [ + "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog", + "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:database/*", + "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/*" + ] + } + statement { + sid = "GlueCatalogAllowDatabases" + effect = "Allow" + actions = [ + "glue:GetDatabase", + "glue:GetDatabases", + "glue:CreateDatabase" + ] + resources = [ + "*" + ] + } + statement { + sid = "TableExtLocS3RO" + effect = "Allow" + actions = [ + "s3:GetObject", + "s3:GetObjectTagging", + "s3:GetObjectVersion", + "s3:GetBucketLocation", + "s3:ListBucket", + "s3:ListBucketVersions" + ] + resources = [ + var.warehouse_bucket_arn, + "${var.warehouse_bucket_arn}/${var.s3_path}/*" + ] + } + statement { + effect = "Allow" + actions = ["sqs:*"] + resources = [aws_sqs_queue.glue_create[0].arn] + } + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + 
"logs:PutLogEvents" + ] + resources = ["*"] + } +} + +resource "aws_iam_policy" "glue_create_managed" { + count = local.enable_glue_create ? 1 : 0 + + name = var.glue_create_config.iam_policy_name + description = "Glue create policy allows access to Athena and S3" + policy = data.aws_iam_policy_document.glue_create[0].json + tags = var.tags +} + +resource "aws_iam_role" "glue_create" { + count = local.enable_glue_create ? 1 : 0 + + name = var.glue_create_config.iam_role_name + assume_role_policy = data.aws_iam_policy_document.glue_create_assume[0].json + managed_policy_arns = [aws_iam_policy.glue_create_managed[0].arn] + tags = var.tags +} + +resource "aws_lambda_function" "glue_create_lambda" { + count = local.enable_glue_create ? 1 : 0 + + description = "Greate tables in AWS Glue catalog based on the table prefix" + s3_key = var.glue_create_config.lambda_s3_key + s3_bucket = var.glue_create_config.lambda_s3_bucket + function_name = var.glue_create_config.lambda_function_name + role = aws_iam_role.glue_create[0].arn + handler = "provided" + runtime = "provided.al2" + + environment { + variables = { + RUST_LOG = var.rust_log_oxbow_debug_level + ATHENA_WORKGROUP = var.glue_create_config.athena_workgroup_name + ATHENA_DATA_SOURCE = var.glue_create_config.athena_data_source + GLUE_PATH_REGEX = var.glue_create_config.path_regex + UNWRAP_SNS_ENVELOPE = true + } + } +} + +resource "aws_lambda_event_source_mapping" "glue_create" { + count = local.enable_glue_create ? 1 : 0 + + event_source_arn = aws_sqs_queue.glue_create[0].arn + function_name = aws_lambda_function.glue_create_lambda[0].arn +} diff --git a/main.tf b/main.tf index 30a7193..738abc8 100644 --- a/main.tf +++ b/main.tf @@ -539,275 +539,3 @@ resource "aws_dynamodb_table" "this_oxbow_locking" { } tags = var.tags } - -# glue-create lambda resource -module "glue_create_athena_workgroup_bucket" { - count = local.enable_glue_create ? 
1 : 0 - - source = "terraform-aws-modules/s3-bucket/aws" - version = "4.1.2" - bucket = var.glue_create_config.athena_bucket_name - block_public_acls = true - block_public_policy = true - ignore_public_acls = true - restrict_public_buckets = true - control_object_ownership = true - object_ownership = "BucketOwnerEnforced" - tags = var.tags - versioning = { - enabled = false - } -} - -resource "aws_athena_workgroup" "glue_create" { - count = local.enable_glue_create ? 1 : 0 - - name = var.glue_create_config.athena_workgroup_name - tags = var.tags - configuration { - enforce_workgroup_configuration = true - publish_cloudwatch_metrics_enabled = false - - result_configuration { - output_location = "s3://${module.glue_create_athena_workgroup_bucket[0].s3_bucket_id}/" - } - } - depends_on = [module.glue_create_athena_workgroup_bucket] -} - -data "aws_iam_policy_document" "glue_create_sqs" { - count = local.enable_glue_create ? 1 : 0 - - statement { - effect = "Allow" - principals { - type = "*" - identifiers = ["*"] - } - actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] - resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"] - condition { - test = "ArnEquals" - variable = "aws:SourceArn" - values = [var.glue_create_config.sns_topic_arn] - } - } -} - -data "aws_iam_policy_document" "glue_create_sqs_dl" { - count = local.enable_glue_create ? 1 : 0 - - statement { - effect = "Allow" - principals { - type = "AWS" - identifiers = ["*"] - } - actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] - resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name_dl}"] - condition { - test = "ForAllValues:StringEquals" - variable = "aws:SourceArn" - values = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"] - } - } -} - -resource "aws_sqs_queue" "glue_create" { - count = local.enable_glue_create ? 
1 : 0 - - name = var.glue_create_config.sqs_queue_name - policy = data.aws_iam_policy_document.glue_create_sqs[0].json - visibility_timeout_seconds = var.sqs_visibility_timeout_seconds - delay_seconds = var.sqs_delay_seconds - redrive_policy = jsonencode({ - deadLetterTargetArn = aws_sqs_queue.glue_create_dl[0].arn - maxReceiveCount = var.sqs_redrive_policy_maxReceiveCount - }) - tags = var.tags -} - -resource "aws_sqs_queue" "glue_create_dl" { - count = local.enable_glue_create ? 1 : 0 - - name = var.glue_create_config.sqs_queue_name_dl - policy = data.aws_iam_policy_document.glue_create_sqs_dl[0].json - tags = var.tags -} - -resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" { - count = local.enable_glue_create ? 1 : 0 - - queue_url = aws_sqs_queue.glue_create_dl[0].id - redrive_allow_policy = jsonencode({ - redrivePermission = "byQueue", - sourceQueueArns = [aws_sqs_queue.glue_create[0].arn] - }) -} - -resource "aws_sns_topic_subscription" "glue_create_sns_sub" { - count = local.enable_glue_create ? 1 : 0 - - topic_arn = var.glue_create_config.sns_topic_arn - protocol = "sqs" - endpoint = aws_sqs_queue.glue_create[0].arn -} - -data "aws_iam_policy_document" "glue_create_assume" { - count = local.enable_glue_create ? 1 : 0 - - statement { - effect = "Allow" - principals { - type = "Service" - identifiers = ["lambda.amazonaws.com"] - } - actions = [ - "sts:AssumeRole", - ] - } -} - -data "aws_iam_policy_document" "glue_create" { - count = local.enable_glue_create ? 
1 : 0 - - statement { - sid = "AthenaWorkgroupAthenaRW" - actions = [ - "athena:StartQueryExecution", - "athena:GetQueryResults", - "athena:GetWorkGroup", - "athena:StopQueryExecution", - "athena:GetQueryExecution", - ] - resources = [ - aws_athena_workgroup.glue_create[0].arn - ] - effect = "Allow" - } - statement { - sid = "AthenaWorkgroupS3RW" - effect = "Allow" - actions = [ - "s3:PutObject", - "s3:GetObject", - "s3:AbortMultipartUpload", - "s3:GetBucketLocation" - ] - resources = [ - "${module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn}/*", - module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn - ] - } - statement { - sid = "AthenaWorkgroupList1" - effect = "Allow" - actions = ["athena:ListWorkGroups"] - resources = ["*"] - } - statement { - sid = "GlueAllowTables" - effect = "Allow" - actions = [ - "glue:GetTable", - "glue:GetTables", - "glue:GetPartitions", - "glue:CreateTable", - "glue:UpdateTable" - ] - resources = [ - "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog", - "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:database/*", - "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/*" - ] - } - statement { - sid = "GlueCatalogAllowDatabases" - effect = "Allow" - actions = [ - "glue:GetDatabase", - "glue:GetDatabases", - "glue:CreateDatabase" - ] - resources = [ - "*" - ] - } - statement { - sid = "TableExtLocS3RO" - effect = "Allow" - actions = [ - "s3:GetObject", - "s3:GetObjectTagging", - "s3:GetObjectVersion", - "s3:GetBucketLocation", - "s3:ListBucket", - "s3:ListBucketVersions" - ] - resources = [ - var.warehouse_bucket_arn, - "${var.warehouse_bucket_arn}/${var.s3_path}/*" - ] - } - statement { - effect = "Allow" - actions = ["sqs:*"] - resources = [aws_sqs_queue.glue_create[0].arn] - } - statement { - effect = "Allow" - actions = [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - 
"logs:PutLogEvents" - ] - resources = ["*"] - } -} - -resource "aws_iam_policy" "glue_create_managed" { - count = local.enable_glue_create ? 1 : 0 - - name = var.glue_create_config.iam_policy_name - description = "Glue create policy allows access to Athena and S3" - policy = data.aws_iam_policy_document.glue_create[0].json - tags = var.tags -} - -resource "aws_iam_role" "glue_create" { - count = local.enable_glue_create ? 1 : 0 - - name = var.glue_create_config.iam_role_name - assume_role_policy = data.aws_iam_policy_document.glue_create_assume[0].json - managed_policy_arns = [aws_iam_policy.glue_create_managed[0].arn] - tags = var.tags -} - -resource "aws_lambda_function" "glue_create_lambda" { - count = local.enable_glue_create ? 1 : 0 - - description = "Greate tables in AWS Glue catalog based on the table prefix" - s3_key = var.glue_create_config.lambda_s3_key - s3_bucket = var.glue_create_config.lambda_s3_bucket - function_name = var.glue_create_config.lambda_function_name - role = aws_iam_role.glue_create[0].arn - handler = "provided" - runtime = "provided.al2" - - environment { - variables = { - RUST_LOG = var.rust_log_oxbow_debug_level - ATHENA_WORKGROUP = var.glue_create_config.athena_workgroup_name - ATHENA_DATA_SOURCE = var.glue_create_config.athena_data_source - GLUE_PATH_REGEX = var.glue_create_config.path_regex - UNWRAP_SNS_ENVELOPE = true - } - } -} - -resource "aws_lambda_event_source_mapping" "glue_create" { - count = local.enable_glue_create ? 
1 : 0 - - event_source_arn = aws_sqs_queue.glue_create[0].arn - function_name = aws_lambda_function.glue_create_lambda[0].arn -} From c3feabc202243f72ebee60371991755b94cb1e85 Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 16:59:37 -0400 Subject: [PATCH 4/8] Refactored the monitoring code --- glue_create.tf | 3 ++- glue_sync.tf | 3 ++- monitoring.tf | 37 +++++++++---------------------------- variables.tf | 15 +++++++++++++++ 4 files changed, 28 insertions(+), 30 deletions(-) diff --git a/glue_create.tf b/glue_create.tf index 1f2c96b..73479ea 100644 --- a/glue_create.tf +++ b/glue_create.tf @@ -252,7 +252,8 @@ resource "aws_lambda_function" "glue_create_lambda" { role = aws_iam_role.glue_create[0].arn handler = "provided" runtime = "provided.al2" - + memory_size = 1024 + timeout = 120 environment { variables = { RUST_LOG = var.rust_log_oxbow_debug_level diff --git a/glue_sync.tf b/glue_sync.tf index ab38813..d142148 100644 --- a/glue_sync.tf +++ b/glue_sync.tf @@ -181,7 +181,8 @@ resource "aws_lambda_function" "glue_sync_lambda" { role = aws_iam_role.glue_sync[0].arn handler = "provided" runtime = "provided.al2" - + memory_size = 1024 + timeout = 120 environment { variables = { RUST_LOG = var.rust_log_oxbow_debug_level diff --git a/monitoring.tf b/monitoring.tf index 2fed34a..461db37 100644 --- a/monitoring.tf +++ b/monitoring.tf @@ -2,42 +2,23 @@ locals { enable_dead_letters_monitoring = var.enabled_dead_letters_monitoring dl_warning = var.dl_warning dl_critical = var.dl_critical - + dlq_to_monitor = [ + local.enable_group_events ? var.sqs_fifo_DL_queue_name : var.sqs_queue_name_dl, + local.enable_glue_create ? var.glue_create_config.sqs_queue_name_dl : "", + local.enable_glue_sync ? var.glue_sync_config.sqs_queue_name_dl : "", + ] } resource "datadog_monitor" "dead_letters_monitor" { - count = local.enable_dead_letters_monitoring ? 
1 : 0 - - type = "metric alert" - name = "${var.sqs_queue_name_dl}-monitor" - message = templatefile("${path.module}/templates/dl_monitor.tmpl", { - dead_letters_queue_name = local.enable_group_events ? var.sqs_fifo_DL_queue_name : var.sqs_queue_name_dl - notify = join(", ", var.dl_alert_recipients) - }) - query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.sqs_queue_name_dl}} > ${var.dl_critical}" - - monitor_thresholds { - warning = local.dl_warning - warning_recovery = local.dl_warning - 1 - critical = local.dl_critical - critical_recovery = local.dl_critical - 1 - } - - notify_no_data = false - renotify_interval = 60 - tags = var.tags_monitoring -} - -resource "datadog_monitor" "dead_letters_monitor_glue_create" { - count = local.enable_dead_letters_monitoring && local.enable_glue_create ? 1 : 0 + for_each = local.enable_dead_letters_monitoring ? toset(compact(local.dlq_to_monitor)) : toset([]) type = "metric alert" - name = "${var.glue_create_config.sqs_queue_name_dl}-monitor" + name = "${each.key}-monitor" message = templatefile("${path.module}/templates/dl_monitor.tmpl", { - dead_letters_queue_name = var.glue_create_config.sqs_queue_name_dl + dead_letters_queue_name = each.key notify = join(", ", var.dl_alert_recipients) }) - query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.glue_create_config.sqs_queue_name_dl}} > ${var.dl_critical}" + query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${each.key}} > ${var.dl_critical}" monitor_thresholds { warning = local.dl_warning diff --git a/variables.tf b/variables.tf index 956cdc2..20feb29 100644 --- a/variables.tf +++ b/variables.tf @@ -333,3 +333,18 @@ variable "enable_glue_sync" { description = "Whether to turn on Glue create Lambda" default = false } + +variable "glue_sync_config" { + type = object({ + lambda_s3_key = string // lambda s3 key - lambda path on S3 and file name filename + lambda_s3_bucket = string // lambda s3 
bucket where lambda is stored + lambda_function_name = string // lambda function name + path_regex = string // regexp for mapping s3 path to database/table + sns_topic_arn = string // sns topic arn with s3 events (source for lambda) + sqs_queue_name = string // name of sqs queue for glue-sync lambda + sqs_queue_name_dl = string // name dead letter sqs que with not processed s3 events + iam_role_name = string // lambda role name + iam_policy_name = string // lambda policy name + }) + description = "Configuration of glue-sync lambda" +} From 949c4b0a44cae75719d9d23225be736a8e799a51 Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 17:02:22 -0400 Subject: [PATCH 5/8] Fixed syntax --- glue_sync.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glue_sync.tf b/glue_sync.tf index d142148..12d7b2f 100644 --- a/glue_sync.tf +++ b/glue_sync.tf @@ -58,7 +58,7 @@ resource "aws_sqs_queue" "glue_sync_dl" { tags = var.tags } -resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" { +resource "aws_sqs_queue_redrive_allow_policy" "glue_syncredrive_allow_policy" { count = local.enable_glue_sync ? 1 : 0 queue_url = aws_sqs_queue.glue_sync_dl[0].id From 932c77eba2aa21a398a62620a6d3a96ba7529f69 Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 17:08:07 -0400 Subject: [PATCH 6/8] using var instead of local --- glue_create.tf | 28 ++++++++++++++-------------- glue_sync.tf | 24 ++++++++++++------------ main.tf | 1 - 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/glue_create.tf b/glue_create.tf index 73479ea..7153e1e 100644 --- a/glue_create.tf +++ b/glue_create.tf @@ -1,7 +1,7 @@ # glue-create lambda resource module "glue_create_athena_workgroup_bucket" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 
1 : 0 source = "terraform-aws-modules/s3-bucket/aws" version = "4.1.2" @@ -19,7 +19,7 @@ module "glue_create_athena_workgroup_bucket" { } resource "aws_athena_workgroup" "glue_create" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 name = var.glue_create_config.athena_workgroup_name tags = var.tags @@ -35,7 +35,7 @@ resource "aws_athena_workgroup" "glue_create" { } data "aws_iam_policy_document" "glue_create_sqs" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 statement { effect = "Allow" @@ -54,7 +54,7 @@ data "aws_iam_policy_document" "glue_create_sqs" { } data "aws_iam_policy_document" "glue_create_sqs_dl" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 statement { effect = "Allow" @@ -73,7 +73,7 @@ data "aws_iam_policy_document" "glue_create_sqs_dl" { } resource "aws_sqs_queue" "glue_create" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 name = var.glue_create_config.sqs_queue_name policy = data.aws_iam_policy_document.glue_create_sqs[0].json @@ -87,7 +87,7 @@ resource "aws_sqs_queue" "glue_create" { } resource "aws_sqs_queue" "glue_create_dl" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 name = var.glue_create_config.sqs_queue_name_dl policy = data.aws_iam_policy_document.glue_create_sqs_dl[0].json @@ -95,7 +95,7 @@ resource "aws_sqs_queue" "glue_create_dl" { } resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 queue_url = aws_sqs_queue.glue_create_dl[0].id redrive_allow_policy = jsonencode({ @@ -105,7 +105,7 @@ resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_pol } resource "aws_sns_topic_subscription" "glue_create_sns_sub" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 
1 : 0 topic_arn = var.glue_create_config.sns_topic_arn protocol = "sqs" @@ -113,7 +113,7 @@ resource "aws_sns_topic_subscription" "glue_create_sns_sub" { } data "aws_iam_policy_document" "glue_create_assume" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 statement { effect = "Allow" @@ -128,7 +128,7 @@ data "aws_iam_policy_document" "glue_create_assume" { } data "aws_iam_policy_document" "glue_create" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 statement { sid = "AthenaWorkgroupAthenaRW" @@ -225,7 +225,7 @@ data "aws_iam_policy_document" "glue_create" { } resource "aws_iam_policy" "glue_create_managed" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 name = var.glue_create_config.iam_policy_name description = "Glue create policy allows access to Athena and S3" @@ -234,7 +234,7 @@ resource "aws_iam_policy" "glue_create_managed" { } resource "aws_iam_role" "glue_create" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 name = var.glue_create_config.iam_role_name assume_role_policy = data.aws_iam_policy_document.glue_create_assume[0].json @@ -243,7 +243,7 @@ resource "aws_iam_role" "glue_create" { } resource "aws_lambda_function" "glue_create_lambda" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 1 : 0 description = "Greate tables in AWS Glue catalog based on the table prefix" s3_key = var.glue_create_config.lambda_s3_key @@ -266,7 +266,7 @@ resource "aws_lambda_function" "glue_create_lambda" { } resource "aws_lambda_event_source_mapping" "glue_create" { - count = local.enable_glue_create ? 1 : 0 + count = var.enable_glue_create ? 
1 : 0 event_source_arn = aws_sqs_queue.glue_create[0].arn function_name = aws_lambda_function.glue_create_lambda[0].arn diff --git a/glue_sync.tf b/glue_sync.tf index 12d7b2f..7f8a1c0 100644 --- a/glue_sync.tf +++ b/glue_sync.tf @@ -1,5 +1,5 @@ data "aws_iam_policy_document" "glue_sync_sqs" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 statement { effect = "Allow" @@ -18,7 +18,7 @@ data "aws_iam_policy_document" "glue_sync_sqs" { } data "aws_iam_policy_document" "glue_sync_sqs_dl" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 statement { effect = "Allow" @@ -37,7 +37,7 @@ data "aws_iam_policy_document" "glue_sync_sqs_dl" { } resource "aws_sqs_queue" "glue_sync" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 name = var.glue_sync_config.sqs_queue_name policy = data.aws_iam_policy_document.glue_sync_sqs[0].json @@ -51,7 +51,7 @@ resource "aws_sqs_queue" "glue_sync" { } resource "aws_sqs_queue" "glue_sync_dl" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 name = var.glue_sync_config.sqs_queue_name_dl policy = data.aws_iam_policy_document.glue_sync_sqs_dl[0].json @@ -59,7 +59,7 @@ resource "aws_sqs_queue" "glue_sync_dl" { } resource "aws_sqs_queue_redrive_allow_policy" "glue_syncredrive_allow_policy" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 queue_url = aws_sqs_queue.glue_sync_dl[0].id redrive_allow_policy = jsonencode({ @@ -69,7 +69,7 @@ resource "aws_sqs_queue_redrive_allow_policy" "glue_syncredrive_allow_policy" { } resource "aws_sns_topic_subscription" "glue_sync_sns_sub" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 topic_arn = var.glue_sync_config.sns_topic_arn protocol = "sqs" @@ -77,7 +77,7 @@ resource "aws_sns_topic_subscription" "glue_sync_sns_sub" { } data "aws_iam_policy_document" "glue_sync_assume" { - count = local.enable_glue_sync ? 
1 : 0 + count = var.enable_glue_sync ? 1 : 0 statement { effect = "Allow" @@ -92,7 +92,7 @@ data "aws_iam_policy_document" "glue_sync_assume" { } data "aws_iam_policy_document" "glue_sync" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 statement { sid = "GlueAllowTables" effect = "Allow" @@ -154,7 +154,7 @@ data "aws_iam_policy_document" "glue_sync" { } resource "aws_iam_policy" "glue_sync_managed" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 name = var.glue_sync_config.iam_policy_name description = "Glue create policy allows access to Athena and S3" @@ -163,7 +163,7 @@ resource "aws_iam_policy" "glue_sync_managed" { } resource "aws_iam_role" "glue_sync" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 name = var.glue_sync_config.iam_role_name assume_role_policy = data.aws_iam_policy_document.glue_sync_assume[0].json @@ -172,7 +172,7 @@ resource "aws_iam_role" "glue_sync" { } resource "aws_lambda_function" "glue_sync_lambda" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 1 : 0 description = "Greate tables in AWS Glue catalog based on the table prefix" s3_key = var.glue_sync_config.lambda_s3_key @@ -193,7 +193,7 @@ resource "aws_lambda_function" "glue_sync_lambda" { } resource "aws_lambda_event_source_mapping" "glue_sync" { - count = local.enable_glue_sync ? 1 : 0 + count = var.enable_glue_sync ? 
1 : 0 event_source_arn = aws_sqs_queue.glue_sync[0].arn function_name = aws_lambda_function.glue_sync_lambda[0].arn diff --git a/main.tf b/main.tf index 738abc8..d48cb46 100644 --- a/main.tf +++ b/main.tf @@ -8,7 +8,6 @@ locals { enable_kinesis_firehose_delivery_stream = var.enable_kinesis_firehose_delivery_stream enable_bucket_notification = var.enable_bucket_notification enable_group_events = var.enable_group_events - enable_glue_create = var.enable_glue_create } From e6ba967ea55bdfc6233789c1b9b9c7a1c40d8cb4 Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 17:09:28 -0400 Subject: [PATCH 7/8] moving to var instead of local --- monitoring.tf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/monitoring.tf b/monitoring.tf index 461db37..bb8de32 100644 --- a/monitoring.tf +++ b/monitoring.tf @@ -4,8 +4,8 @@ locals { dl_critical = var.dl_critical dlq_to_monitor = [ local.enable_group_events ? var.sqs_fifo_DL_queue_name : var.sqs_queue_name_dl, - local.enable_glue_create ? var.glue_create_config.sqs_queue_name_dl : "", - local.enable_glue_sync ? var.glue_sync_config.sqs_queue_name_dl : "", + var.enable_glue_create ? var.glue_create_config.sqs_queue_name_dl : "", + var.enable_glue_sync ? var.glue_sync_config.sqs_queue_name_dl : "", ] } From 8df292335e36eda1d870edbaccc93dd46891d097 Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 10 Sep 2024 17:11:59 -0400 Subject: [PATCH 8/8] fixing monitoring syntax --- monitoring.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitoring.tf b/monitoring.tf index bb8de32..a84781e 100644 --- a/monitoring.tf +++ b/monitoring.tf @@ -10,7 +10,7 @@ locals { } resource "datadog_monitor" "dead_letters_monitor" { - for_each = toset(local.dlq_to_monitor) && local.enable_dead_letters_monitoring + for_each = local.enable_dead_letters_monitoring ? toset(local.dlq_to_monitor) : [] type = "metric alert" name = "${each.key}-monitor"