Skip to content
Merged
160 changes: 129 additions & 31 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ resource "aws_lambda_function" "this_lambda" {
AWS_S3_LOCKING_PROVIDER = var.aws_s3_locking_provider
RUST_LOG = "deltalake=${var.rust_log_deltalake_debug_level},oxbow=${var.rust_log_oxbow_debug_level}"
DYNAMO_LOCK_TABLE_NAME = var.dynamodb_table_name
UNWRAP_SNS_ENVELOPE = var.sns_topic_arn == "" ? false : true
UNWRAP_SNS_ENVELOPE = var.enable_group_events == true ? false : var.sns_topic_arn == "" ? false : true
}
}
tags = var.tags
}

resource "aws_lambda_function" "this_group_events_lambda" {
#### This lambda is optional and used only when grouping of events is required
resource "aws_lambda_function" "group_events_lambda" {
count = local.enable_group_events ? 1 : 0
description = "Group events for oxbow based on the table prefix"
s3_key = var.events_lambda_s3_key
Expand All @@ -114,56 +114,153 @@ resource "aws_lambda_function" "this_group_events_lambda" {

environment {
variables = {
RUST_LOG = "group-events=${var.rust_log_oxbow_debug_level}"
QUEUE_URL = aws_sqs_queue.this_sqs_fifo[0].url
RUST_LOG = var.rust_log_oxbow_debug_level
QUEUE_URL = aws_sqs_queue.oxbow_lambda_fifo_sqs[0].url
UNWRAP_SNS_ENVELOPE = var.sns_topic_arn == "" ? false : true
}
}
}

resource "aws_sqs_queue" "this_sqs_fifo" {
count = local.enable_group_events ? 1 : 0
name = "${var.sqs_fifo_queue_name}.fifo"
policy = data.aws_iam_policy_document.this_sqs_queue_policy_data.json

data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs" {
count = local.enable_group_events ? 1 : 0
statement {
effect = "Allow"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
# Hard-coding an ARN like syntax here because of the dependency cycle
resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"]

condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [var.warehouse_bucket_arn]
}
}
}

data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs_dlq" {
count = local.enable_group_events ? 1 : 0
statement {
sid = "DLQSendMessages"
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}"]
condition {
test = "ForAllValues:StringEquals"
variable = "aws:SourceArn"
values = [
"arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}.fifo"
]
}
}
}

resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs" {
count = local.enable_group_events ? 1 : 0
name = "${var.sqs_fifo_queue_name}.fifo"
policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs[0].json
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
delay_seconds = var.sqs_delay_seconds
content_based_deduplication = true
fifo_queue = true

tags = var.tags
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.this_sqs_fifo_dlq[0].arn
deadLetterTargetArn = aws_sqs_queue.oxbow_lambda_fifo_sqs_dlq[0].arn
maxReceiveCount = 8
})
}

resource "aws_sqs_queue" "this_sqs_fifo_dlq" {
resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs_dlq" {
count = local.enable_group_events ? 1 : 0
name = "${var.sqs_fifo_DL_queue_name}.fifo"
policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs_dlq[0].json
fifo_queue = true
tags = var.tags
}

resource "aws_lambda_event_source_mapping" "this_group_events_trigger" {
resource "aws_lambda_event_source_mapping" "group_events_lambda_sqs_trigger" {
count = local.enable_group_events ? 1 : 0
event_source_arn = aws_sqs_queue.this_group_events[0].arn
function_name = aws_lambda_function.this_group_events_lambda[0].arn
event_source_arn = aws_sqs_queue.group_events_lambda_sqs[0].arn
function_name = aws_lambda_function.group_events_lambda[0].arn

}

resource "aws_sqs_queue" "this_group_events" {
count = local.enable_group_events ? 1 : 0
name = var.sqs_group_queue_name
policy = data.aws_iam_policy_document.this_sqs_queue_policy_data.json

data "aws_iam_policy_document" "group_event_lambda_sqs" {
count = local.enable_group_events ? 1 : 0
statement {
effect = "Allow"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
# Hard-coding an ARN like syntax here because of the dependency cycle
resources = ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [var.warehouse_bucket_arn]
}
}
}

data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" {
count = local.enable_group_events ? 1 : 0
statement {
sid = "DLQSendMessages"
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
resources = ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}"]
condition {
test = "ForAllValues:StringEquals"
variable = "aws:SourceArn"
values = [
"arn:aws:sqs:*:*:${var.sqs_group_queue_name}"
]
}
}
}


resource "aws_sqs_queue" "group_events_lambda_sqs" {
count = local.enable_group_events ? 1 : 0
name = var.sqs_group_queue_name
policy = var.sns_topic_arn == "" ? data.aws_iam_policy_document.group_event_lambda_sqs[0].json : data.aws_iam_policy_document.this_sns_to_sqs[0].json
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
delay_seconds = var.sqs_delay_seconds
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.this_group_events_dlq[0].arn
deadLetterTargetArn = aws_sqs_queue.group_events_lambda_sqs_dlq[0].arn
maxReceiveCount = 8
})
tags = var.tags
}

resource "aws_sqs_queue" "this_group_events_dlq" {
count = local.enable_group_events ? 1 : 0
name = var.sqs_group_DL_queue_name
resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" {
count = local.enable_group_events ? 1 : 0
policy = data.aws_iam_policy_document.group_event_lambda_sqs_dlq[0].json
name = var.sqs_group_DL_queue_name
tags = var.tags
}



### This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not
### if group event is enabled we are using the fifo queue populated by group events as a source for oxbow
resource "aws_lambda_event_source_mapping" "this_lambda_events" {
event_source_arn = local.enable_group_events ? aws_sqs_queue.this_sqs_fifo[0].arn : aws_sqs_queue.this_sqs[0].arn
event_source_arn = local.enable_group_events ? aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
function_name = aws_lambda_function.this_lambda.arn
}

Expand Down Expand Up @@ -192,7 +289,7 @@ resource "aws_sns_topic_subscription" "this_sns_sub" {

topic_arn = var.sns_topic_arn
protocol = "sqs"
endpoint = aws_sqs_queue.this_sqs[0].arn
endpoint = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
}

resource "aws_lambda_permission" "this_lambda_allow_bucket_permissions" {
Expand All @@ -209,7 +306,7 @@ resource "aws_s3_bucket_notification" "this_bucket_notification" {
count = local.enable_bucket_notification ? 1 : 0
bucket = var.warehouse_bucket_name
queue {
queue_arn = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn
queue_arn = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
events = ["s3:ObjectCreated:*"]
filter_suffix = ".parquet"
filter_prefix = "${var.s3_path}/"
Expand Down Expand Up @@ -277,7 +374,7 @@ resource "aws_iam_policy" "this_lambda_permissions" {
},
{
Action = ["sqs:*"]
Resource = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn
Resource = local.enable_group_events ? [aws_sqs_queue.group_events_lambda_sqs[0].arn, aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn] : [aws_sqs_queue.this_sqs[0].arn]
Effect = "Allow"
},
{
Expand All @@ -302,7 +399,7 @@ data "aws_iam_policy_document" "this_sqs_queue_policy_data" {
}
actions = ["sqs:SendMessage"]
# Hard-coding an ARN like syntax here because of the dependency cycle
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
resources = ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]

condition {
test = "ArnEquals"
Expand All @@ -312,6 +409,7 @@ data "aws_iam_policy_document" "this_sqs_queue_policy_data" {
}
}


data "aws_iam_policy_document" "this_sns_to_sqs" {
count = var.sns_topic_arn == "" ? 0 : 1

Expand All @@ -322,7 +420,7 @@ data "aws_iam_policy_document" "this_sns_to_sqs" {
identifiers = ["*"]
}
actions = ["sqs:SendMessage"]
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
Expand All @@ -332,7 +430,6 @@ data "aws_iam_policy_document" "this_sns_to_sqs" {

}


data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
statement {
sid = "DLQSendMessages"
Expand All @@ -344,7 +441,7 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
actions = [
"sqs:SendMessage"
]
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name_dl}"]
resources = ["arn:aws:sqs:*:*:${var.sqs_queue_name_dl}"]
condition {
test = "ForAllValues:StringEquals"
variable = "aws:SourceArn"
Expand All @@ -355,6 +452,7 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
}
}


data "aws_iam_policy_document" "this_kinesis_policy_data" {
count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0
statement {
Expand Down
2 changes: 1 addition & 1 deletion monitoring.tf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ resource "datadog_monitor" "dead_letters_monitor" {
type = "metric alert"
name = "${var.sqs_queue_name_dl}-monitor"
message = templatefile("${path.module}/templates/dl_monitor.tmpl", {
dead_letters_queue_name = var.sqs_queue_name_dl
dead_letters_queue_name = local.enable_group_events ? var.sqs_fifo_DL_queue_name : var.sqs_queue_name_dl
notify = join(", ", var.dl_alert_recipients)
})
query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.sqs_queue_name_dl}} > ${var.dl_critical}"
Expand Down
2 changes: 1 addition & 1 deletion outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ output "lambda_arn" {

output "sqs_queue_arn" {
description = "SQSqueue ARN"
value = length(aws_sqs_queue.this_sqs) > 0 ? aws_sqs_queue.this_sqs[0].arn : aws_sqs_queue.this_sqs_fifo[0].arn
value = length(aws_sqs_queue.this_sqs) > 0 ? aws_sqs_queue.this_sqs[0].arn : aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn
}

output "autotag_sqs_arn" {
Expand Down