Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 288 additions & 2 deletions main.tf
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# This module creates Kinesis Firehose service (optionally), SQS, lambda function OXBOW
# to receive data and convert it into parquet then Delta log is added by Oxbow lambda
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

locals {
enable_aws_glue_catalog_table = var.enable_aws_glue_catalog_table
enable_kinesis_firehose_delivery_stream = var.enable_kinesis_firehose_delivery_stream
enable_bucket_notification = var.enable_bucket_notification
enable_group_events = var.enable_group_events
enable_glue_create = var.enable_glue_create
}


Expand Down Expand Up @@ -235,7 +237,6 @@ data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" {
}
}


resource "aws_sqs_queue" "group_events_lambda_sqs" {
count = local.enable_group_events ? 1 : 0
name = var.sqs_group_queue_name
Expand All @@ -258,7 +259,7 @@ resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" {



### This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not
### This is to ensure we are triggering oxbow lambda properly whether group event is enable or not
### if group event is enabled we are using the fifo queue populated by group events as a source for oxbow
resource "aws_lambda_event_source_mapping" "this_lambda_events" {
event_source_arn = local.enable_group_events ? aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
Expand Down Expand Up @@ -536,3 +537,288 @@ resource "aws_dynamodb_table" "this_oxbow_locking" {
}
tags = var.tags
}

# glue-create lambda resource
module "glue_create_athena_workgroup_bucket" {
count = local.enable_glue_create ? 1 : 0

source = "terraform-aws-modules/s3-bucket/aws"
version = "4.1.2"
bucket = var.glue_create_config.athena_bucket_name
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
control_object_ownership = true
object_ownership = "BucketOwnerEnforced"
tags = var.tags
versioning = {
enabled = false
}
}

resource "aws_athena_workgroup" "glue_create" {
count = local.enable_glue_create ? 1 : 0

name = var.glue_create_config.athena_workgroup_name
tags = var.tags
configuration {
enforce_workgroup_configuration = true
publish_cloudwatch_metrics_enabled = false

result_configuration {
output_location = "s3://${module.glue_create_athena_workgroup_bucket[0].s3_bucket_id}/"
}
}
depends_on = [module.glue_create_athena_workgroup_bucket]
}

data "aws_iam_policy_document" "glue_create_sqs" {
count = local.enable_glue_create ? 1 : 0

statement {
effect = "Allow"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [var.glue_create_config.sns_topic_arn]
}
}
}

data "aws_iam_policy_document" "glue_create_sqs_dl" {
count = local.enable_glue_create ? 1 : 0

statement {
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name_dl}"]
condition {
test = "ForAllValues:StringEquals"
variable = "aws:SourceArn"
values = ["arn:aws:sqs:*:*:${var.glue_create_config.sqs_queue_name}"]
}
}
}

resource "aws_sqs_queue" "glue_create" {
count = local.enable_glue_create ? 1 : 0

name = var.glue_create_config.sqs_queue_name
policy = data.aws_iam_policy_document.glue_create_sqs[0].json
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
delay_seconds = var.sqs_delay_seconds
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.glue_create_dl[0].arn
maxReceiveCount = var.sqs_redrive_policy_maxReceiveCount
})
tags = var.tags
}

resource "aws_sqs_queue" "glue_create_dl" {
count = local.enable_glue_create ? 1 : 0

name = var.glue_create_config.sqs_queue_name_dl
policy = data.aws_iam_policy_document.glue_create_sqs_dl[0].json
tags = var.tags
}

resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" {
count = local.enable_glue_create ? 1 : 0

queue_url = aws_sqs_queue.glue_create_dl[0].id
redrive_allow_policy = jsonencode({
redrivePermission = "byQueue",
sourceQueueArns = [aws_sqs_queue.glue_create[0].arn]
})
}

resource "aws_sns_topic_subscription" "glue_create_sns_sub" {
count = local.enable_glue_create ? 1 : 0

topic_arn = var.glue_create_config.sns_topic_arn
protocol = "sqs"
endpoint = aws_sqs_queue.glue_create[0].arn
}

data "aws_iam_policy_document" "glue_create_assume" {
count = local.enable_glue_create ? 1 : 0

statement {
effect = "Allow"
principals {
type = "Service"
identifiers = ["lambda.amazonaws.com"]
}
actions = [
"sts:AssumeRole",
]
}
}

data "aws_iam_policy_document" "glue_create" {
count = local.enable_glue_create ? 1 : 0

statement {
sid = "AthenaWorkgroupAthenaRW"
actions = [
"athena:StartQueryExecution",
"athena:GetQueryResults",
"athena:GetWorkGroup",
"athena:StopQueryExecution",
"athena:GetQueryExecution",
]
resources = [
aws_athena_workgroup.glue_create[0].arn
]
effect = "Allow"
}
statement {
sid = "AthenaWorkgroupS3RW"
effect = "Allow"
actions = [
"s3:PutObject",
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:GetBucketLocation"
]
resources = [
"${module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn}/*",
module.glue_create_athena_workgroup_bucket[0].s3_bucket_arn
]
}
statement {
sid = "AthenaWorkgroupList1"
effect = "Allow"
actions = ["athena:ListWorkGroups"]
resources = ["*"]
}
statement {
sid = "GlueAllowTables"
effect = "Allow"
actions = [
"glue:GetTable",
"glue:GetTables",
"glue:GetPartitions",
"glue:CreateTable"
]
resources = [
"arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:catalog",
"arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:database/*",
"arn:aws:glue:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/*"
]
}
statement {
sid = "GlueCatalogAllowDatabases"
effect = "Allow"
actions = [
"glue:GetDatabase",
"glue:GetDatabases",
]
resources = [
"*"
]
}
statement {
sid = "TableExtLocS3RO"
effect = "Allow"
actions = [
"s3:GetObject",
"s3:GetObjectTagging",
"s3:GetObjectVersion",
"s3:GetBucketLocation",
"s3:ListBucket",
"s3:ListBucketVersions"
]
resources = [
var.warehouse_bucket_arn,
"${var.warehouse_bucket_arn}/${var.s3_path}/*"
]
}
statement {
effect = "Allow"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["sqs:ReceiveMessage"]
resources = [aws_sqs_queue.glue_create[0].arn]

condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [var.warehouse_bucket_arn]
}
}
statement {
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
actions = [
"sqs:SendMessage"
]
resources = [aws_sqs_queue.glue_create[0].arn]
condition {
test = "ForAllValues:StringEquals"
variable = "aws:SourceArn"
values = [aws_sqs_queue.glue_create_dl[0].arn]
}
}
}

resource "aws_iam_policy" "glue_create_managed" {
count = local.enable_glue_create ? 1 : 0

name = var.glue_create_config.iam_police_name
description = "Glue create policy allows access to Athena and S3"
policy = data.aws_iam_policy_document.glue_create[0].json
tags = var.tags
}

resource "aws_iam_role" "glue_create" {
name = var.glue_create_config.iam_role_name
assume_role_policy = data.aws_iam_policy_document.glue_create_assume[0].json
managed_policy_arns = [aws_iam_policy.glue_create_managed[0].arn]
tags = var.tags
}

resource "aws_lambda_function" "glue_create_lambda" {
count = local.enable_glue_create ? 1 : 0

description = "Greate tables in AWS Glue catalog based on the table prefix"
s3_key = var.glue_create_config.lambda_s3_key
s3_bucket = var.glue_create_config.lambda_s3_bucket
function_name = var.glue_create_config.lambda_function_name
role = aws_iam_role.glue_create.arn
handler = "provided"
runtime = "provided.al2"

environment {
variables = {
RUST_LOG = var.rust_log_oxbow_debug_level
ATHENA_WORKGROUP = var.glue_create_config.athena_workgroup_name
ATHENA_DATA_SOURCE = var.glue_create_config.athena_data_source
GLUE_PATH_REGEX = var.glue_create_config.path_regex
UNWRAP_SNS_ENVELOPE = true
}
}
}

resource "aws_lambda_event_source_mapping" "glue_create" {
count = local.enable_glue_create ? 1 : 0

event_source_arn = aws_sqs_queue.glue_create[0].arn
function_name = aws_lambda_function.glue_create_lambda[0].arn
}
23 changes: 23 additions & 0 deletions monitoring.tf
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,26 @@ resource "datadog_monitor" "dead_letters_monitor" {
renotify_interval = 60
tags = var.tags_monitoring
}

resource "datadog_monitor" "dead_letters_monitor_glue_create" {
count = local.enable_dead_letters_monitoring && local.enable_glue_create ? 1 : 0

type = "metric alert"
name = "${var.glue_create_config.sqs_queue_name_dl}-monitor"
message = templatefile("${path.module}/templates/dl_monitor.tmpl", {
dead_letters_queue_name = var.glue_create_config.sqs_queue_name_dl
notify = join(", ", var.dl_alert_recipients)
})
query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.glue_create_config.sqs_queue_name_dl}} > ${var.dl_critical}"

monitor_thresholds {
warning = local.dl_warning
warning_recovery = local.dl_warning - 1
critical = local.dl_critical
critical_recovery = local.dl_critical - 1
}

notify_no_data = false
renotify_interval = 60
tags = var.tags_monitoring
}
26 changes: 25 additions & 1 deletion variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,30 @@ variable "enable_auto_tagging" {

variable "sns_topic_arn" {
type = string
description = "Optional arn to enable the SNS subscription and ENV var for Oxbo"
description = "Optional arn to enable the SNS subscription and ENV var for Oxbow"
default = ""
}

variable "enable_glue_create" {
type = bool
description = "Whether to turn on Glue create Lambda"
default = false
}

variable "glue_create_config" {
type = object({
athena_workgroup_name = string // Name of AWS Athena workgroup
athena_data_source = string // Arn name of AWS Athena data source (catalog)
athena_bucket_name = string // name of AWS Athena bucket.
lambda_s3_key = string // lambda s3 key - lambda path on S3 and file name filename
lambda_s3_bucket = string // lambda s3 bucket where lambda is stored
lambda_function_name = string // lambda function name
path_regex = string // regexp for mapping s3 path to database/table
sns_topic_arn = string // sns topic arn with s3 events (source for lambda)
sqs_queue_name = string // name of sqs queue for glue-sync lambda
sqs_queue_name_dl = string // name dead letter sqs que with not processed s3 events
iam_role_name = string // lambda role name
iam_police_name = string // lambda policy name
})
description = "Configuration of glue-create lambda"
}