Skip to content

Commit f3b2100

Browse files
committed
DATAPLAT-260: integrate glue-sync lambda
1 parent 80ea99e commit f3b2100

File tree

3 files changed

+335
-3
lines changed

3 files changed

+335
-3
lines changed

main.tf

Lines changed: 287 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
# This module creates Kinesis Firehose service (optionally), SQS, lambda function OXBOW
22
# to receive data and convert it into Parquet; the Delta log is then added by the Oxbow Lambda.
33
# Account and region of the calling credentials; interpolated into ARNs
# elsewhere in this module.
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

# Feature toggles, mirrored from the input variables so the rest of the module
# refers to `local.*` only.
locals {
  enable_aws_glue_catalog_table           = var.enable_aws_glue_catalog_table
  enable_kinesis_firehose_delivery_stream = var.enable_kinesis_firehose_delivery_stream
  enable_bucket_notification              = var.enable_bucket_notification
  enable_group_events                     = var.enable_group_events
  enable_glue_create                      = var.enable_glue_create
}
1113

1214

@@ -235,7 +237,6 @@ data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" {
235237
}
236238
}
237239

238-
239240
resource "aws_sqs_queue" "group_events_lambda_sqs" {
240241
count = local.enable_group_events ? 1 : 0
241242
name = var.sqs_group_queue_name
@@ -258,7 +259,7 @@ resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" {
258259

259260

260261

261-
### This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not
262+
### This is to ensure we are triggering the oxbow lambda properly whether group events are enabled or not
262263
### if group event is enabled we are using the fifo queue populated by group events as a source for oxbow
263264
resource "aws_lambda_event_source_mapping" "this_lambda_events" {
264265
event_source_arn = local.enable_group_events ? aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
@@ -536,3 +537,287 @@ resource "aws_dynamodb_table" "this_oxbow_locking" {
536537
}
537538
tags = var.tags
538539
}
540+
541+
# glue-create lambda resource
542+
# S3 bucket used as the query-result location of the glue-create Athena
# workgroup. Only created when the glue-create feature is enabled.
module "glue_create_athena_workgroup_bucket" {
  source  = "terraform-aws-modules/s3-bucket/aws"
  version = "4.1.2"

  count = local.enable_glue_create ? 1 : 0

  bucket = var.glue_create_config.athena_bucket_name
  tags   = var.tags

  # Block every form of public access.
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true

  # ACLs disabled: the bucket owner owns every object.
  control_object_ownership = true
  object_ownership         = "BucketOwnerEnforced"

  versioning = {
    enabled = false
  }
}
559+
560+
# Athena workgroup dedicated to the glue-create lambda.
resource "aws_athena_workgroup" "glue_create" {
  count = local.enable_glue_create ? 1 : 0

  name = var.glue_create_config.athena_workgroup_name

  configuration {
    # Clients cannot override the workgroup settings below.
    enforce_workgroup_configuration    = true
    publish_cloudwatch_metrics_enabled = false

    result_configuration {
      # Query results are written to the root of the dedicated bucket.
      output_location = "s3://${module.glue_create_athena_workgroup_bucket[0].s3_bucket_id}/"
    }
  }

  tags = var.tags
}
574+
575+
# Resource policy for the glue-create queue: lets the configured SNS topic
# deliver messages to it.
data "aws_iam_policy_document" "glue_create_sqs" {
  count = local.enable_glue_create ? 1 : 0

  statement {
    effect    = "Allow"
    actions   = ["sqs:SendMessage"]
    resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"]

    principals {
      type        = "*"
      identifiers = ["*"]
    }

    # Any principal may send, but only on behalf of the configured SNS topic.
    condition {
      test     = "ArnEquals"
      variable = "aws:SourceArn"
      values   = [var.glue_create_config.sns_topic_arn]
    }
  }
}
593+
594+
# Resource policy for the glue-create dead-letter queue: only the glue-create
# source queue may send messages to it.
data "aws_iam_policy_document" "glue_create_sqs_dl" {
  count = local.enable_glue_create ? 1 : 0

  statement {
    effect    = "Allow"
    actions   = ["sqs:SendMessage"]
    resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name_dl}"]

    principals {
      type        = "AWS"
      identifiers = ["*"]
    }

    # `aws:SourceArn` is a single-valued key, so a plain ARN operator is used.
    # The previous `ForAllValues:StringEquals` evaluates to true when the key
    # is absent from the request (opening the queue to every AWS principal) and
    # never expands the `*` wildcards in the value. `ArnLike` fixes both.
    # The ARN is built from the queue name rather than the queue resource to
    # avoid a dependency cycle (source queue -> DLQ -> this policy).
    condition {
      test     = "ArnLike"
      variable = "aws:SourceArn"
      values   = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"]
    }
  }
}
612+
613+
# Queue feeding the glue-create lambda; it is subscribed to the SNS topic
# carrying S3 events.
resource "aws_sqs_queue" "glue_create" {
  count = local.enable_glue_create ? 1 : 0

  name   = var.glue_create_config.sqs_queue_name
  policy = data.aws_iam_policy_document.glue_create_sqs[0].json

  # Timing settings are shared with the other queues of this module.
  delay_seconds              = var.sqs_delay_seconds
  visibility_timeout_seconds = var.sqs_visibility_timeout_seconds

  # Messages received more than maxReceiveCount times move to the DLQ.
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.glue_create_dl[0].arn
    maxReceiveCount     = var.sqs_redrive_policy_maxReceiveCount
  })

  tags = var.tags
}
626+
627+
# Dead-letter queue for the glue-create queue.
resource "aws_sqs_queue" "glue_create_dl" {
  count = local.enable_glue_create ? 1 : 0

  name   = var.glue_create_config.sqs_queue_name_dl
  policy = data.aws_iam_policy_document.glue_create_sqs_dl[0].json

  tags = var.tags
}
634+
635+
# Restricts which queues may use the glue-create DLQ as their dead-letter
# target: only the glue-create source queue.
resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" {
  count = local.enable_glue_create ? 1 : 0

  queue_url = aws_sqs_queue.glue_create_dl[0].id

  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue"
    sourceQueueArns   = [aws_sqs_queue.glue_create[0].arn]
  })
}
644+
645+
# Subscribes the glue-create queue to the configured SNS topic.
resource "aws_sns_topic_subscription" "glue_create_sns_sub" {
  count = local.enable_glue_create ? 1 : 0

  protocol  = "sqs"
  topic_arn = var.glue_create_config.sns_topic_arn
  endpoint  = aws_sqs_queue.glue_create[0].arn
}
652+
653+
# Trust policy: allows the Lambda service to assume the glue-create role.
data "aws_iam_policy_document" "glue_create_assume" {
  count = local.enable_glue_create ? 1 : 0

  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}
667+
668+
# Identity-based policy document for the glue-create lambda execution role.
# It is rendered into the `glue_create_managed` IAM policy, so no statement may
# contain a `principals` block: IAM rejects managed policies that specify one.
data "aws_iam_policy_document" "glue_create" {
  count = local.enable_glue_create ? 1 : 0

  # Run and inspect Athena queries in the dedicated workgroup.
  statement {
    sid    = "AthenaWorkgroupAthenaRW"
    effect = "Allow"
    actions = [
      "athena:StartQueryExecution",
      "athena:GetQueryResults",
      "athena:GetWorkGroup",
      "athena:StopQueryExecution",
      "athena:GetQueryExecution",
    ]
    resources = [
      aws_athena_workgroup.glue_create[0].arn
    ]
  }

  # Read/write access to the Athena query-result bucket.
  statement {
    sid    = "AthenaWorkgroupS3RW"
    effect = "Allow"
    actions = [
      "s3:PutObject",
      "s3:GetObject",
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation"
    ]
    resources = [
      "${module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn}/*",
      module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn
    ]
  }

  statement {
    sid       = "AthenaWorkgroupList1"
    effect    = "Allow"
    actions   = ["athena:ListWorkGroups"]
    resources = ["*"]
  }

  # Read and create Glue tables in any database of this account/region.
  statement {
    sid    = "GlueAllowTables"
    effect = "Allow"
    actions = [
      "glue:GetTable",
      "glue:GetTables",
      "glue:GetPartitions",
      "glue:CreateTable"
    ]
    resources = [
      "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog",
      "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:database/*",
      "arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/*"
    ]
  }

  statement {
    sid    = "GlueCatalogAllowDatabases"
    effect = "Allow"
    actions = [
      "glue:GetDatabase",
      "glue:GetDatabases",
    ]
    resources = [
      "*"
    ]
  }

  # Read-only access to the warehouse bucket under the configured prefix.
  statement {
    sid    = "TableExtLocS3RO"
    effect = "Allow"
    actions = [
      "s3:GetObject",
      "s3:GetObjectTagging",
      "s3:GetObjectVersion",
      "s3:GetBucketLocation",
      "s3:ListBucket",
      "s3:ListBucketVersions"
    ]
    resources = [
      var.warehouse_bucket_arn,
      "${var.warehouse_bucket_arn}/${var.s3_path}/*"
    ]
  }

  # Permissions the Lambda SQS event source mapping needs to poll the
  # glue-create queue and delete processed messages. This replaces two
  # resource-policy-style statements (with `principals` and `aws:SourceArn`
  # conditions) that are invalid in an identity-based policy and did not grant
  # the role the actions required to consume the queue.
  statement {
    sid    = "SqsConsume"
    effect = "Allow"
    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes",
    ]
    resources = [aws_sqs_queue.glue_create[0].arn]
  }

  # NOTE(review): no CloudWatch Logs permissions are granted in this document —
  # confirm they are attached to the role elsewhere if the lambda should log.
}
779+
780+
# Managed IAM policy rendered from the `glue_create` policy document.
resource "aws_iam_policy" "glue_create_managed" {
  count = local.enable_glue_create ? 1 : 0

  # NOTE: `iam_police_name` (sic) is the attribute name declared in
  # `var.glue_create_config`.
  name        = var.glue_create_config.iam_police_name
  description = "Glue create policy allows access to Athena and S3"

  policy = data.aws_iam_policy_document.glue_create[0].json
  tags   = var.tags
}
788+
789+
# Execution role of the glue-create lambda.
# Guarded by `count` like every resource it references: without it, the `[0]`
# lookups below fail at plan time whenever `enable_glue_create` is false,
# because the referenced resources then have zero instances.
# Terraform treats adding `count` to an existing single instance as a move to
# index 0, so an already-created role is not recreated.
resource "aws_iam_role" "glue_create" {
  count = local.enable_glue_create ? 1 : 0

  name                = var.glue_create_config.iam_role_name
  assume_role_policy  = data.aws_iam_policy_document.glue_create_assume[0].json
  managed_policy_arns = [aws_iam_policy.glue_create_managed[0].arn]
  tags                = var.tags
}

# glue-create lambda, deployed from an artifact stored in S3 and run on the
# custom `provided.al2` runtime.
resource "aws_lambda_function" "glue_create_lambda" {
  count = local.enable_glue_create ? 1 : 0

  description   = "Create tables in AWS Glue catalog based on the table prefix"
  s3_key        = var.glue_create_config.lambda_s3_key
  s3_bucket     = var.glue_create_config.lambda_s3_bucket
  function_name = var.glue_create_config.lambda_function_name
  role          = aws_iam_role.glue_create[0].arn
  handler       = "provided"
  runtime       = "provided.al2"

  environment {
    variables = {
      RUST_LOG           = var.rust_log_oxbow_debug_level
      ATHENA_WORKGROUP   = var.glue_create_config.athena_workgroup_name
      ATHENA_DATA_SOURCE = var.glue_create_config.athena_data_source
      GLUE_PATH_REGEX    = var.glue_create_config.path_regex
      # The queue is fed through an SNS subscription; presumably this tells the
      # lambda to unwrap the SNS envelope — confirm against the lambda's docs.
      UNWRAP_SNS_ENVELOPE = true
    }
  }
}
817+
818+
# Triggers the glue-create lambda from the glue-create queue.
resource "aws_lambda_event_source_mapping" "glue_create" {
  count = local.enable_glue_create ? 1 : 0

  function_name    = aws_lambda_function.glue_create_lambda[0].arn
  event_source_arn = aws_sqs_queue.glue_create[0].arn
}

monitoring.tf

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,26 @@ resource "datadog_monitor" "dead_letters_monitor" {
2727
renotify_interval = 60
2828
tags = var.tags_monitoring
2929
}
30+
31+
# Datadog alert on messages visible in the glue-create dead-letter queue.
# Created only when both dead-letter monitoring and glue-create are enabled.
resource "datadog_monitor" "dead_letters_monitor_glue_create" {
  count = local.enable_dead_letters_monitoring && local.enable_glue_create ? 1 : 0

  type = "metric alert"
  name = "${var.glue_create_config.sqs_queue_name_dl}-monitor"
  message = templatefile("${path.module}/templates/dl_monitor.tmpl", {
    dead_letters_queue_name = var.glue_create_config.sqs_queue_name_dl
    notify                  = join(", ", var.dl_alert_recipients)
  })

  # The threshold in the query must equal `monitor_thresholds.critical`, so
  # both read the same `local.dl_critical` (the query previously interpolated
  # `var.dl_critical`, which could diverge from the local).
  query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.glue_create_config.sqs_queue_name_dl}} > ${local.dl_critical}"

  monitor_thresholds {
    warning           = local.dl_warning
    warning_recovery  = local.dl_warning - 1
    critical          = local.dl_critical
    critical_recovery = local.dl_critical - 1
  }

  notify_no_data    = false
  renotify_interval = 60
  tags              = var.tags_monitoring
}

variables.tf

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,30 @@ variable "enable_auto_tagging" {
294294

295295
# Empty string (the default) means no topic is configured.
variable "sns_topic_arn" {
  description = "Optional arn to enable the SNS subscription and ENV var for Oxbow"
  type        = string
  default     = ""
}
300+
301+
# Feature toggle for the glue-create resources; off by default.
variable "enable_glue_create" {
  description = "Whether to turn on Glue create Lambda"
  type        = bool
  default     = false
}
306+
307+
# Settings for the glue-create lambda and its supporting resources.
# No default is declared, so callers must always supply this object.
variable "glue_create_config" {
  description = "Configuration of glue-create lambda"

  type = object({
    athena_workgroup_name = string # name of the AWS Athena workgroup
    athena_data_source    = string # name of the AWS Athena data source (catalog)
    athena_bucket_name    = string # name of the S3 bucket for Athena query results
    lambda_s3_key         = string # S3 key (path and file name) of the lambda artifact
    lambda_s3_bucket      = string # S3 bucket where the lambda artifact is stored
    lambda_function_name  = string # lambda function name
    path_regex            = string # regexp for mapping an S3 path to database/table
    sns_topic_arn         = string # ARN of the SNS topic with S3 events (source for the lambda)
    sqs_queue_name        = string # name of the SQS queue for the glue-sync lambda
    sqs_queue_name_dl     = string # name of the dead-letter SQS queue for unprocessed S3 events
    iam_role_name         = string # lambda role name
    iam_police_name       = string # lambda policy name (key spelling kept for compatibility)
  })
}

0 commit comments

Comments
 (0)