33 changes: 8 additions & 25 deletions README.md
@@ -2,23 +2,19 @@
*Terraform module to manage oxbow Lambda and its components.
We can have the following components in AWS:
1. Lambda
2. Kinesis Data Firehose
3. SQS
4. SQS dead letters
5. IAM policy
6. S3 bucket notifications
7. Dynamo DB table
8. Glue catalog
9. Glue table
2. SQS
3. SQS dead letters
4. IAM policy
5. S3 bucket notifications
6. Dynamo DB table
7. Glue catalog
8. Glue table

### examples:
if we need Glue catalog and table
```
enable_aws_glue_catalog_table = true
```
if we need Kinesis Data Firehose delivery stream
```
enable_kinesis_firehose_delivery_stream = true

```
if we need s3 bucket notification
```
@@ -35,7 +31,6 @@ module "terraform-oxbow" {
source = ""

enable_aws_glue_catalog_table = true
enable_kinesis_firehose_delivery_stream = true
enable_bucket_notification = false


@@ -46,8 +41,6 @@ module "terraform-oxbow" {

# the place where we store files
s3_path = ""

lambda_kinesis_role_name = ""
lambda_function_name = ""
lambda_description = ""
lambda_s3_key = ""
@@ -60,20 +53,10 @@
sqs_queue_name = "${var.env}--queue"
sqs_queue_name_dl = "${var.env}--queue-dl"
dynamodb_table_name = "${var.env}-oxbow-lock"

kinesis_delivery_stream_name = ""
kinesis_s3_prefix = ""
kinesis_s3_errors_prefix = ""
kinesis_policy_name = ""
kinesis_policy_description = ""

# We have dedicated service Database for Kinesis file conversions
glue_database_name = ""
glue_table_name = ""
glue_location_uri = ""
glue_table_description = ""
# this schema is required by Kinesis to convert input into Parquet
parquet_schema = ["parquet_schema"]
aws_s3_locking_provider = "dynamodb"

enabled_dead_letters_monitoring = true
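For reference, a minimal end-to-end call of the module after the Firehose removal might look like the sketch below. The source address, bucket names, paths, and artifact keys are placeholders, not values taken from this repository.
```
module "terraform-oxbow" {
  source = "git::https://example.com/terraform-oxbow.git" # placeholder

  enable_aws_glue_catalog_table = true
  enable_bucket_notification    = false

  # warehouse bucket and the place where we store files (placeholder values)
  warehouse_bucket_arn  = "arn:aws:s3:::my-warehouse"
  warehouse_bucket_name = "my-warehouse"
  s3_path               = "databases/events"

  oxbow_lambda_role_name = "dev-oxbow-lambda"
  lambda_function_name   = "dev-oxbow"
  lambda_description     = "Oxbow lambda adding Delta logs"
  lambda_s3_bucket       = "my-artifacts"        # placeholder
  lambda_s3_key          = "oxbow/bootstrap.zip" # placeholder

  sqs_queue_name      = "dev--queue"
  sqs_queue_name_dl   = "dev--queue-dl"
  dynamodb_table_name = "dev-oxbow-lock"

  glue_database_name = "analytics" # placeholder
  glue_table_name    = "events"    # placeholder

  aws_s3_locking_provider         = "dynamodb"
  enabled_dead_letters_monitoring = true

  tags = { env = "dev" }
}
```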
2 changes: 1 addition & 1 deletion autotagging.tf
@@ -164,7 +164,7 @@ data "aws_iam_policy_document" "auto_tagging_assume_role" {
resource "aws_iam_role" "auto_tagging_lambda" {
count = var.enable_auto_tagging == true ? 1 : 0

name = "${var.lambda_kinesis_role_name}-auto_tagging"
name = "${var.oxbow_lambda_role_name}-auto_tagging"
assume_role_policy = data.aws_iam_policy_document.auto_tagging_assume_role[0].json
managed_policy_arns = [aws_iam_policy.auto_tagging_lambda[0].arn]

121 changes: 12 additions & 109 deletions main.tf
@@ -1,13 +1,12 @@
# This module creates Kinesis Firehose service (optionally), SQS, lambda function OXBOW
# This module creates SQS, lambda function OXBOW
# to receive data and convert it into parquet then Delta log is added by Oxbow lambda
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

locals {
enable_aws_glue_catalog_table = var.enable_aws_glue_catalog_table
enable_kinesis_firehose_delivery_stream = var.enable_kinesis_firehose_delivery_stream
enable_bucket_notification = var.enable_bucket_notification
enable_group_events = var.enable_group_events
enable_aws_glue_catalog_table = var.enable_aws_glue_catalog_table
enable_bucket_notification = var.enable_bucket_notification
enable_group_events = var.enable_group_events
}


@@ -41,43 +40,6 @@ resource "aws_glue_catalog_table" "this_glue_table" {
}
}

resource "aws_kinesis_firehose_delivery_stream" "this_kinesis" {
count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0
name = var.kinesis_delivery_stream_name
destination = "extended_s3"
extended_s3_configuration {
buffering_size = 128
role_arn = aws_iam_role.this_iam_role_lambda_kinesis.arn
bucket_arn = var.warehouse_bucket_arn
error_output_prefix = var.kinesis_s3_errors_prefix
prefix = var.kinesis_s3_prefix

cloudwatch_logging_options {
enabled = true
log_group_name = "/aws/kinesisfirehose/${var.kinesis_delivery_stream_name}"
log_stream_name = "DestinationDelivery"
}
data_format_conversion_configuration {
input_format_configuration {
deserializer {
open_x_json_ser_de {}
}
}
output_format_configuration {
serializer {
parquet_ser_de {}
}
}
schema_configuration {
database_name = var.glue_database_name
role_arn = aws_iam_role.this_iam_role_lambda_kinesis.arn
table_name = var.glue_table_name
region = "us-east-2"
}
}
}
tags = var.tags
}
locals {
oxbow_lambda_unwrap_sns_event = var.enable_group_events == true ? {} : var.sns_topic_arn == "" ? {} : { UNWRAP_SNS_ENVELOPE = true }
group_eventlambda_unwrap_sns_event = var.sns_topic_arn == "" ? {} : { UNWRAP_SNS_ENVELOPE = true }
@@ -91,7 +53,7 @@ resource "aws_lambda_function" "this_lambda" {
s3_key = var.lambda_s3_key
s3_bucket = var.lambda_s3_bucket
function_name = var.lambda_function_name
role = aws_iam_role.this_iam_role_lambda_kinesis.arn
role = aws_iam_role.oxbow_lambda_role.arn
handler = "provided"
runtime = "provided.al2023"
memory_size = var.lambda_memory_size
@@ -119,7 +81,7 @@ resource "aws_lambda_function" "group_events_lambda" {
s3_key = var.events_lambda_s3_key
s3_bucket = var.events_lambda_s3_bucket
function_name = var.events_lambda_function_name
role = aws_iam_role.this_iam_role_lambda_kinesis.arn
role = aws_iam_role.oxbow_lambda_role.arn
handler = "provided"
runtime = "provided.al2023"

@@ -328,11 +290,8 @@ data "aws_iam_policy_document" "this_services_assume_role" {
statement {
effect = "Allow"
principals {
type = "Service"
identifiers = concat(
["lambda.amazonaws.com"],
local.enable_kinesis_firehose_delivery_stream ? ["firehose.amazonaws.com"] : []
)
type = "Service"
identifiers = ["lambda.amazonaws.com"]
}
actions = [
"sts:AssumeRole",
@@ -462,67 +421,11 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
}
}

resource "aws_iam_role" "oxbow_lambda_role" {
name = var.oxbow_lambda_role_name
assume_role_policy = data.aws_iam_policy_document.this_services_assume_role.json
managed_policy_arns = [aws_iam_policy.this_lambda_permissions.arn]

data "aws_iam_policy_document" "this_kinesis_policy_data" {
count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0
statement {
sid = "GlueAccess"
effect = "Allow"
actions = [
"glue:GetTable",
"glue:GetTableVersion",
"glue:GetTableVersions",
]
resources = [
"arn:aws:glue:us-east-2:${data.aws_caller_identity.current.account_id}:catalog",
"arn:aws:glue:us-east-2:${data.aws_caller_identity.current.account_id}:database/${var.glue_database_name}",
"arn:aws:glue:us-east-2:${data.aws_caller_identity.current.account_id}:table/${var.glue_database_name}/${var.glue_table_name}"
]
}
statement {
sid = "S3Access"
effect = "Allow"
actions = [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
]
resources = [
"${var.warehouse_bucket_arn}/${var.s3_path}",
"${var.warehouse_bucket_arn}/${var.s3_path}/*"
]
}
statement {
sid = "LogsAccess"
effect = "Allow"
actions = [
"logs:PutLogEvents"
]
resources = [
"arn:aws:logs:us-east-2:${data.aws_caller_identity.current.account_id}:log-group:/aws/kinesisfirehose/${var.kinesis_delivery_stream_name}:log-stream:*"
]
}
}

resource "aws_iam_policy" "this_kinesis_policy" {
count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0
name = var.kinesis_policy_name
description = var.kinesis_policy_description
policy = data.aws_iam_policy_document.this_kinesis_policy_data[0].json
tags = var.tags
}


resource "aws_iam_role" "this_iam_role_lambda_kinesis" {
name = var.lambda_kinesis_role_name
assume_role_policy = data.aws_iam_policy_document.this_services_assume_role.json
managed_policy_arns = concat(
local.enable_kinesis_firehose_delivery_stream ? [aws_iam_policy.this_kinesis_policy[0].arn] : [],
[aws_iam_policy.this_lambda_permissions.arn]
)
tags = var.tags
}
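One practical note: renaming the role resource from `this_iam_role_lambda_kinesis` to `oxbow_lambda_role` makes Terraform plan a destroy-and-recreate of the IAM role in existing states. A `moved` block (Terraform >= 1.1) is one way to keep the existing role; this is a sketch of a possible follow-up, not something included in this PR:
```
# Hypothetical migration aid: map the old resource address to the new one
# so existing states keep the role instead of replacing it.
moved {
  from = aws_iam_role.this_iam_role_lambda_kinesis
  to   = aws_iam_role.oxbow_lambda_role
}
```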

5 changes: 1 addition & 4 deletions outputs.tf
@@ -1,7 +1,4 @@
output "kinesis_stream_arn" {
description = "Kinesis stream arn"
value = local.enable_kinesis_firehose_delivery_stream ? aws_kinesis_firehose_delivery_stream.this_kinesis[0].arn : ""
}


output "lambda_arn" {
description = "Lambda arn"
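Callers that previously read the removed `kinesis_stream_arn` output will need to drop that reference; the remaining outputs, such as `lambda_arn`, are unaffected. A hypothetical caller snippet:
```
# before (no longer available): module.terraform-oxbow.kinesis_stream_arn
output "oxbow_lambda_arn" {
  value = module.terraform-oxbow.lambda_arn
}
```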
40 changes: 2 additions & 38 deletions variables.tf
@@ -28,18 +28,6 @@ variable "parquet_schema" {
default = []
}

variable "kinesis_s3_prefix" {
default = ""
type = string
description = "Kinesis s3 prefix - s3 location where the files will be output"
}

variable "kinesis_delivery_stream_name" {
type = string
default = ""
description = "Kinesis delivery stream name"
}

variable "warehouse_bucket_arn" {
type = string
description = "Warehouse bucket arn"
@@ -50,12 +38,6 @@ variable "warehouse_bucket_name" {
description = "Warehouse bucket name"
}

variable "kinesis_s3_errors_prefix" {
type = string
default = ""
description = "Kinesiss3 errors prefix - s3 location where the files will be output"
}

variable "lambda_function_name" {
type = string
description = "Lambda function name"
@@ -130,18 +112,6 @@ variable "lambda_reserved_concurrent_executions" {
default = 1
}

variable "kinesis_policy_name" {
type = string
default = ""
description = "Kinesis policy name"
}

variable "kinesis_policy_description" {
type = string
description = "Kinesis policy description"
default = ""
}

variable "rust_log_deltalake_debug_level" {
type = string
description = "Rust log deltalake debug level"
@@ -221,9 +191,9 @@ variable "sqs_queue_name_dl" {
description = "Sqs queue name - dead letters"
}

variable "lambda_kinesis_role_name" {
variable "oxbow_lambda_role_name" {
type = string
description = "Lambda kinesis IAM role name"
description = "Lambda oxbow IAM role name"
}
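Because the variable is renamed rather than added, existing callers have to update the input name when they upgrade; a hypothetical before/after caller snippet:
```
module "terraform-oxbow" {
  # before: lambda_kinesis_role_name = "dev-oxbow-lambda"
  oxbow_lambda_role_name = "dev-oxbow-lambda"
  # ...
}
```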

variable "tags" {
@@ -283,12 +253,6 @@ variable "enable_aws_glue_catalog_table" {
default = false
}

variable "enable_kinesis_firehose_delivery_stream" {
type = bool
description = "Enable firehose delivery stream"
default = false
}

variable "enable_bucket_notification" {
type = bool
description = "Enable enable_bucket_notification"