From b173b403d6e4e33a80a6db0f77125a6661e7412e Mon Sep 17 00:00:00 2001 From: Kuntal Basu Date: Tue, 15 Apr 2025 16:47:32 -0400 Subject: [PATCH] feat! Kinesis no more supported --- README.md | 33 ++++---------- autotagging.tf | 2 +- main.tf | 121 +++++-------------------------------------------- outputs.tf | 5 +- variables.tf | 40 +--------------- 5 files changed, 24 insertions(+), 177 deletions(-) diff --git a/README.md b/README.md index 3db0074..a62a631 100644 --- a/README.md +++ b/README.md @@ -2,23 +2,19 @@ *Terraform module to manage oxbow Lambda and its components. We can have the following components in AWS: 1. Lambda -2. Kinesis Data Firehose -3. SQS -4. SQS dead letters -5. IAM policy -6. S3 bucket notifications -7. Dynamo DB table -8. Glue catalog -9. Glue table +2. SQS +3. SQS dead letters +4. IAM policy +5. S3 bucket notifications +6. Dynamo DB table +7. Glue catalog +8. Glue table ### examples: if we need Glue catalog and table ``` enable_aws_glue_catalog_table = true -``` -if we need Kinesis Data Firehose delivery stream -``` -enable_kinesis_firehose_delivery_stream = true + ``` if we need s3 bucket notification ``` @@ -35,7 +31,6 @@ module "terraform-oxbow" { source = "" enable_aws_glue_catalog_table = true - enable_kinesis_firehose_delivery_stream = true enable_bucket_notification = false @@ -46,8 +41,6 @@ module "terraform-oxbow" { # the place where we store files s3_path = "" - - lambda_kinesis_role_name = "" lambda_function_name = "" lambda_description = "" lambda_s3_key = "" @@ -60,20 +53,10 @@ module "terraform-oxbow" { sqs_queue_name = "${var.env}--queue" sqs_queue_name_dl = "${var.env}--queue-dl" dynamodb_table_name = "${var.env}-oxbow-lock" - - kinesis_delivery_stream_name = "" - kinesis_s3_prefix = "" - kinesis_s3_errors_prefix = "" - kinesis_policy_name = "" - kinesis_policy_description = "" - - # We have dedicated service Database for Kinesis file conversions glue_database_name = "" glue_table_name = "" glue_location_uri = "" glue_table_description = "" - # this schema is required by Kinesis to convert input into Parquet - parquet_schema = ["parquet_schema] aws_s3_locking_provider = "dynamodb" enabled_dead_letters_monitoring = true diff --git a/autotagging.tf b/autotagging.tf index 36c73a6..2072c3e 100644 --- a/autotagging.tf +++ b/autotagging.tf @@ -164,7 +164,7 @@ data "aws_iam_policy_document" "auto_tagging_assume_role" { resource "aws_iam_role" "auto_tagging_lambda" { count = var.enable_auto_tagging == true ? 1 : 0 - name = "${var.lambda_kinesis_role_name}-auto_tagging" + name = "${var.oxbow_lambda_role_name}-auto_tagging" assume_role_policy = data.aws_iam_policy_document.auto_tagging_assume_role[0].json managed_policy_arns = [aws_iam_policy.auto_tagging_lambda[0].arn] diff --git a/main.tf b/main.tf index 5657131..cb4d456 100644 --- a/main.tf +++ b/main.tf @@ -1,13 +1,12 @@ -# This module creates Kinesis Firehose service (optionally), SQS, lambda function OXBOW +# This module creates SQS, lambda function OXBOW # to receive data and convert it into parquet then Delta log is added by Oxbow lambda data "aws_caller_identity" "current" {} data "aws_region" "current" {} locals { - enable_aws_glue_catalog_table = var.enable_aws_glue_catalog_table - enable_kinesis_firehose_delivery_stream = var.enable_kinesis_firehose_delivery_stream - enable_bucket_notification = var.enable_bucket_notification - enable_group_events = var.enable_group_events + enable_aws_glue_catalog_table = var.enable_aws_glue_catalog_table + enable_bucket_notification = var.enable_bucket_notification + enable_group_events = var.enable_group_events } @@ -41,43 +40,6 @@ resource "aws_glue_catalog_table" "this_glue_table" { } } -resource "aws_kinesis_firehose_delivery_stream" "this_kinesis" { - count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0 - name = var.kinesis_delivery_stream_name - destination = "extended_s3" - extended_s3_configuration { - buffering_size = 128 - role_arn = aws_iam_role.this_iam_role_lambda_kinesis.arn - bucket_arn = var.warehouse_bucket_arn - error_output_prefix = var.kinesis_s3_errors_prefix - prefix = var.kinesis_s3_prefix - - cloudwatch_logging_options { - enabled = true - log_group_name = "/aws/kinesisfirehose/${var.kinesis_delivery_stream_name}" - log_stream_name = "DestinationDelivery" - } - data_format_conversion_configuration { - input_format_configuration { - deserializer { - open_x_json_ser_de {} - } - } - output_format_configuration { - serializer { - parquet_ser_de {} - } - } - schema_configuration { - database_name = var.glue_database_name - role_arn = aws_iam_role.this_iam_role_lambda_kinesis.arn - table_name = var.glue_table_name - region = "us-east-2" - } - } - } - tags = var.tags -} locals { oxbow_lambda_unwrap_sns_event = var.enable_group_events == true ? {} : var.sns_topic_arn == "" ? {} : { UNWRAP_SNS_ENVELOPE = true } group_eventlambda_unwrap_sns_event = var.sns_topic_arn == "" ? {} : { UNWRAP_SNS_ENVELOPE = true } @@ -91,7 +53,7 @@ resource "aws_lambda_function" "this_lambda" { s3_key = var.lambda_s3_key s3_bucket = var.lambda_s3_bucket function_name = var.lambda_function_name - role = aws_iam_role.this_iam_role_lambda_kinesis.arn + role = aws_iam_role.oxbow_lambda_role.arn handler = "provided" runtime = "provided.al2023" memory_size = var.lambda_memory_size @@ -119,7 +81,7 @@ resource "aws_lambda_function" "group_events_lambda" { s3_key = var.events_lambda_s3_key s3_bucket = var.events_lambda_s3_bucket function_name = var.events_lambda_function_name - role = aws_iam_role.this_iam_role_lambda_kinesis.arn + role = aws_iam_role.oxbow_lambda_role.arn handler = "provided" runtime = "provided.al2023" @@ -328,11 +290,8 @@ data "aws_iam_policy_document" "this_services_assume_role" { statement { effect = "Allow" principals { - type = "Service" - identifiers = concat( - ["lambda.amazonaws.com"], - local.enable_kinesis_firehose_delivery_stream ? ["firehose.amazonaws.com"] : [] - ) + type = "Service" + identifiers = ["lambda.amazonaws.com"] } actions = [ "sts:AssumeRole", @@ -462,67 +421,11 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" { } } +resource "aws_iam_role" "oxbow_lambda_role" { + name = var.oxbow_lambda_role_name + assume_role_policy = data.aws_iam_policy_document.this_services_assume_role.json + managed_policy_arns = [aws_iam_policy.this_lambda_permissions.arn] -data "aws_iam_policy_document" "this_kinesis_policy_data" { - count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0 - statement { - sid = "GlueAccess" - effect = "Allow" - actions = [ - "glue:GetTable", - "glue:GetTableVersion", - "glue:GetTableVersions", - ] - resources = [ - "arn:aws:glue:us-east-2:${data.aws_caller_identity.current.account_id}:catalog", - "arn:aws:glue:us-east-2:${data.aws_caller_identity.current.account_id}:database/${var.glue_database_name}", - "arn:aws:glue:us-east-2:${data.aws_caller_identity.current.account_id}:table/${var.glue_database_name}/${var.glue_table_name}" - ] - } - statement { - sid = "S3Access" - effect = "Allow" - actions = [ - "s3:AbortMultipartUpload", - "s3:GetBucketLocation", - "s3:GetObject", - "s3:ListBucket", - "s3:ListBucketMultipartUploads", - "s3:PutObject" - ] - resources = [ - "${var.warehouse_bucket_arn}/${var.s3_path}", - "${var.warehouse_bucket_arn}/${var.s3_path}/*" - ] - } - statement { - sid = "LogsAccess" - effect = "Allow" - actions = [ - "logs:PutLogEvents" - ] - resources = [ - "arn:aws:logs:us-east-2:${data.aws_caller_identity.current.account_id}:log-group:/aws/kinesisfirehose/${var.kinesis_delivery_stream_name}:log-stream:*" - ] - } -} - -resource "aws_iam_policy" "this_kinesis_policy" { - count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0 - name = var.kinesis_policy_name - description = var.kinesis_policy_description - policy = data.aws_iam_policy_document.this_kinesis_policy_data[0].json - tags = var.tags -} - - -resource "aws_iam_role" "this_iam_role_lambda_kinesis" { - name = var.lambda_kinesis_role_name - assume_role_policy = data.aws_iam_policy_document.this_services_assume_role.json - managed_policy_arns = concat( - local.enable_kinesis_firehose_delivery_stream ? [aws_iam_policy.this_kinesis_policy[0].arn] : [], - [aws_iam_policy.this_lambda_permissions.arn] - ) tags = var.tags } diff --git a/outputs.tf b/outputs.tf index fb32e61..34e2a29 100644 --- a/outputs.tf +++ b/outputs.tf @@ -1,7 +1,4 @@ -output "kinesis_stream_arn" { - description = "Kinesis stream arn" - value = local.enable_kinesis_firehose_delivery_stream ? aws_kinesis_firehose_delivery_stream.this_kinesis[0].arn : "" -} + output "lambda_arn" { description = "Lambda arn" diff --git a/variables.tf b/variables.tf index a71d53a..fbb8b00 100644 --- a/variables.tf +++ b/variables.tf @@ -28,18 +28,6 @@ variable "parquet_schema" { default = [] } -variable "kinesis_s3_prefix" { - default = "" - type = string - description = "Kinesis s3 prefix - s3 location where the files will be output" -} - -variable "kinesis_delivery_stream_name" { - type = string - default = "" - description = "Kinesis delivery stream name" -} - variable "warehouse_bucket_arn" { type = string description = "Warehouse bucket arn" @@ -50,12 +38,6 @@ variable "warehouse_bucket_name" { description = "Warehouse bucket name" } -variable "kinesis_s3_errors_prefix" { - type = string - default = "" - description = "Kinesiss3 errors prefix - s3 location where the files will be output" -} - variable "lambda_function_name" { type = string description = "Lambda function name" @@ -130,18 +112,6 @@ variable "lambda_reserved_concurrent_executions" { default = 1 } -variable "kinesis_policy_name" { - type = string - default = "" - description = "Kinesis policy name" -} - -variable "kinesis_policy_description" { - type = string - description = "Kinesis policy description" - default = "" -} - variable "rust_log_deltalake_debug_level" { type = string description = "Rust log deltalake debug level" @@ -221,9 +191,9 @@ variable "sqs_queue_name_dl" { description = "Sqs queue name - dead letters" } -variable "lambda_kinesis_role_name" { +variable "oxbow_lambda_role_name" { type = string - description = "Lambda kinesis IAM role name" + description = "Lambda oxbow IAM role name" } variable "tags" { @@ -283,12 +253,6 @@ variable "enable_aws_glue_catalog_table" { default = false } -variable "enable_kinesis_firehose_delivery_stream" { - type = bool - description = "Enable firehose delivery stream" - default = false -} - variable "enable_bucket_notification" { type = bool description = "Enable enable_bucket_notification"