In [None]:
"""Project Summary

Step 1: Scope the Project and Gather Data
Step 2: Explore and Assess the Data
Step 3: Define the Data Model
Step 4: Run ETL to Model the Data
Step 5: Complete Project Write Up

"""

In [None]:
'''
# Step 1: Scope the Project and Gather Data

-- From Toronto datasets collected, we will extract various data points on apartment buildings in the Toronto area
such income, type of building, and ratings. In the end, We will build a dashboard to show those found data points
to understand the potential area we will place our distribution center. 
'''

In [5]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import psycopg2
import uuid
import re
import os
import configparser
# import boto3

# Configuration
# config = configparser.ConfigParser()
# config.read('config.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
# os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# Redshift connection setup
# conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))

# Get the Spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

spark

In [31]:
# 1.1 reading in data for development applications
dev_application_df = (spark.read
  .option("header", True)
  .csv("Development_Applications_Data.csv")
)
dev_application_df.limit(4).toPandas()

Unnamed: 0,_id,APPLICATION#,APPLICATION_TYPE,DATE_SUBMITTED,DESCRIPTION,HEARING_DATE,POSTAL,REFERENCE_FILE#,STATUS,STREET_DIRECTION,STREET_NAME,STREET_NUM,STREET_TYPE,X,Y
0,540631,17 260753 STE 28 SA,SA,2017-11-10,Site Plan Control application for a proposed 1...,,,,Closed,E,KING,65,ST,314829.237,4834177.49
1,540632,18 189938 STE 22 OZ,OZ,2018-07-06,The City has received applications to amend th...,,M4T,,Under Review,,YONGE,1506,ST,313272.284,4838553.3
2,540633,18 189996 NNY 25 MV,MV,2018-07-06,To construct a new residential dwelling with a...,2018-08-30,M4N,A0513/18NY,Approved,,TEDDINGTON PARK,50,AVE,312715.229,4843548.456
3,540634,18 169199 STE 22 SA,SA,2018-05-31,To alter the redevelopment plan approved under...,,M4V,,Under Review,,AVENUE,620,RD,312561.597,4838702.786


In [27]:
# 1.2 reading in data for ward demographics
ward_demo_df = (spark.read
    .option("header", True)
    .csv("pdf_census.csv")
)
dev_application.limit(4).toPandas()

Unnamed: 0,Ward Population,Ward
0,Median Age 40.6,Ward 20
1,Median Age 40P,Ward 15
2,Median Age 35.6,Ward 11
3,Median Age 38.2,Ward 24


In [22]:
# 1.1 reading in data for apartment building evaluation
build_eval_df = (spark.read
  .option("header", True)
  .json("Apartment_Building_Evaluation.json")
)
dev_application.limit(4).toPandas()


Unnamed: 0,BALCONY_GUARDS,CONFIRMED_STOREYS,CONFIRMED_UNITS,ELEVATORS,ENTRANCE_DOORS_WINDOWS,ENTRANCE_LOBBY,EVALUATION_COMPLETED_ON,EXTERIOR_CLADDING,EXTERIOR_GROUNDS,EXTERIOR_WALKWAYS,...,SCORE,SECURITY,SITE_ADDRESS,STAIRWELLS,STORAGE_AREAS_LOCKERS,WARD,WATER_PEN_EXT_BLDG_ELEMENTS,YEAR_BUILT,YEAR_REGISTERED,_id
0,,7.0,34.0,4.0,4.0,4.0,03/12/2019,3.0,4.0,4.0,...,78.0,5.0,49 ST CLAIR AVE W,4.0,,12,4.0,1930.0,2017.0,6906
1,,4.0,24.0,,3.0,3.0,03/05/2019,3.0,3.0,3.0,...,73.0,5.0,320 AVENUE RD,4.0,,12,4.0,1925.0,2017.0,6907
2,4.0,7.0,68.0,4.0,4.0,4.0,03/08/2019,3.0,4.0,4.0,...,77.0,5.0,7 EDMUND AVE,3.0,4.0,12,4.0,1928.0,2017.0,6908
3,,6.0,41.0,4.0,4.0,4.0,03/07/2019,3.0,4.0,4.0,...,75.0,5.0,394 AVENUE RD,4.0,,12,4.0,1931.0,2017.0,6909


In [29]:
'''Step 2: Explore and Assess the Data

Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

Cleaning Steps
Document steps necessary to clean the data

- dev_application_df to replace # on APPLICATION and REFERENCE_FILE#
- We'd like to only analyze data for apartments with balconies. Any apt with no BALCONY_GUARDS should be removed

'''


Row(Ward Population='Median Age 40.6', Ward='Ward 20')

In [None]:
''' Step 3: Define the Data Model

3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

'''

In [32]:
# Need to create a temp veiw to run spark sql
dev_application_df.createOrReplaceTempView("dev_application")
ward_demo_df.createOrReplaceTempView("ward_demo")
build_eval_df.createOrReplaceTempView("build_eval")

In [36]:
'''Step 4: Run Pipelines to Model the Data

4.1 Create the data model
'''

dev_application_table = spark.sql("""
select
      _id,
      APPLICATION_TYPE,
      DATE_SUBMITTED,
      DESCRIPTION,
      HEARING_DATE,
      POSTAL,
      STATUS,
      STREET_DIRECTION,
      STREET_NAME,
      STREET_NUM,
      STREET_TYPE,
      X,
      Y
from dev_application
""")
dev_application_table.limit(2).toPandas()

Unnamed: 0,_id,APPLICATION_TYPE,DATE_SUBMITTED,DESCRIPTION,HEARING_DATE,POSTAL,STATUS,STREET_DIRECTION,STREET_NAME,STREET_NUM,STREET_TYPE,X,Y
0,540631,SA,2017-11-10,Site Plan Control application for a proposed 1...,,,Closed,E,KING,65,ST,314829.237,4834177.49
1,540632,OZ,2018-07-06,The City has received applications to amend th...,,M4T,Under Review,,YONGE,1506,ST,313272.284,4838553.3


In [None]:
table_name = "dev_application_table"
parquet_file_name = temp_bucket + table_name + ".parquet"
dev_application_table.write.parquet("s3a://" + parquet_file_name)

In [None]:

try:
    with conn.cursor() as cur:
        cur.execute(copy_sql.format(
            table_name,
            parquet_file_name,
            config.get("AWS", "AWS_IAM_ROLE")))
    conn.commit()
except Exception as err:
    print("Error: ", err)
    conn.rollback()

In [None]:
'''4.3 Data dictionary

Create a data dictionary for your data model. 
For each field, provide a brief description of what the data is and where it came from. 

'''

In [None]:
# Clean up 
s3 = boto3.resource(
    's3',
    aws_access_key_id=config.get("AWS", "AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=config.get("AWS", "AWS_SECRET_ACCESS_KEY"))
bucket = s3.Bucket(main_bucket)
bucket.objects.filter(Prefix="temp").delete()

In [None]:
'''
Step 5: Complete Project Write Up


'''