From 6b384a14862558422a162ac44d741720d0673c14 Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 15:55:48 -0400 Subject: [PATCH 1/7] fix!: targeting proper sqs when group event lambda is enabled --- main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.tf b/main.tf index 87c96e6..7a99f35 100644 --- a/main.tf +++ b/main.tf @@ -192,7 +192,7 @@ resource "aws_sns_topic_subscription" "this_sns_sub" { topic_arn = var.sns_topic_arn protocol = "sqs" - endpoint = aws_sqs_queue.this_sqs[0].arn + endpoint = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn } resource "aws_lambda_permission" "this_lambda_allow_bucket_permissions" { From c6be18d197646ba34d344fbe8847c830336548ca Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 16:47:42 -0400 Subject: [PATCH 2/7] fix!:the policy and queues for group event and fifo --- main.tf | 143 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 119 insertions(+), 24 deletions(-) diff --git a/main.tf b/main.tf index 7a99f35..e0781b2 100644 --- a/main.tf +++ b/main.tf @@ -101,8 +101,8 @@ resource "aws_lambda_function" "this_lambda" { } tags = var.tags } - -resource "aws_lambda_function" "this_group_events_lambda" { +#### This lambda is optional and used only when grouping of events is required +resource "aws_lambda_function" "group_events_lambda" { count = local.enable_group_events ? 1 : 0 description = "Group events for oxbow based on the table prefix" s3_key = var.events_lambda_s3_key @@ -115,55 +115,149 @@ resource "aws_lambda_function" "this_group_events_lambda" { environment { variables = { RUST_LOG = "group-events=${var.rust_log_oxbow_debug_level}" - QUEUE_URL = aws_sqs_queue.this_sqs_fifo[0].url + QUEUE_URL = aws_sqs_queue.oxbow_lambda_fifo_sqs[0].url + } + } +} + + +data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs" { + count = local.enable_group_events ? 1 : 0 + statement { + effect = "Allow" + principals { + type = "*" + identifiers = ["*"] + } + actions = ["sqs:SendMessage"] + # Hard-coding an ARN like syntax here because of the dependency cycle + resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] + + condition { + test = "ArnEquals" + variable = "aws:SourceArn" + values = [var.warehouse_bucket_arn] } } } -resource "aws_sqs_queue" "this_sqs_fifo" { +data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs_dlq" { + count = local.enable_group_events ? 1 : 0 + statement { + sid = "DLQSendMessages" + effect = "Allow" + principals { + type = "AWS" + identifiers = ["*"] + } + actions = [ + "sqs:SendMessage" + ] + resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}"] + condition { + test = "ForAllValues:StringEquals" + variable = "aws:SourceArn" + values = [ + "arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}.fifo" + ] + } + } +} + +resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs" { count = local.enable_group_events ? 1 : 0 name = "${var.sqs_fifo_queue_name}.fifo" - policy = data.aws_iam_policy_document.this_sqs_queue_policy_data.json + policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs[0].json content_based_deduplication = true fifo_queue = true redrive_policy = jsonencode({ - deadLetterTargetArn = aws_sqs_queue.this_sqs_fifo_dlq[0].arn + deadLetterTargetArn = aws_sqs_queue.oxbow_lambda_fifo_sqs_dlq[0].arn maxReceiveCount = 8 }) } -resource "aws_sqs_queue" "this_sqs_fifo_dlq" { +resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs_dlq" { count = local.enable_group_events ? 1 : 0 name = "${var.sqs_fifo_DL_queue_name}.fifo" + policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs_dlq[0].json fifo_queue = true } -resource "aws_lambda_event_source_mapping" "this_group_events_trigger" { +resource "aws_lambda_event_source_mapping" "group_events_lambda_sqs_trigger" { count = local.enable_group_events ? 1 : 0 - event_source_arn = aws_sqs_queue.this_group_events[0].arn - function_name = aws_lambda_function.this_group_events_lambda[0].arn + event_source_arn = aws_sqs_queue.group_events_lambda_sqs[0].arn + function_name = aws_lambda_function.group_events_lambda[0].arn +} + + +data "aws_iam_policy_document" "group_event_lambda_sqs" { + count = local.enable_group_events ? 1 : 0 + statement { + effect = "Allow" + principals { + type = "*" + identifiers = ["*"] + } + actions = ["sqs:SendMessage"] + # Hard-coding an ARN like syntax here because of the dependency cycle + resources = ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"] + condition { + test = "ArnEquals" + variable = "aws:SourceArn" + values = [var.warehouse_bucket_arn] + } + } } -resource "aws_sqs_queue" "this_group_events" { +data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" { + count = local.enable_group_events ? 1 : 0 + statement { + sid = "DLQSendMessages" + effect = "Allow" + principals { + type = "AWS" + identifiers = ["*"] + } + actions = [ + "sqs:SendMessage" + ] + resources = ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}"] + condition { + test = "ForAllValues:StringEquals" + variable = "aws:SourceArn" + values = [ + "arn:aws:sqs:*:*:${var.sqs_group_queue_name}" + ] + } + } +} + + +resource "aws_sqs_queue" "group_events_lambda_sqs" { count = local.enable_group_events ? 1 : 0 name = var.sqs_group_queue_name - policy = data.aws_iam_policy_document.this_sqs_queue_policy_data.json + policy = var.sns_topic_arn == "" ? data.aws_iam_policy_document.group_event_lambda_sqs[0].json : data.aws_iam_policy_document.this_sns_to_sqs[0].json redrive_policy = jsonencode({ - deadLetterTargetArn = aws_sqs_queue.this_group_events_dlq[0].arn + deadLetterTargetArn = aws_sqs_queue.group_events_lambda_sqs_dlq[0].arn maxReceiveCount = 8 }) } -resource "aws_sqs_queue" "this_group_events_dlq" { - count = local.enable_group_events ? 1 : 0 - name = var.sqs_group_DL_queue_name +resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" { + count = local.enable_group_events ? 1 : 0 + policy = data.aws_iam_policy_document.group_event_lambda_sqs_dlq[0].json + name = var.sqs_group_DL_queue_name } + + +### This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not +### if group event is enabled we are using the fifo queue populated by group events as a source for oxbow resource "aws_lambda_event_source_mapping" "this_lambda_events" { - event_source_arn = local.enable_group_events ? aws_sqs_queue.this_sqs_fifo[0].arn : aws_sqs_queue.this_sqs[0].arn + event_source_arn = local.enable_group_events ? aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn function_name = aws_lambda_function.this_lambda.arn } @@ -192,7 +286,7 @@ resource "aws_sns_topic_subscription" "this_sns_sub" { topic_arn = var.sns_topic_arn protocol = "sqs" - endpoint = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn + endpoint = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn } resource "aws_lambda_permission" "this_lambda_allow_bucket_permissions" { @@ -209,7 +303,7 @@ resource "aws_s3_bucket_notification" "this_bucket_notification" { count = local.enable_bucket_notification ? 1 : 0 bucket = var.warehouse_bucket_name queue { - queue_arn = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn + queue_arn = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn events = ["s3:ObjectCreated:*"] filter_suffix = ".parquet" filter_prefix = "${var.s3_path}/" @@ -277,7 +371,7 @@ resource "aws_iam_policy" "this_lambda_permissions" { }, { Action = ["sqs:*"] - Resource = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn + Resource = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn Effect = "Allow" }, { @@ -302,7 +396,7 @@ data "aws_iam_policy_document" "this_sqs_queue_policy_data" { } actions = ["sqs:SendMessage"] # Hard-coding an ARN like syntax here because of the dependency cycle - resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"] + resources = ["arn:aws:sqs:*:*:${var.sqs_queue_name}"] condition { test = "ArnEquals" @@ -312,6 +406,7 @@ data "aws_iam_policy_document" "this_sqs_queue_policy_data" { } } + data "aws_iam_policy_document" "this_sns_to_sqs" { count = var.sns_topic_arn == "" ? 0 : 1 @@ -322,7 +417,7 @@ data "aws_iam_policy_document" "this_sns_to_sqs" { identifiers = ["*"] } actions = ["sqs:SendMessage"] - resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"] + resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"] condition { test = "ArnEquals" variable = "aws:SourceArn" @@ -332,7 +427,6 @@ data "aws_iam_policy_document" "this_sns_to_sqs" { } - data "aws_iam_policy_document" "this_dead_letter_queue_policy" { statement { sid = "DLQSendMessages" @@ -344,7 +438,7 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" { actions = [ "sqs:SendMessage" ] - resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name_dl}"] + resources = ["arn:aws:sqs:*:*:${var.sqs_queue_name_dl}"] condition { test = "ForAllValues:StringEquals" variable = "aws:SourceArn" @@ -355,6 +449,7 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" { } } + data "aws_iam_policy_document" "this_kinesis_policy_data" { count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0 statement { From 982d77fb20e0d817da90026720809a649c4db2a7 Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 16:52:18 -0400 Subject: [PATCH 3/7] fix:!changed the variable name in output --- outputs.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/outputs.tf b/outputs.tf index 5dbf2d7..fb32e61 100644 --- a/outputs.tf +++ b/outputs.tf @@ -10,7 +10,7 @@ output "lambda_arn" { output "sqs_queue_arn" { description = "SQSqueue ARN" - value = length(aws_sqs_queue.this_sqs) > 0 ? aws_sqs_queue.this_sqs[0].arn : aws_sqs_queue.this_sqs_fifo[0].arn + value = length(aws_sqs_queue.this_sqs) > 0 ? aws_sqs_queue.this_sqs[0].arn : aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn } output "autotag_sqs_arn" { From 1bd0264e32d14f682b291630e193a1676c1937a9 Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 17:19:42 -0400 Subject: [PATCH 4/7] ensuring we are monitoring the proper queue when group event is enabled --- monitoring.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitoring.tf b/monitoring.tf index 8b3afac..90a8876 100644 --- a/monitoring.tf +++ b/monitoring.tf @@ -11,7 +11,7 @@ resource "datadog_monitor" "dead_letters_monitor" { type = "metric alert" name = "${var.sqs_queue_name_dl}-monitor" message = templatefile("${path.module}/templates/dl_monitor.tmpl", { - dead_letters_queue_name = var.sqs_queue_name_dl + dead_letters_queue_name = local.enable_group_events ? var.sqs_fifo_DL_queue_name : var.sqs_queue_name_dl notify = join(", ", var.dl_alert_recipients) }) query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.sqs_queue_name_dl}} > ${var.dl_critical}" From e2c3531324c6ff0c70ac2026b259751a0479aa01 Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 17:24:24 -0400 Subject: [PATCH 5/7] fix: ensuring the queues have recievemessage enabled --- main.tf | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/main.tf b/main.tf index e0781b2..5bad550 100644 --- a/main.tf +++ b/main.tf @@ -129,7 +129,7 @@ data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs" { type = "*" identifiers = ["*"] } - actions = ["sqs:SendMessage"] + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] # Hard-coding an ARN like syntax here because of the dependency cycle resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] @@ -150,9 +150,7 @@ data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs_dlq" { type = "AWS" identifiers = ["*"] } - actions = [ - "sqs:SendMessage" - ] + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}"] condition { test = "ForAllValues:StringEquals" @@ -200,7 +198,7 @@ data "aws_iam_policy_document" "group_event_lambda_sqs" { type = "*" identifiers = ["*"] } - actions = ["sqs:SendMessage"] + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] # Hard-coding an ARN like syntax here because of the dependency cycle resources = ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"] condition { @@ -220,9 +218,7 @@ data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" { type = "AWS" identifiers = ["*"] } - actions = [ - "sqs:SendMessage" - ] + actions = ["sqs:SendMessage", "sqs:ReceiveMessage"] resources = ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}"] condition { test = "ForAllValues:StringEquals" From 6cafe4bb0698da493bf355b40aee86e7ec703c62 Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 17:45:59 -0400 Subject: [PATCH 6/7] fix:fixing the visibility timeouts for the oxbow fifo queue --- main.tf | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/main.tf b/main.tf index 5bad550..736ec2e 100644 --- a/main.tf +++ b/main.tf @@ -163,13 +163,14 @@ data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs_dlq" { } resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs" { - count = local.enable_group_events ? 1 : 0 - name = "${var.sqs_fifo_queue_name}.fifo" - policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs[0].json - + count = local.enable_group_events ? 1 : 0 + name = "${var.sqs_fifo_queue_name}.fifo" + policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs[0].json + visibility_timeout_seconds = var.sqs_visibility_timeout_seconds + delay_seconds = var.sqs_delay_seconds content_based_deduplication = true fifo_queue = true - + tags = var.tags redrive_policy = jsonencode({ deadLetterTargetArn = aws_sqs_queue.oxbow_lambda_fifo_sqs_dlq[0].arn maxReceiveCount = 8 @@ -181,12 +182,14 @@ resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs_dlq" { name = "${var.sqs_fifo_DL_queue_name}.fifo" policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs_dlq[0].json fifo_queue = true + tags = var.tags } resource "aws_lambda_event_source_mapping" "group_events_lambda_sqs_trigger" { count = local.enable_group_events ? 1 : 0 event_source_arn = aws_sqs_queue.group_events_lambda_sqs[0].arn function_name = aws_lambda_function.group_events_lambda[0].arn + } @@ -232,20 +235,23 @@ data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" { resource "aws_sqs_queue" "group_events_lambda_sqs" { - count = local.enable_group_events ? 1 : 0 - name = var.sqs_group_queue_name - policy = var.sns_topic_arn == "" ? data.aws_iam_policy_document.group_event_lambda_sqs[0].json : data.aws_iam_policy_document.this_sns_to_sqs[0].json - + count = local.enable_group_events ? 1 : 0 + name = var.sqs_group_queue_name + policy = var.sns_topic_arn == "" ? data.aws_iam_policy_document.group_event_lambda_sqs[0].json : data.aws_iam_policy_document.this_sns_to_sqs[0].json + visibility_timeout_seconds = var.sqs_visibility_timeout_seconds + delay_seconds = var.sqs_delay_seconds redrive_policy = jsonencode({ deadLetterTargetArn = aws_sqs_queue.group_events_lambda_sqs_dlq[0].arn maxReceiveCount = 8 }) + tags = var.tags } resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" { count = local.enable_group_events ? 1 : 0 policy = data.aws_iam_policy_document.group_event_lambda_sqs_dlq[0].json name = var.sqs_group_DL_queue_name + tags = var.tags } @@ -367,7 +373,7 @@ resource "aws_iam_policy" "this_lambda_permissions" { }, { Action = ["sqs:*"] - Resource = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn + Resource = local.enable_group_events ? [aws_sqs_queue.group_events_lambda_sqs[0].arn, aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn] : [aws_sqs_queue.this_sqs[0].arn] Effect = "Allow" }, { From 3f4c7c1bab359e376369cd8c24fe5c6e798282aa Mon Sep 17 00:00:00 2001 From: kuntalkumarbasu Date: Wed, 10 Apr 2024 18:49:59 -0400 Subject: [PATCH 7/7] fixing UNWRAP_SNS_ENVELOPE value when group event is enabled --- main.tf | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/main.tf b/main.tf index 736ec2e..f0a7357 100644 --- a/main.tf +++ b/main.tf @@ -96,7 +96,7 @@ resource "aws_lambda_function" "this_lambda" { AWS_S3_LOCKING_PROVIDER = var.aws_s3_locking_provider RUST_LOG = "deltalake=${var.rust_log_deltalake_debug_level},oxbow=${var.rust_log_oxbow_debug_level}" DYNAMO_LOCK_TABLE_NAME = var.dynamodb_table_name - UNWRAP_SNS_ENVELOPE = var.sns_topic_arn == "" ? false : true + UNWRAP_SNS_ENVELOPE = var.enable_group_events == true ? false : var.sns_topic_arn == "" ? false : true } } tags = var.tags @@ -114,8 +114,9 @@ resource "aws_lambda_function" "group_events_lambda" { environment { variables = { - RUST_LOG = "group-events=${var.rust_log_oxbow_debug_level}" - QUEUE_URL = aws_sqs_queue.oxbow_lambda_fifo_sqs[0].url + RUST_LOG = var.rust_log_oxbow_debug_level + QUEUE_URL = aws_sqs_queue.oxbow_lambda_fifo_sqs[0].url + UNWRAP_SNS_ENVELOPE = var.sns_topic_arn == "" ? false : true } } }