From f3b21003cff872899e68ceef48e38f84a7aaead4 Mon Sep 17 00:00:00 2001
From: Maksym Dovhal
Date: Wed, 19 Jun 2024 10:18:31 +0300
Subject: [PATCH 1/2] DATAPLAT-260: integrate glue-sync lambda

---
 main.tf       | 278 +++++++++++++++++++++++++++++++++++++++++++++++++-
 monitoring.tf |  23 ++++
 variables.tf  |  26 ++++-
 3 files changed, 324 insertions(+), 3 deletions(-)

diff --git a/main.tf b/main.tf
index aeb2bd3..757a0d8 100644
--- a/main.tf
+++ b/main.tf
@@ -1,12 +1,14 @@
 # This module creates Kinesis Firehose service (optionally), SQS, lambda function OXBOW
 # to receive data and convert it into parquet then Delta log is added by Oxbow lambda
 data "aws_caller_identity" "current" {}
+data "aws_region" "current" {}
 
 locals {
   enable_aws_glue_catalog_table           = var.enable_aws_glue_catalog_table
   enable_kinesis_firehose_delivery_stream = var.enable_kinesis_firehose_delivery_stream
   enable_bucket_notification              = var.enable_bucket_notification
   enable_group_events                     = var.enable_group_events
+  enable_glue_create                      = var.enable_glue_create
 }
 
 
@@ -235,7 +237,6 @@ data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" {
   }
 }
 
-
 resource "aws_sqs_queue" "group_events_lambda_sqs" {
   count = local.enable_group_events ? 1 : 0
   name  = var.sqs_group_queue_name
@@ -258,7 +259,7 @@ resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" {
 
 
 
-### This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not
+### This is to ensure we are triggering oxbow lambda properly whether group event is enable or not
 ### if group event is enabled we are using the fifo queue populated by group events as a source for oxbow
 resource "aws_lambda_event_source_mapping" "this_lambda_events" {
   event_source_arn = local.enable_group_events ? aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
@@ -536,3 +537,276 @@ resource "aws_dynamodb_table" "this_oxbow_locking" {
   }
   tags = var.tags
 }
+
+# glue-create lambda resource
+module "glue_create_athena_workgroup_bucket" {
+  count = local.enable_glue_create ? 1 : 0
+
+  source                   = "terraform-aws-modules/s3-bucket/aws"
+  version                  = "4.1.2"
+  bucket                   = var.glue_create_config.athena_bucket_name
+  block_public_acls        = true
+  block_public_policy      = true
+  ignore_public_acls       = true
+  restrict_public_buckets  = true
+  control_object_ownership = true
+  object_ownership         = "BucketOwnerEnforced"
+  tags                     = var.tags
+  versioning = {
+    enabled = false
+  }
+}
+
+resource "aws_athena_workgroup" "glue_create" {
+  count = local.enable_glue_create ? 1 : 0
+
+  name = var.glue_create_config.athena_workgroup_name
+  tags = var.tags
+  configuration {
+    enforce_workgroup_configuration    = true
+    publish_cloudwatch_metrics_enabled = false
+
+    result_configuration {
+      output_location = "s3://${module.glue_create_athena_workgroup_bucket[0].s3_bucket_id}/"
+    }
+  }
+}
+
+data "aws_iam_policy_document" "glue_create_sqs" {
+  count = local.enable_glue_create ? 1 : 0
+
+  # Queue policy: only the configured SNS topic may deliver messages to the queue.
+  statement {
+    effect = "Allow"
+    principals {
+      type        = "*"
+      identifiers = ["*"]
+    }
+    actions   = ["sqs:SendMessage"]
+    resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"]
+    condition {
+      test     = "ArnEquals"
+      variable = "aws:SourceArn"
+      values   = [var.glue_create_config.sns_topic_arn]
+    }
+  }
+}
+
+# Dead-letter queue policy. ArnLike is used instead of ForAllValues:StringEquals:
+# a ForAllValues condition passes when aws:SourceArn is absent, which would let
+# any principal send to the queue, and StringEquals does not expand the wildcards.
+data "aws_iam_policy_document" "glue_create_sqs_dl" {
+  count = local.enable_glue_create ? 1 : 0
+
+  statement {
+    effect = "Allow"
+    principals {
+      type        = "AWS"
+      identifiers = ["*"]
+    }
+    actions   = ["sqs:SendMessage"]
+    resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name_dl}"]
+    condition {
+      test     = "ArnLike"
+      variable = "aws:SourceArn"
+      values   = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"]
+    }
+  }
+}
+
+resource "aws_sqs_queue" "glue_create" {
+  count = local.enable_glue_create ? 1 : 0
+
+  name                       = var.glue_create_config.sqs_queue_name
+  policy                     = data.aws_iam_policy_document.glue_create_sqs[0].json
+  visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
+  delay_seconds              = var.sqs_delay_seconds
+  redrive_policy = jsonencode({
+    deadLetterTargetArn = aws_sqs_queue.glue_create_dl[0].arn
+    maxReceiveCount     = var.sqs_redrive_policy_maxReceiveCount
+  })
+  tags = var.tags
+}
+
+resource "aws_sqs_queue" "glue_create_dl" {
+  count = local.enable_glue_create ? 1 : 0
+
+  name   = var.glue_create_config.sqs_queue_name_dl
+  policy = data.aws_iam_policy_document.glue_create_sqs_dl[0].json
+  tags   = var.tags
+}
+
+resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" {
+  count = local.enable_glue_create ? 1 : 0
+
+  queue_url = aws_sqs_queue.glue_create_dl[0].id
+  redrive_allow_policy = jsonencode({
+    redrivePermission = "byQueue",
+    sourceQueueArns   = [aws_sqs_queue.glue_create[0].arn]
+  })
+}
+
+resource "aws_sns_topic_subscription" "glue_create_sns_sub" {
+  count = local.enable_glue_create ? 1 : 0
+
+  topic_arn = var.glue_create_config.sns_topic_arn
+  protocol  = "sqs"
+  endpoint  = aws_sqs_queue.glue_create[0].arn
+}
+
+data "aws_iam_policy_document" "glue_create_assume" {
+  count = local.enable_glue_create ? 1 : 0
+
+  statement {
+    effect = "Allow"
+    principals {
+      type        = "Service"
+      identifiers = ["lambda.amazonaws.com"]
+    }
+    actions = [
+      "sts:AssumeRole",
+    ]
+  }
+}
+
+data "aws_iam_policy_document" "glue_create" {
+  count = local.enable_glue_create ? 1 : 0
+
+  statement {
+    sid = "AthenaWorkgroupAthenaRW"
+    actions = [
+      "athena:StartQueryExecution",
+      "athena:GetQueryResults",
+      "athena:GetWorkGroup",
+      "athena:StopQueryExecution",
+      "athena:GetQueryExecution",
+    ]
+    resources = [
+      aws_athena_workgroup.glue_create[0].arn
+    ]
+    effect = "Allow"
+  }
+  statement {
+    sid    = "AthenaWorkgroupS3RW"
+    effect = "Allow"
+    actions = [
+      "s3:PutObject",
+      "s3:GetObject",
+      "s3:AbortMultipartUpload",
+      "s3:GetBucketLocation"
+    ]
+    resources = [
+      "${module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn}/*",
+      module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn
+    ]
+  }
+  statement {
+    sid       = "AthenaWorkgroupList1"
+    effect    = "Allow"
+    actions   = ["athena:ListWorkGroups"]
+    resources = ["*"]
+  }
+  statement {
+    sid    = "GlueAllowTables"
+    effect = "Allow"
+    actions = [
+      "glue:GetTable",
+      "glue:GetTables",
+      "glue:GetPartitions",
+      "glue:CreateTable"
+    ]
+    resources = [
+      "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog",
+      "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:database/*",
+      "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/*"
+    ]
+  }
+  statement {
+    sid    = "GlueCatalogAllowDatabases"
+    effect = "Allow"
+    actions = [
+      "glue:GetDatabase",
+      "glue:GetDatabases",
+    ]
+    resources = [
+      "*"
+    ]
+  }
+  statement {
+    sid    = "TableExtLocS3RO"
+    effect = "Allow"
+    actions = [
+      "s3:GetObject",
+      "s3:GetObjectTagging",
+      "s3:GetObjectVersion",
+      "s3:GetBucketLocation",
+      "s3:ListBucket",
+      "s3:ListBucketVersions"
+    ]
+    resources = [
+      var.warehouse_bucket_arn,
+      "${var.warehouse_bucket_arn}/${var.s3_path}/*"
+    ]
+  }
+  # SQS permissions required by the Lambda event source mapping. Identity-based
+  # policies must not contain a principals block, so this is scoped by resource.
+  statement {
+    sid    = "GlueCreateSqsConsume"
+    effect = "Allow"
+    actions = [
+      "sqs:ReceiveMessage",
+      "sqs:DeleteMessage",
+      "sqs:GetQueueAttributes"
+    ]
+    resources = [aws_sqs_queue.glue_create[0].arn]
+  }
+}
+
+resource "aws_iam_policy" "glue_create_managed" {
+  count = local.enable_glue_create ? 1 : 0
+
+  name        = var.glue_create_config.iam_police_name
+  description = "Glue create policy allows access to Athena and S3"
+  policy      = data.aws_iam_policy_document.glue_create[0].json
+  tags        = var.tags
+}
+
+# Execution role for the glue-create lambda. Gated by count because the
+# policy documents it references only exist when enable_glue_create is true.
+resource "aws_iam_role" "glue_create" {
+  count = local.enable_glue_create ? 1 : 0
+
+  name                = var.glue_create_config.iam_role_name
+  assume_role_policy  = data.aws_iam_policy_document.glue_create_assume[0].json
+  managed_policy_arns = [aws_iam_policy.glue_create_managed[0].arn]
+  tags                = var.tags
+}
+
+resource "aws_lambda_function" "glue_create_lambda" {
+  count = local.enable_glue_create ? 1 : 0
+
+  description   = "Create tables in AWS Glue catalog based on the table prefix"
+  s3_key        = var.glue_create_config.lambda_s3_key
+  s3_bucket     = var.glue_create_config.lambda_s3_bucket
+  function_name = var.glue_create_config.lambda_function_name
+  role          = aws_iam_role.glue_create[0].arn
+  handler       = "provided"
+  runtime       = "provided.al2"
+
+  environment {
+    variables = {
+      RUST_LOG            = var.rust_log_oxbow_debug_level
+      ATHENA_WORKGROUP    = var.glue_create_config.athena_workgroup_name
+      ATHENA_DATA_SOURCE  = var.glue_create_config.athena_data_source
+      GLUE_PATH_REGEX     = var.glue_create_config.path_regex
+      UNWRAP_SNS_ENVELOPE = true
+    }
+  }
+}
+
+resource "aws_lambda_event_source_mapping" "glue_create" {
+  count = local.enable_glue_create ? 1 : 0
+
+  event_source_arn = aws_sqs_queue.glue_create[0].arn
+  function_name    = aws_lambda_function.glue_create_lambda[0].arn
+}
diff --git a/monitoring.tf b/monitoring.tf
index 90a8876..2fed34a 100644
--- a/monitoring.tf
+++ b/monitoring.tf
@@ -27,3 +27,26 @@ resource "datadog_monitor" "dead_letters_monitor" {
   renotify_interval = 60
   tags              = var.tags_monitoring
 }
+
+resource "datadog_monitor" "dead_letters_monitor_glue_create" {
+  count = local.enable_dead_letters_monitoring && local.enable_glue_create ? 1 : 0
+
+  type = "metric alert"
+  name = "${var.glue_create_config.sqs_queue_name_dl}-monitor"
+  message = templatefile("${path.module}/templates/dl_monitor.tmpl", {
+    dead_letters_queue_name = var.glue_create_config.sqs_queue_name_dl
+    notify                  = join(", ", var.dl_alert_recipients)
+  })
+  query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.glue_create_config.sqs_queue_name_dl}} > ${local.dl_critical}"
+
+  monitor_thresholds {
+    warning           = local.dl_warning
+    warning_recovery  = local.dl_warning - 1
+    critical          = local.dl_critical
+    critical_recovery = local.dl_critical - 1
+  }
+
+  notify_no_data    = false
+  renotify_interval = 60
+  tags              = var.tags_monitoring
+}
diff --git a/variables.tf b/variables.tf
index ef717ff..05bb410 100644
--- a/variables.tf
+++ b/variables.tf
@@ -294,6 +294,30 @@ variable "enable_auto_tagging" {
 
 variable "sns_topic_arn" {
   type        = string
-  description = "Optional arn to enable the SNS subscription and ENV var for Oxbo"
+  description = "Optional arn to enable the SNS subscription and ENV var for Oxbow"
   default     = ""
 }
+
+variable "enable_glue_create" {
+  type        = bool
+  description = "Whether to turn on Glue create Lambda"
+  default     = false
+}
+
+variable "glue_create_config" {
+  type = object({
+    athena_workgroup_name = string // name of the AWS Athena workgroup
+    athena_data_source    = string // Arn name of AWS Athena data source (catalog)
+    athena_bucket_name    = string // name of the S3 bucket created for the Athena workgroup query results
+    lambda_s3_key         = string // S3 key of the lambda package (path including the file name)
+    lambda_s3_bucket      = string // S3 bucket where the lambda package is stored
+    lambda_function_name  = string // lambda function name
+    path_regex            = string // regexp for mapping s3 path to database/table
+    sns_topic_arn         = string // sns topic arn with s3 events (source for lambda)
+    sqs_queue_name        = string // name of the SQS queue that feeds the glue-create lambda
+    sqs_queue_name_dl     = string // name of the dead-letter SQS queue for unprocessed s3 events
+    iam_role_name         = string // lambda role name
+    iam_police_name       = string // lambda IAM policy name (note the key is spelled "police")
+  })
+  description = "Configuration of glue-create lambda"
+}
From 3c6adaf0d3c9d00192294a22570a2eca63e80e9a Mon Sep 17 00:00:00 2001
From: Maksym Dovhal
Date: Thu, 20 Jun 2024 17:45:59 +0300
Subject: [PATCH 2/2] Added dependencies on bucket

---
 main.tf | 1 +
 1 file changed, 1 insertion(+)

diff --git a/main.tf b/main.tf
index 757a0d8..8f5b507 100644
--- a/main.tf
+++ b/main.tf
@@ -570,6 +570,7 @@ resource "aws_athena_workgroup" "glue_create" {
       output_location = "s3://${module.glue_create_athena_workgroup_bucket[0].s3_bucket_id}/"
     }
   }
+  depends_on = [module.glue_create_athena_workgroup_bucket]
 }
 
 data "aws_iam_policy_document" "glue_create_sqs" {