In [24]:
import boto3
from dotenv import load_dotenv
import os
import json

In [55]:
# Load settings from a .env file; override=True lets values from that file
# replace variables that are already set in the process environment.
load_dotenv(override=True)

# Credentials and account id are read by name; os.getenv yields None for any
# variable that is not set.
aws_access_key_id, aws_secret_access_key, aws_account_id = (
    os.getenv(key)
    for key in ("aws_access_key_id", "aws_secret_access_key", "aws_account_id")
)

# Region plus the two availability zones used for the subnets further down.
region_name = "us-east-2"
availability_zone_1, availability_zone_2 = "us-east-2a", "us-east-2b"

# Names of the S3 bucket and of the MWAA environment built by this notebook.
bucket_name = "indeed-job-data"
mwaa_environ_name = "mwaa-job-analysis"

# First we create the S3 bucket that will hold our data

In [16]:
# S3 client bound to the configured region and credentials.
s3 = boto3.client(
    "s3",
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [None]:
# Create the bucket, pinning its location to the configured region.
s3.create_bucket(
    CreateBucketConfiguration={"LocationConstraint": region_name},
    Bucket=bucket_name,
)

In [25]:
# Step 1: turn on object versioning for the bucket.
s3.put_bucket_versioning(
    Bucket=bucket_name,
    VersioningConfiguration={"Status": "Enabled"},
)

# Step 2: block every form of public access. This call goes through the
# S3 Control API and applies to the whole AWS account identified by
# aws_account_id, not only to this bucket.
s3_control = boto3.client(
    "s3control",
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
s3_control.put_public_access_block(
    AccountId=aws_account_id,
    PublicAccessBlockConfiguration=dict(
        BlockPublicAcls=True,
        IgnorePublicAcls=True,
        BlockPublicPolicy=True,
        RestrictPublicBuckets=True,
    ),
)

# Step 3: bucket policy allowing the airflow.amazonaws.com service principal
# to get and put objects and to list the bucket. The resources cover both the
# objects ("/*") and the bucket itself.
bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "MWAAReadWriteAccess",
            "Effect": "Allow",
            "Principal": {"Service": "airflow.amazonaws.com"},
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{bucket_name}/*",
                f"arn:aws:s3:::{bucket_name}",
            ],
        }
    ],
}

# The API expects the policy as a JSON document, so serialise it before upload.
bucket_policy_json = json.dumps(bucket_policy)
s3.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy_json)

print(f"S3 bucket {bucket_name} created and configured for MWAA.")

S3 bucket indeed-job-data created and configured for MWAA.


# Now we create the VPC that MWAA will use

In [58]:
# EC2 client (used below for the VPC and subnet calls), bound to the
# configured region and credentials.
ec2 = boto3.client(
    "ec2",
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [59]:
# Create the VPC and remember its id for the calls below.
vpc = ec2.create_vpc(CidrBlock='10.0.0.0/16')
vpc_id = vpc['Vpc']['VpcId']

# A freshly created VPC is not always visible to follow-up calls right away,
# so block until EC2 reports it as available before modifying it.
ec2.get_waiter('vpc_available').wait(VpcIds=[vpc_id])

# Enable DNS resolution and DNS hostnames for the VPC. The two attributes are
# set in separate requests.
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsSupport={'Value': True})
ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsHostnames={'Value': True})

# Create two subnets in different availability zones, as MWAA requires at least 2.
subnet1 = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.0.0/24', AvailabilityZone=availability_zone_1)
subnet2 = ec2.create_subnet(VpcId=vpc_id, CidrBlock='10.0.1.0/24', AvailabilityZone=availability_zone_2)

# Collect the subnet ids; subnet_list is what later cells pass to MWAA.
subnet_id_1 = subnet1['Subnet']['SubnetId']
subnet_id_2 = subnet2['Subnet']['SubnetId']
subnet_list = [subnet_id_1, subnet_id_2]

# Wait until both subnets are available so later cells can use them safely.
ec2.get_waiter('subnet_available').wait(SubnetIds=subnet_list)

# NOTE(review): this cell creates no route tables, internet gateway or NAT
# gateway for the subnets — confirm MWAA's networking requirements are met
# elsewhere before creating the environment.
print(f"VPC with ID {vpc_id} and Subnets with IDs {subnet_list} "
      f"created in availability zones {availability_zone_1} and {availability_zone_2} respectively.")

VPC with ID vpc-0ba65f485f6a97895 and Subnets with IDs ['subnet-07e4d0953abcaf3f3', 'subnet-000075b37a18c3bef'] created in availability zones us-east-2a and us-east-2b respectively.


# Create the IAM role for MWAA
This role is given unrestricted S3 and VPC access (the AmazonS3FullAccess and AmazonVPCFullAccess managed policies). That is not good practice, but it is acceptable for now.

In [30]:
# IAM client built from the same credentials and region as the other clients.
iam = boto3.client(
    "iam",
    region_name=region_name,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

In [31]:
# Name of the IAM role created below and used as the MWAA execution role.
execution_role_name = "mwaaJobExecutionRole"

In [68]:
# Define the trust policy document for the MWAA execution role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "airflow-env.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

# Create the MWAA execution role
execution_role = iam.create_role(
    RoleName=execution_role_name,
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# Attach the necessary policies to the execution role for S3 access
iam.attach_role_policy(
    RoleName=execution_role_name,
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess'
)

# Attach the necessary policies to the execution role for VPC access (example policy)
iam.attach_role_policy(
    RoleName=execution_role_name,
    PolicyArn='arn:aws:iam::aws:policy/AmazonVPCFullAccess'
)

# Print the ARN of the execution role
execution_role_arn = execution_role['Role']['Arn']
print(f"MWAA execution role ARN: {execution_role_arn}")

MWAA execution role ARN: arn:aws:iam::910115230456:role/mwaaJobExecutionRole


# Create the MWAA environment
Note that we store our DAGs in the source bucket rather than in a separate DAG bucket.

In [69]:
# Derive the bucket ARN from bucket_name so it cannot drift from the bucket
# that was created and configured earlier in the notebook.
source_bucket_arn = f"arn:aws:s3:::{bucket_name}"
webserver_access_mode = "PUBLIC_ONLY"
dag_s3_path = "dags/" # since we will store our dags in the main s3 bucket we just need to go to the proper folder.
# NOTE(review): these security group ids are hard-coded and are not created by
# this notebook — confirm they belong to the VPC whose subnets are passed to MWAA.
security_group_id_1 = "sg-064309e82cdc9d018"
security_group_id_2 = "sg-090ae318b4ecd5dba"
# security_group_id_3 = "sg-0a7d1ebce99fbdd16"
# Only the second security group is attached to the environment.
security_group_ids = [security_group_id_2]

In [70]:
# Initialize an MWAA client
mwaa = boto3.client('mwaa', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name)
                    
# Define your network configuration
network_configuration = {
    'SecurityGroupIds': security_group_ids,
    'SubnetIds': subnet_list
}

# Create the MWAA environment
mwaa.create_environment(
    Name=mwaa_environ_name,
    SourceBucketArn=source_bucket_arn,
    ExecutionRoleArn=execution_role_arn,
    WebserverAccessMode=webserver_access_mode,
    DagS3Path=dag_s3_path,
    NetworkConfiguration=network_configuration
    # Add other configuration parameters as needed
)

print(f"MWAA environment '{mwaa_environ_name}' created.")

MWAA environment 'mwaa-job-analysis' created.


# Now I need to upload my DAG and the raw data to my S3 bucket.