In [1]:
import zipfile
import json
import boto3
import mysql.connector
import requests
import dataset
import re
from datetime import datetime
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import csv

In [2]:
aws_lambda = boto3.client('lambda')
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')

### Query data from Chicago Data Portal and put data onto MySQL table on RDS

#### Set ups: Create RDS instance, s3 bucket, MySQL table

In [4]:
def create_rds_instance(DATABASE_IDENTIFIER, DATABASE_NAME):
    """
    Create a MySQL RDS
    Input: DATABASE_INDENTIFIER, DATABASE_NAME
    """
    try:
        rds = boto3.client('rds')
        response = rds.create_db_instance(
            DBInstanceIdentifier=DATABASE_IDENTIFIER,
            DBName=DATABASE_NAME,
            MasterUsername='username',
            MasterUserPassword='password',
            DBInstanceClass='db.t3.micro',
            Engine='MySQL',
            AllocatedStorage=5
        )

        # Wait until DB is available to continue
        waiter = rds.get_waiter('db_instance_available')
        waiter.wait(DBInstanceIdentifier=DATABASE_IDENTIFIER)
    except:
        db = rds.describe_db_instances()['DBInstances'][0]

In [None]:
# Parameters
DATABASE_NAME = 'final_db'
DATABASE_IDENTIFIER = 'relational-final-db'
USERNAME = 'username'
PASSWORD = 'password'

In [22]:
# Create S3 Bucket
s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket='crime-standard-s3-bucket')

{'ResponseMetadata': {'RequestId': 'XP0PDM79W9GW7XHJ',
  'HostId': '9CSSWEqY5aV/SttuDOCCbo0qpthmVA8GQRGmPaRLUOXD2ShCqF6b3zUDOefUe5ov0umg404CeRc=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '9CSSWEqY5aV/SttuDOCCbo0qpthmVA8GQRGmPaRLUOXD2ShCqF6b3zUDOefUe5ov0umg404CeRc=',
   'x-amz-request-id': 'XP0PDM79W9GW7XHJ',
   'date': 'Fri, 24 May 2024 18:55:04 GMT',
   'location': '/crime-standard-s3-bucket',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'Location': '/crime-standard-s3-bucket'}

In [12]:
# Create RDS instance
create_rds_instance(DATABASE_IDENTIFIER, DATABASE_NAME)

In [4]:
# Connect to the MySQL RDS instance
rds = boto3.client('rds')
db = rds.describe_db_instances()['DBInstances'][0]
ENDPOINT = db['Endpoint']['Address']
PORT = db['Endpoint']['Port']
DBID = db['DBInstanceIdentifier']
print(DBID, "is available at", ENDPOINT, "on Port", PORT)

relational-final-db is available at relational-final-db.czi23atdudgr.us-east-1.rds.amazonaws.com on Port 3306


In [27]:
def create_mysql_crimes(ENDPOINT, PORT, DBID, DATABASE_NAME):
    """Create MySQL table for crime report data"""
    # Get Name of Security Group
    SGNAME = db['VpcSecurityGroups'][0]['VpcSecurityGroupId']

    try:
        ec2 = boto3.client('ec2')
        data = ec2.authorize_security_group_ingress(
                GroupId=SGNAME,
                IpPermissions=[
                    {'IpProtocol': 'tcp',
                    'FromPort': PORT,
                    'ToPort': PORT,
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
                ]
        )
    except ec2.exceptions.ClientError as e:
        if e.response["Error"]["Code"] == 'InvalidPermission.Duplicate':
            print("Permissions already adjusted.")
        else:
            print(e)

    # Connect to the MySQL RDS instance
    conn = mysql.connector.connect(
        host=ENDPOINT,
        user='username',
        password='password',
        port=PORT, 
        database=DATABASE_NAME
    )

    cursor = conn.cursor()

    # Create the table if it doesn't exist
    create_crime_table_query = '''
    CREATE TABLE IF NOT EXISTS crime_reports (
        id VARCHAR(255),
        case_number VARCHAR(255),
        date DATE,
        block VARCHAR(255),
        primary_type VARCHAR(255),
        district VARCHAR(50),
        ward VARCHAR(50),
        community_area VARCHAR(50),
        year INT,
        latitude DECIMAL(9,6),
        longitude DECIMAL(9,6),
        PRIMARY KEY (id)
    )
    '''
    cursor.execute(create_crime_table_query)
    cursor.close()
    conn.commit()
    conn.close()
    
create_mysql_crimes(ENDPOINT, PORT, DBID, DATABASE_NAME)

In [5]:
def get_table_size(table_name):
    """Check the size of the MySQL table that speficied by table name"""
    db_url = f'mysql+mysqlconnector://{USERNAME}:{PASSWORD}@{ENDPOINT}:{PORT}/{DATABASE_NAME}'
    db = dataset.connect(db_url)
    book = db[table_name]
    return len(book)

#### Query crime report data in a parallelized way

In [6]:
def make_def(lambda_arn):
    definition = {
      "Comment": "My State Machine",
      "StartAt": "Map",
      "States": {
        "Map": {
          "Type": "Map",
          "End": True,
          "MaxConcurrency": 10,
          "Iterator": {
            "StartAt": "Lambda Invoke",
            "States": {
              "Lambda Invoke": {
                "Type": "Task",
                "Resource": "arn:aws:states:::lambda:invoke",
                "OutputPath": "$.Payload",
                "Parameters": {
                  "Payload.$": "$",
                  "FunctionName": lambda_arn
                },
                "Retry": [
                  {
                    "ErrorEquals": [
                      "Lambda.ServiceException",
                      "Lambda.AWSLambdaException",
                      "Lambda.SdkClientException",
                      "Lambda.TooManyRequestsException",
                      "States.TaskFailed",
                      "Lambda.Unknown"
                    ],
                    "IntervalSeconds": 2,
                    "MaxAttempts": 6,
                    "BackoffRate": 2
                  }
                ],
                "End": True
              }
            }
          }
        }
      }
    }
    return definition

In [None]:
# Create Lambda Function to query data 
# and store in MySQL table and s3 bucket at the same time

with open('lambda_function_query.zip', 'rb') as f:
    # the zipfile only has lambda_function.py in it
    lambda_zip = f.read()
try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='lambda-function-final-query-byday',
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        Handler='lambda_function_query.lambda_handler',
        Code=dict(ZipFile=lambda_zip),
        Timeout=800
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip file contents
    response = aws_lambda.update_function_code(
    FunctionName='lambda-function-final-query-byday',
    ZipFile=lambda_zip,
    )

response = aws_lambda.put_function_concurrency(
    FunctionName='lambda-function-final-query-byday',
    ReservedConcurrentExecutions=10)

In [24]:
# Create Step function
sfn = boto3.client('stepfunctions')
lambda_arn = aws_lambda.get_function(FunctionName='lambda-function-final-query-byday')['Configuration']['FunctionArn']
sf_def = make_def(lambda_arn)
try:
    response = sfn.create_state_machine(
        name='lambda-function-final-query-byday-step',
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn'],
        type='EXPRESS'
    )
except sfn.exceptions.StateMachineAlreadyExists:
    response = sfn.list_state_machines()
    state_machine_arn = [sm['stateMachineArn'] 
                        for sm in response['stateMachines'] 
                        if sm['name'] == 'lambda-function-final-query-byday-step'][0]
    response = sfn.update_state_machine(
        stateMachineArn=state_machine_arn,
        definition=json.dumps(sf_def),
        roleArn=role['Role']['Arn']
    )

response = sfn.list_state_machines()
state_machine_arn = [sm['stateMachineArn'] 
                    for sm in response['stateMachines'] 
                    if sm['name'] == 'lambda-function-final-query-byday-step'][0]


In [5]:
# generate events
def generate_where_clause(start_date, end_date):
    where_clause = f"date between '{start_date.strftime('%Y-%m-%dT%H:%M:%S.000')}' and '{end_date.strftime('%Y-%m-%dT%H:%M:%S.000')}'"
    return where_clause

m = 12
wheres_by_months = [[{'where': generate_where_clause(datetime(2023, m+1, j), datetime(2023, m+1, j+1))} for j in range(1,28)] for m in range(12)]
n = 9
wheres_batches = [[wheres_by_months[m][i:i + n] for i in range(0, len(wheres_by_months), n)] for m in range(12)]

In [None]:
# Start Step Function executions for each query params
response = sfn.start_sync_execution(
    stateMachineArn=state_machine_arn,
    name='crimes_9months',
    input=json.dumps(wheres_batches[0]) # running from 0 to 11
)
response = aws_lambda.put_function_concurrency(
        FunctionName='crimes_9months',
        ReservedConcurrentExecutions=9
    )

In [12]:
# Check the number of entries in the mysql table
table_name = 'crime_reports'
get_table_size(table_name)

261796