# Project Title
### Data Engineering Capstone Project

#### Project Summary
A team of Data Scientists has asked the following question: could the number of crimes committed in a city correlated to the average temperature and the immigration rates?

**NOTE:** Not considering psychological or socioeconomic factors.

To develop a pilot test, the Data Science team decided to take as a sample from one of the main cities in the United States, the city of Los Angeles California.

To help this team of researchers, the Data Engineering team has been asked to be in charge of obtaining and preparing the data to carry out this analysis.

This project was designed to execute the process to pull and transform the data needed for the Data Science team.

The project follows the next steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Doing all imports and installs
import pandas as pd

import configparser
from datetime import datetime, timedelta
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_date
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format
from pyspark.sql import types as T
from pyspark.sql.types import *

### Step 1: Scope the Project and Gather Data

#### Scope
For this project the main idea is to create a ETL process to make sure Immigration, crime and temperature data match. This process will allow the Data Science team to test its hypothesis about the correlation between immigration, crime and temperature data.


#### Describe and Gather Data

In [2]:
# Initializing Spark session
spark = SparkSession.builder\
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport()\
.getOrCreate()

* #### Immigrtion Data
Each report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).

    Data and description retrieved from: 
    [Immigration data source](https://travel.trade.gov/research/reports/i94/historical/2016.html)

In [3]:
# Setting the path for Immigration data
i94_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'

# Reading data
df_i94 = spark.read.format('com.github.saurfang.sas.spark').load(i94_path)

In [4]:
# Visualizing Immigration data
df_i94.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,...,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,...,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


* #### LA Crime Data
This dataset reflects incidents of crime in the City of Los Angeles from 2010 to 2019. This data is transcribed from original crime reports that are typed on paper and therefore there may be some inaccuracies within the data. Some location fields with missing data are noted as (0°, 0°). Address fields are only provided to the nearest hundred block in order to maintain privacy.

    Data and description retrieved from:
    [LA Crime data source](https://www.kaggle.com/chaitanyakck/crime-data-from-2020-to-present?select=Crime_Data_from_2020_to_Present.csv)

In [5]:
# Setting the path for Crimes in L.A. city data
crimes_data_path = 'csv_data/LA_crime_data/Crime_Data_from_2010_to_2019.csv'

# Reading data
df_LA_crimes = spark.read.csv(crimes_data_path, header=True, inferSchema=True)

In [6]:
# Visualizing LA Crimes data
df_LA_crimes.limit(10).toPandas()

Unnamed: 0,DR_NO,Date Rptd,DATE OCC,TIME OCC,AREA,AREA NAME,Rpt Dist No,Part 1-2,Crm Cd,Crm Cd Desc,...,Status,Status Desc,Crm Cd 1,Crm Cd 2,Crm Cd 3,Crm Cd 4,LOCATION,Cross Street,LAT,LON
0,1307355,02/20/2010 12:00:00 AM,02/20/2010 12:00:00 AM,1350,13,Newton,1385,2,900,VIOLATION OF COURT ORDER,...,AA,Adult Arrest,900,,,,300 E GAGE AV,,33.9825,-118.2695
1,11401303,09/13/2010 12:00:00 AM,09/12/2010 12:00:00 AM,45,14,Pacific,1485,2,740,"VANDALISM - FELONY ($400 & OVER, ALL CHURCH VA...",...,IC,Invest Cont,740,,,,SEPULVEDA BL,MANCHESTER AV,33.9599,-118.3962
2,70309629,08/09/2010 12:00:00 AM,08/09/2010 12:00:00 AM,1515,13,Newton,1324,2,946,OTHER MISCELLANEOUS CRIME,...,IC,Invest Cont,946,,,,1300 E 21ST ST,,34.0224,-118.2524
3,90631215,01/05/2010 12:00:00 AM,01/05/2010 12:00:00 AM,150,6,Hollywood,646,2,900,VIOLATION OF COURT ORDER,...,IC,Invest Cont,900,998.0,,,CAHUENGA BL,HOLLYWOOD BL,34.1016,-118.3295
4,100100501,01/03/2010 12:00:00 AM,01/02/2010 12:00:00 AM,2100,1,Central,176,1,122,"RAPE, ATTEMPTED",...,IC,Invest Cont,122,,,,8TH ST,SAN PEDRO ST,34.0387,-118.2488
5,100100506,01/05/2010 12:00:00 AM,01/04/2010 12:00:00 AM,1650,1,Central,162,1,442,SHOPLIFTING - PETTY THEFT ($950 & UNDER),...,AA,Adult Arrest,442,,,,700 W 7TH ST,,34.048,-118.2577
6,100100508,01/08/2010 12:00:00 AM,01/07/2010 12:00:00 AM,2005,1,Central,182,1,330,BURGLARY FROM VEHICLE,...,IC,Invest Cont,330,,,,PICO BL,GRAND AV,34.0389,-118.2643
7,100100509,01/09/2010 12:00:00 AM,01/08/2010 12:00:00 AM,2100,1,Central,157,1,230,"ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT",...,AA,Adult Arrest,230,,,,500 CROCKER ST,,34.0435,-118.2427
8,100100510,01/09/2010 12:00:00 AM,01/09/2010 12:00:00 AM,230,1,Central,171,1,230,"ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT",...,IC,Invest Cont,230,,,,800 W OLYMPIC BL,,34.045,-118.264
9,100100511,01/09/2010 12:00:00 AM,01/06/2010 12:00:00 AM,2100,1,Central,132,1,341,"THEFT-GRAND ($950.01 & OVER)EXCPT,GUNS,FOWL,LI...",...,IC,Invest Cont,341,998.0,,,200 S OLIVE ST,,34.0538,-118.2488


* #### World Temperature Data

    Daily temperature data from ERA5 reanalysis data from Copernicus Climate Service. It's time-series value in degrees Celsius, for 1000 most populous cities in the world, from Jan-01-1980 to Sept-30-2020.

    Data and description retreived from:
    [Temperature data source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

In [7]:
# Setting the path for Top Cities Temperature data
cities_temp_path = 'csv_data/Temp_data/daily_temperature_1000_cities_1980_2020.csv'

# Reading data
df_cities_temp = spark.read.csv(cities_temp_path, header=True, inferSchema=True)

In [8]:
# Visualizing Top Cities Temperature data
df_cities_temp.limit(10).toPandas()

Unnamed: 0,_c0,0,1,2,3,4,5,6,7,8,...,990,991,992,993,994,995,996,997,998,999
0,city,Tokyo,New York,Mexico City,Mumbai,São Paulo,Delhi,Shanghai,Kolkata,Los Angeles,...,Bilāspur,Sargodha,Leipzig,Tinnevelly,Cancún,Yangzhou,Novokuznetsk,Latakia,Heroica Matamoros,Göteborg
1,city_ascii,Tokyo,New York,Mexico City,Mumbai,Sao Paulo,Delhi,Shanghai,Kolkata,Los Angeles,...,Bilaspur,Sargodha,Leipzig,Tinnevelly,Cancun,Yangzhou,Novokuznetsk,Latakia,Heroica Matamoros,Goteborg
2,lat,35.685,40.6943,19.4424,19.017,-23.5587,28.67,31.2165,22.495,34.1139,...,22.0904,32.0854,51.3354,8.7304,21.17,32.4,53.75,35.54,25.88,57.75
3,lng,139.7514,-73.9249,-99.131,72.857,-46.625,77.23,121.4365,88.3247,-118.4068,...,82.16,72.675,12.41,77.69,-86.83,119.43,87.115,35.78,-97.5,12.0
4,country,Japan,United States,Mexico,India,Brazil,India,China,India,United States,...,India,Pakistan,Germany,India,Mexico,China,Russia,Syria,Mexico,Sweden
5,iso2,JP,US,MX,IN,BR,IN,CN,IN,US,...,IN,PK,DE,IN,MX,CN,RU,SY,MX,SE
6,iso3,JPN,USA,MEX,IND,BRA,IND,CHN,IND,USA,...,IND,PAK,DEU,IND,MEX,CHN,RUS,SYR,MEX,SWE
7,admin_name,Tōkyō,New York,Ciudad de México,Mahārāshtra,São Paulo,Delhi,Shanghai,West Bengal,California,...,Chhattīsgarh,Punjab,Saxony,Tamil Nādu,Quintana Roo,Jiangsu,Kemerovskaya Oblast’,Al Lādhiqīyah,Tamaulipas,Västra Götaland
8,capital,primary,,primary,admin,admin,admin,admin,admin,,...,,minor,minor,,minor,,,admin,minor,admin
9,population,35676000.0,19354922.0,19028000.0,18978000.0,18845000.0,15926000.0,14987000.0,14787000.0,12815475.0,...,543454.0,542603.0,542529.0,542200.0,542043.0,539715.0,539616.0,539147.0,538785.0,537797.0


### Step 2: Explore and Assess the Data
#### Explore and Clean the Data 
In this step the objective is to identify data quality issues, like missing values, duplicate data, etc. The data types most be consitent to make sure the Data Science Team will be able to use all data

To clean the data sets the Data Engineering Team should drop duplicate records and make sure there is no missing values in columns that contain dates because this columns will be used to create the relations between data sets.

The general process consists of 3 steps
1. Exploring the data and counting records
2. Dropping duplicates and missing values
3. Changing data type if needed

Exploring Immigration Data

In [9]:
# Exploring df_i94 schema
df_i94.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [10]:
# Counting total rows in df_i94 dataframe
df_i94.count()

3096313

In [11]:
# Dropping duplicated and missing values
df_i94_cleaned = df_i94.dropDuplicates(["cicid"])
df_i94_cleaned = df_i94.dropna(how="any", subset=["cicid", "arrdate", "depdate"])
df_i94_cleaned.count()

2953856

In [12]:
# Defining function to transform dates from sas files
def convert_datetime(x):
    """
    This function transforms dates in sas files into
    date type.
    """
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None
udf_datetime_from_sas = udf(lambda x: convert_datetime(x), T.DateType())

In [13]:
# Changing data types
df_i94_cleaned = df_i94.withColumn("arrdate", udf_datetime_from_sas("arrdate"))\
                    .withColumn("depdate", udf_datetime_from_sas("depdate"))\
                    .withColumn("cicid", df_i94["cicid"].cast("bigint"))\
                    .withColumn("i94yr", df_i94["i94yr"].cast("int"))\
                    .withColumn("i94mon", df_i94["i94mon"].cast("int"))\
                    .withColumn("i94cit", df_i94["i94cit"].cast("int"))\
                    .withColumn("i94res", df_i94["i94res"].cast("int"))\
                    .withColumn("i94mode", df_i94["i94mode"].cast("int"))\
                    .withColumn("i94bir", df_i94["i94bir"].cast("int"))\
                    .withColumn("i94visa", df_i94["i94visa"].cast("int"))\
                    .withColumn("biryear", df_i94["biryear"].cast("int"))\
                    .withColumn("admnum", df_i94["admnum"].cast("bigint"))

# Validating data type changes
df_i94_cleaned.printSchema()

df_i94_cleaned.limit(5).toPandas()

root
 |-- cicid: long (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: long (nullable = 

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6,2016,4,692,692,XXX,2016-04-29,,,,...,U,,1979,10282016,,,,1897628485,,B2
1,7,2016,4,254,276,ATL,2016-04-07,1.0,AL,,...,Y,,1991,D/S,M,,,3736796330,296.0,F1
2,15,2016,4,101,101,WAS,2016-04-01,1.0,MI,2016-08-25,...,,M,1961,09302016,M,,OS,666643185,93.0,B2
3,16,2016,4,101,101,NYC,2016-04-01,1.0,MA,2016-04-23,...,,M,1988,09302016,,,AA,92468461330,199.0,B2
4,17,2016,4,101,101,NYC,2016-04-01,1.0,MA,2016-04-23,...,,M,2012,09302016,,,AA,92468463130,199.0,B2


Exploring LA Crime Data

In [14]:
# Exploring df_LA_crimes schema
df_LA_crimes.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA : integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- 

In [15]:
# Counting total rows in df_LA_crimes dataframe
df_LA_crimes.count()

2116239

In [16]:
# Dropping duplicated and missing values
df_LA_crimes_cleaned = df_LA_crimes.dropDuplicates(["DR_NO"])
df_LA_crimes_cleaned = df_LA_crimes.dropna(how="any", subset=["DR_NO", "DATE OCC"])
df_LA_crimes_cleaned.count()

2116239

In [17]:
# Changing data types
df_LA_crimes_cleaned = df_LA_crimes_cleaned.withColumn("Date Rptd", to_date(col("Date Rptd"), "MM/dd/yyyy"))\
                                           .withColumn("DATE OCC", to_date(col("DATE OCC"), "MM/dd/yyyy"))\
                                           .withColumn("DR_NO", df_LA_crimes_cleaned["DR_NO"].cast("bigint"))\

# Validating data type changes
df_LA_crimes_cleaned.printSchema()

#df_LA_crimes_cleaned.limit(5).toPandas()

root
 |-- DR_NO: long (nullable = true)
 |-- Date Rptd: date (nullable = true)
 |-- DATE OCC: date (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA : integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- LOCATIO

Exploring Top Cities Temperatures Data

1. Columns are named with numbers from 1 to 999. For this project it is required LA data so the columns needed are "_c0" and "8", the remaining columns can be dropped

2. First records have informative data not needed for this project so they can be filtered out 

In [18]:
# Creating a list of columns that will be included
columns_to_keep = ["_c0", "8"]

# Creating a list of rows to filter
no_valid_values = ["city", "city_ascii", "lat", "lng", "country", "iso2", "iso3", "admin_name", "capital", "population", "id", "datetime"]

In [19]:
# Dropping all columns except for "_c0" and "8"
#df_LA_temp = df_cities_temp.drop(*columns_to_drop)
df_LA_temp = df_cities_temp.select(*columns_to_keep)

# Filtering out records included in no_valid_values
df_LA_temp_cleaned = df_LA_temp.filter(~df_LA_temp._c0.isin(no_valid_values))

In [20]:
# Exploring LA Temperaure schema
df_LA_temp_cleaned.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- 8: string (nullable = true)



In [21]:
# Counting total rows in df_LA_temp_cleaned dataframe
df_LA_temp_cleaned.count()

14884

In [22]:
# Dropping duplicated and missing values
df_LA_temp_cleaned = df_LA_temp_cleaned.dropDuplicates(["_c0"])
df_LA_temp_cleaned = df_LA_temp_cleaned.dropna(how="any", subset=["_c0", "8"])
df_LA_temp_cleaned.count()

14884

In [23]:
# Changing data types
df_LA_temp_cleaned = df_LA_temp_cleaned.withColumn("_c0", df_LA_temp_cleaned["_c0"].cast(DateType()))\
                                       .withColumn("8", df_LA_temp_cleaned["8"].cast("double"))

# Validating data type changes
df_LA_temp_cleaned.printSchema()

#df_LA_temp_cleaned.limit(5).toPandas()

root
 |-- _c0: date (nullable = true)
 |-- 8: double (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
In this step the main idea is to configure a set of attributes (in this case immigration rates and temperature) to validate if they have an influence on the number of crimes committed.

That's the reason why Data Engineering and Data Science teams select the Immigration data and LA temperature data to create the dimensional tables and LA Crimes data to create the fact table.

**Dimension tables**:

- immigration_table
    - date_arr
    - date_dep
    - id_imm
    - year_arr
    - month_arr
    - port_arr
    - trans_mode
    - state_arr
    - visa_code
    - visa_type
    - visa_issued
    - imm_age
    - imm_gender
    - imm_city_birth
    - imm_city_res
    
    
- LA_temp_table
    - date
    - avg_temp
    
    
- time_table
    - date
    - year
    - month
    - week
    - weekday
    - day
    
**Fact table**:

- LA_crime_table
    - id_LA_crime
    - date_occ
    - date_rep
    - time_occ
    - area_name
    - crime_com_desc
    - modus_op_code
    - vict_age
    - vict_gender
    - vict_descent
    - crime_place_desc
    - weapon_used_desc
    - status_case_desc
    - latitude
    - longitude

#### 3.2 Mapping Out Data Pipelines

This pipeline should:
1. Load the datasets cleaned
2. Filter values in the immigration dataset to include only California info.
3. Create the dimension tables (immigration_dim_table, LA_temp_dim_table and time_dim_table)
4. Create fact table (LA_crime_fact_table)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
This step creates the data model previously defined.

In [24]:
output_data = "data_transformed/"

In [34]:
def process_immigration_data(input_data, output_data):
    """
    This function loads the immigration data 
    previously cleaned and transform it to meake 
    sure only includes the Los Angeles data and 
    finally, creates the .parquet files
    """
    
    # Getting immigration data
    df_i94_cleaned = input_data
    
    # Filtering i94addr = CA
    df_i94_cleaned = df_i94_cleaned.filter(df_i94_cleaned.i94addr == "CA")

    # Creating the year and month columns from depdate
    df_i94_cleaned = df_i94_cleaned.withColumn("month_dep", month("depdate")) \
                                   .withColumn("year_dep", year("depdate"))
    
    # Creating immigration dimension table
    immigration_dim_table = df_i94_cleaned.select(
                                          col("cicid").alias("id_imm"),
                                          col("i94port").alias("port_arr"),
                                          col("i94mode").alias("trans_mode"),
                                          col("i94addr").alias("state_arr"),
                                          col("i94visa").alias("visa_code"),
                                          col("visatype").alias("visa_type"),
                                          col("biryear").alias("imm_age"),
                                          col("gender").alias("imm_gender"),
                                          col("i94cit").alias("imm_city_birth"),
                                          col("i94res").alias("imm_city_res"),
                                          col("arrdate").alias("date_arr"),
                                          col("i94yr").alias("year_arr"),
                                          col("i94mon").alias("month_arr"),
                                          col("depdate").alias("date_dep"),
                                          col("year_dep"),
                                          col("month_dep")) \
                                          .dropDuplicates()
    
    # Writting immigration dimension table to parquet files partitioned by year_dep and month_dep
    immigration_dim_table.write.partitionBy("year_dep", "month_dep").parquet(os.path.join(output_data, "immigration.parquet"), "overwrite")
    
    # Printing the final schema
    #print(immigration_dim_table.printSchema())
    
    return immigration_dim_table

In [35]:
def process_temperature_data(input_data, output_data):
    """
    This function loads the temperature data 
    previously cleaned, transforms it and finally,
    creates the .parquet files
    """
    
    # Getting temperature data
    df_LA_temp = input_data
    
    # Creating LA temperature dimension table
    LA_temp_dim_table = df_LA_temp.select(
                                          col("_c0").alias("date"),
                                          col("8").alias("avg_temp")) \
                                         .dropDuplicates()
    
    # Writting temperature dimension table to parquet files
    LA_temp_dim_table.write.parquet(os.path.join(output_data, "temperature.parquet"), "overwrite")
    
    # Printing the final schema
    #print(LA_temp_dim_table.printSchema())
    
    return LA_temp_dim_table

In [36]:
def process_crimes_data(input_data, output_data):
    """
    This function loads the LA crimes data 
    previously cleaned,transforms it into time 
    dimension table and crimes fact table and finally, 
    creates the .parquet files
    """
    
    # Getting LA crimes data
    df_LA_crimes_cleaned = input_data
    
    # Creating the year, month, week, weekday and day columns from DATE OCC
    df_LA_crimes_cleaned = df_LA_crimes_cleaned.withColumn("year", year("DATE OCC")) \
                                               .withColumn("month", month("DATE OCC")) \
                                               .withColumn("week", weekofyear("DATE OCC")) \
                                               .withColumn("weekday", dayofweek("DATE OCC")) \
                                               .withColumn("day", dayofmonth("DATE OCC"))
    
    # Creating time dimension table
    time_dim_table = df_LA_crimes_cleaned.select(
                                                 col("DATE OCC").alias("date"),
                                                 col("year"),
                                                 col("month"),
                                                 col("week"),
                                                 col("weekday"),
                                                 col("day")) \
                                                 .dropDuplicates()
    
    # Writting crimes fact table to parquet files partitioned by year_occ and month_occ
    time_dim_table.write.partitionBy("year", "month").parquet(os.path.join(output_data, "time.parquet"), "overwrite")
    
    # Printing the final schema
    #print(time_dim_table.printSchema())
    
    # Creating LA temperature fact table
    LA_crimes_fact_table = df_LA_crimes_cleaned.select(
                                                      col("DR_NO").alias("id_crime"),
                                                      col("DATE Rptd").alias("date_rptd"),
                                                      col("TIME OCC").alias("hour_occ"),
                                                      col("AREA NAME").alias("area_name"),
                                                      col("Crm Cd desc").alias("crime_com_desc"),
                                                      col("Mocodes").alias("modus_op_code"),
                                                      col("Vict Age").alias("vict_age"),
                                                      col("Vict Sex").alias("vict_gender"),
                                                      col("Vict Descent").alias("vict_descent_code"),
                                                      col("Premis Desc").alias("crime_place_desc"),
                                                      col("Weapon Desc").alias("weapon_used_desc"),
                                                      col("Status Desc").alias("status_case_desc"),
                                                      col("LAT").alias("latitude"),
                                                      col("LON").alias("longitude"),
                                                      col("DATE OCC").alias("date_occ"),
                                                      col("year").alias("year_occ"),
                                                      col("month").alias("month_occ")) \
                                                     .dropDuplicates()
    
    # Writting crimes fact table to parquet files partitioned by year_occ and month_occ
    LA_crimes_fact_table.write.partitionBy("year_occ", "month_occ").parquet(os.path.join(output_data, "crimes.parquet"), "overwrite")
    
    # Printing the final schema
    #print(LA_crimes_fact_table.printSchema())
    
    return time_dim_table, LA_crimes_fact_table

In [37]:
immigration_dim_table = process_immigration_data(df_i94_cleaned, output_data)

In [38]:
LA_temp_dim_table = process_temperature_data(df_LA_temp_cleaned, output_data)

In [39]:
time_dim_table, LA_crimes_fact_table = process_crimes_data(df_LA_crimes_cleaned, output_data)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [33]:
# Creating a dictionary of names and tables
dict_tables = {
               "immigration_dim_table": immigration_dim_table, 
               "LA_temp_dim_table": LA_temp_dim_table, 
               "time_dim_table": time_dim_table, 
               "LA_crimes_fact_table": LA_crimes_fact_table
               }

In [34]:
# Creating a function to validate id's are unique values
def data_quality_unique_key(tables):
    """
    This function validates if id column
    in a table is unique.
    """
    for name, table in tables.items():
        col_name = table.schema.names
        if table.count() & table.count() > table.dropDuplicates([col_name[0]]).count():
            print("Test failed...")
            print(f"{name} has duplicated values in ID column")
        else:
            print("Test passed...")
            print(f"{name} has no duplicated values in ID column")

In [35]:
data_quality_unique_key(dict_tables)

Test passed...
immigration_dim_table has no duplicated values in ID column
Test passed...
LA_temp_dim_table has no duplicated values in ID column
Test passed...
time_dim_table has no duplicated values in ID column
Test passed...
LA_crimes_fact_table has no duplicated values in ID column


In [36]:
# Creating a function to validate the completeness of tables
def data_quality_completeness(tables):
    """
    This function validates if table exists
    and if it contains records.
    """
    for name, table in tables.items():
        try:
            if table.count():
                records = table.count()
                if records != 0:
                    print("Test passed...")
                    print(f"{name} exists and it has {records} records")
                else:
                    print("Test failed...")
                    print(f"{name} has zero records")
        except:
            print("Test failed...")
            print(f"{name} does not exist")

In [37]:
data_quality_completeness(dict_tables)

Test passed...
immigration_dim_table exists and it has 470386 records
Test passed...
LA_temp_dim_table exists and it has 14884 records
Test passed...
time_dim_table exists and it has 3652 records
Test passed...
LA_crimes_fact_table exists and it has 2116239 records


#### 4.3 Data dictionary 
The data dictionary provides a brief description of data used in the data model.

immigration_dim_table


| Value | Description |
| --- | --- |
|date_arr | Date when immigrant arrived (yyyy-mmdd) |
date_dep | Date when immigrant departured (yyyy-mmdd) |
id_imm | Number to identify the record |
year_arr | Year when immigrant arrived |
month_arr | Month when immigrant arrived (numeric) |
port_arr | Port where immigrant arrived |
trans_mode | Transport mode (1 = "Air"; 2 = "Sea"; 3 = "Land"; 4 = "Not Reported") |
state_arr | State where immigrant arrived |
visa_code | Visa codes (1 = "Business"; 2 = "Pleasure"; 3 = "Student" |
visa_type | Class of admission legally admitting the non-immigrant to temporarily stayed in U.S. |
visa_issued | Department of State where Visa was issued |
imm_age | Immigrant age |
imm_gender | Immigrant gender |
imm_city_birth | Immigrant city of birth in code |
imm_city_res | Immigrant city of residence in code |


LA_temperature_dim_table

| Value | Description |
| --- | --- |
| date | Date of temperature recorded |
| avg_temp| Average of temperatures |

time_dim_table

| Value | Description |
| --- | --- |
| year | Year |
| month | Month (numeric) |
| week | Week of year (numeric) |
| weekday | Day of the week (numeric) |
| day | Day of month (numeric) |

LA_crimes_fact_table

| Value | Description |
| --- | --- |
| id_crime | Number to identify the record |
| date_rptd | Date when the crime reported (yyyy-mm-dd) |
| date_occ | Date when the crime occurred (yyyy-mm-dd) |
| hour_occ | Hour when the crime occurred |
| area_name | The 21 Geographic Areas or Patrol Divisions |
| crime_com_desc | Description of the crime commited |
| modus_op_code | Modus Operandi code |
| vict_age | Victim age |
| vict_gender | Victim gender (F = "Female"; M = "Male"; X = "Unknown") |
| vict_descent_code | Victim descent (A = "Other Asian"; B = "Black"; C = "Chinese"; D = "Cambodian"; F = "Filipino"; G = "Guamanian"; H = "Hispanic/Latin/Mexican"; I = "American Indian/Alaskan Native"; J = "Japanese"; K = "Korean"; L = "Laotian"; O = "Other"; P = "Pacific Islander"; S = "Samoan"; U = "Hawaiian"; V = "Vietnamese"; W = "White"; X = "Unknown"; Z = "Asian Indian") |
| crime_place_desc | The type of structure, vehicle, or location where the crime occurred |
| weapon_used_desc | The type of weapon used in the crime |
| status_case_desc | Status of the case. (IC is the default) |
| latitude | Latiude coordinates |
| longitude | Longitude coordinates |
| year_occ | Year when the crime occurred |
| month_occ | Month when the crime occurred (numeric) |

#### Step 5: Complete Project Write Up

This project uses Apache Spark because it is a technology built and optimized for processing huge amounts of data within a fast way, it manages easily data formats (SAS, CSV, JSON, parquet), it can read in data from other sources as well (such as Amazon S3). 

Considering this project is not intended to show data in real-time, and Immigration/Crime data sources were built on a monthly basis, the Data Engineering team decided that data will be refreshed by month.
                              

#### The data was increased by 100x.

There are many big data frameworks that could handle huge amounts of data; selecting one will depend on many factors.
For this particular project Apache Spark could be a good solution because we are not working with stream data and Spark is faster than Hadoop.
Also, it can be mentioned that there are options that allow us to manage in an easy way the increments of data; one of these platforms is AWS EMR among others.

#### The data populates a dashboard that must be updated on a daily basis by 7 am every day.

One of the most popular technologies for running pipelines on a schedule is Apache Airflow. 
This tool provides a control dashboard for users and maintainers. Also, it comes with many Hooks that can be integrated with common systems (HttpHook, PostgresHook, MySqlHook, SlackHook, PrestoHook, etc.)
For this particular project implementing Apache Airflow could handle tasks scheduled.

#### The database needed to be accessed by 100+ people.

Data warehouse in cloud is one of the best options for accessing databases simultaneously by a lot of people. Technologies like Redshift, BigQuery, Teradata, Aster, Oracle ExaData or Azure, paralellize the execution of one query on multiple CPU/machines.