# Immigration Data Analytics
### Data Engineering Capstone Project

#### Project Summary
Lot of people migrate to and from US every year, which creates a huge task of tracking each and every immigrant in the country difficult. Luckily a huge ammount of data gets generated for every immigrant, the data about their travel, destination, visa type etc. When combined this data with weather and demographic data would certainly give some insights about immigrants such as, the peak period when there are lot of application for immigration, which sate in US is prefered etc.

With data data available I have built a robust and scalable pipeline in Airflow using Amazon Web Services such as using S3, EMR and postgres.
this pipeline moves raw data to S3 and transforms it into a data model which lets the regulators trach individual immigrants easily.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import datetime
import os
import numpy as np
from math import isnan

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T


In [3]:

def getNullStatus(df):
    df_c = df.apply(lambda x: not x.isnull().values.any(), axis=0).T
    df_c = pd.DataFrame(df_c).rename(columns={0:"contains_null?"})
    df_s = pd.DataFrame(df.iloc[0,:]).rename(columns={0:"example_value"})
    df_f = df_c.join(df_s).reset_index(drop=False)
    
    return df_f


### Step 1: Scope the Project and Gather Data

#### Scope 
Lot of people migrate to and from US every year, which creates a huge task of tracking each and every immigrant in the country difficult. Luckily a huge ammount of data gets generated for every immigrant, the data about their travel, destination, visa type etc. When combined this data with weather and demographic data would certainly give some insights about immigrants such as, the peak period when there are lot of application for immigration, which sate in US is prefered etc.

With data data available I have built a robust and scalable pipeline in Airflow using Amazon Web Services such as using S3, EMR and postgres.
this pipeline moves raw data to S3 and transforms it into a data model which lets the regulators trach individual immigrants easily.

#### Describe and Gather Data 

##### SOURCE
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office [Source](https://travel.trade.gov/research/reports/i94/historical/2016.html). This data records immigration records partitioned by month of every year.
- World temperature Data: This dataset comes from Kaggle [Source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data). Includes temperature recordings of cities around the world for a period of time
- US City Demographic Data: This dataset comes from OpenSoft [Source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/). Includes population formation of US states, like race and gender.
- Aiport Code table: [Source](https://datahub.io/core/airport-codes#data). Includes a collection of airport codes and their respective cities, countries around the world.

## Explore Data
Exploring each data for missing values, duplicate values etc.

### Immigration Data

In [15]:
# for data exploration we will use pandas

# immegration data (sample)
df_immData = pd.read_csv("immigration_data_sample.csv", sep = ",")
df_immData = df_immData.iloc[:,1:]
df_immData.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [16]:
print("##############################################\n")
print(f"Sample Data Rows: {df_immData.shape[0]},\nSample Data Columns: {df_immData.shape[1]}\n")
print("##############################################\n")
getNullStatus(df_immData)

##############################################

Sample Data Rows: 1000,
Sample Data Columns: 28

##############################################



Unnamed: 0,index,contains_null?,example_value
0,cicid,True,4.08432e+06
1,i94yr,True,2016
2,i94mon,True,4
3,i94cit,True,209
4,i94res,True,209
5,i94port,True,HHW
6,arrdate,True,20566
7,i94mode,True,1
8,i94addr,False,HI
9,depdate,False,20573


### Observations and fixes [*These fixes will be done in pyspark*]

- From obserrvation I found out that there are some abnormalities in Immigration data which needs to be fixed. Below are the observations about i94 data
    - the data contains unwanted characters in 
        - `[dtadfile, dtaddto]` column contains unwanted characters such as `(D/S)`
        - The documentation of i94 staes that there are 3 genders `[F, M, O]`, But in the data there is an extra character `x`. Replacing this `x` with `O`
    - sas date format for columns `[arrdate, depdate]`
        - convert these columns to datettime
    - change date format of `[dtadfile, dtaddto]`
    - country and city codes are given in description file `I94_SAS_Labels_Descriptions.SAS`
        - extract city codes and country codes
    


### US Cities Demographic data

In [17]:
df_demog = pd.read_csv("us-cities-demographics.csv", sep=";")
df_demog.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [18]:
print("##############################################\n")
print(f"Sample Data Rows: {df_demog.shape[0]},\nSample Data Columns: {df_demog.shape[1]}\n")
print("##############################################\n")
getNullStatus(df_demog)

##############################################

Sample Data Rows: 2891,
Sample Data Columns: 12

##############################################



Unnamed: 0,index,contains_null?,example_value
0,City,True,Silver Spring
1,State,True,Maryland
2,Median Age,True,33.8
3,Male Population,False,40601
4,Female Population,False,41862
5,Total Population,True,82463
6,Number of Veterans,False,1562
7,Foreign-born,False,30908
8,Average Household Size,False,2.6
9,State Code,True,MD


#### Observations and fixes [*These fixes will be done in pyspark*]
- This data is pretty straight-forward. Although, The `State` and `City` columns do contains null values. This can be fixed by combining the same information from airport data with this data so that we have all the cities in US.

### Airport Codes

In [19]:
df_airport = pd.read_csv("airport-codes_csv.csv", sep=',')
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [28]:
print("##############################################\n")
print(f"Sample Data Rows: {df_airport.shape[0]},\nSample Data Columns: {df_airport.shape[1]}\n")
print("##############################################\n")
getNullStatus(df_airport)

##############################################

Sample Data Rows: 55075,
Sample Data Columns: 12

##############################################



Unnamed: 0,index,contains_null?,example_value
0,ident,True,00A
1,type,True,heliport
2,name,True,Total Rf Heliport
3,elevation_ft,False,11
4,continent,False,
5,iso_country,False,US
6,iso_region,True,US-PA
7,municipality,False,Bensalem
8,gps_code,False,00A
9,iata_code,False,


#### Observations and fixed [*These fixes will be done in pyspark*]

- The column `coordinates` needs a fix, the values of lattitude and longitude are combined in one column. These values can be seperated.
- state and country codes are combined into one in column `iso_region`

#### Global temperature by cities

In [36]:
df_gtemp = pd.read_csv("/data2/GlobalLandTemperaturesByCity.csv", sep=',', nrows=10000)
df_gtemp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [37]:
print("##############################################\n")
print(f"Sample Data Rows: {df_gtemp.shape[0]},\nSample Data Columns: {df_gtemp.shape[1]}\n")
print("##############################################\n")
getNullStatus(df_gtemp)

##############################################

Sample Data Rows: 10000,
Sample Data Columns: 7

##############################################



Unnamed: 0,index,contains_null?,example_value
0,dt,True,1743-11-01
1,AverageTemperature,False,6.068
2,AverageTemperatureUncertainty,False,1.737
3,City,True,Århus
4,Country,True,Denmark
5,Latitude,True,57.05N
6,Longitude,True,10.33E


## Cleaning Steps

#### Data Fix and transformation in pyspark

In [2]:
spark = SparkSession.builder\
.config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.config("spark.driver.extraJavaOptions", "-Dlog4jspark.root.logger=WARN,console")\
.enableHiveSupport()\
.getOrCreate()

In [3]:

def convertSasDate(sd):
    sas_start = datetime.datetime(year=1960, month=1, day=1)
    sd = sd.apply(lambda x: datetime.timedelta(days=x) if not isnan(x) else x )
    return sas_start + sd

def convertStrToDate(sd, fmt = "%d%m%Y"):
    return sd.apply(lambda x: datetime.datetime.strptime(str(x), fmt) if not isnan(x) else x)
    

In [4]:
# city
demo = spark.read.csv("us-cities-demographics.csv", inferSchema=True, header=True, sep=";")\
.select(F.col("State Code").alias("state_code"), F.col("City").alias("city"))

us_airport = spark.read.csv("airport-codes_csv.csv", inferSchema=True, header=True)\
.filter(F.col("iso_country")==F.lit("US"))\
.withColumn("state_code", F.split("iso_region", "-").getItem(1))\
.select("state_code", F.col("municipality").alias("city"))

city = us_airport.union(demo).dropDuplicates().withColumn("city_id", F.monotonically_increasing_id())

city.show(5)

+----------+------------+-------+
|state_code|        city|city_id|
+----------+------------+-------+
|        MI|       Grant|      0|
|        IL|    Bonfield|      1|
|        NE|      Wilber|      2|
|        IN|Indianapolis|      3|
|        KS|      Ottawa|      4|
+----------+------------+-------+
only showing top 5 rows



In [5]:
us_airport = spark.read.csv("airport-codes_csv.csv", inferSchema=True, header=True)\
.filter(F.col("iso_country")==F.lit('US'))\
.withColumn("airport_latitude", F.split("coordinates", ", ").getItem(0))\
.withColumn("airport_longitude", F.split("coordinates", ", ").getItem(1))\
.withColumn("state", F.split("iso_region", "-").getItem(1))\
.withColumnRenamed("ident", "icao_code")\
.join(city, (F.col("municipality")==city.city) & (F.col("state")==city.state_code), 'left')\
.drop("coordinates", "gps_code", "local_code", "continent", "iso_region", "iso_country", "municipality", "state", "city", "state_code")

In [6]:
us_airport.show(5)

+---------+-------------+--------------------+------------+---------+------------------+-----------------+-------------+
|icao_code|         type|                name|elevation_ft|iata_code| airport_lattitude|airport_longitude|      city_id|
+---------+-------------+--------------------+------------+---------+------------------+-----------------+-------------+
|      00A|     heliport|   Total Rf Heliport|          11|     null|-74.93360137939453|   40.07080078125| 893353197568|
|     00AA|small_airport|Aero B Ranch Airport|        3435|     null|       -101.473911|        38.704022|1047972020224|
|     00AK|small_airport|        Lowell Field|         450|     null|    -151.695999146|      59.94919968| 240518168576|
|     00AL|small_airport|        Epps Airpark|         820|     null|-86.77030181884766|34.86479949951172| 953482739712|
|     00AR|       closed|Newport Hospital ...|         237|     null|        -91.254898|          35.6087| 532575944704|
+---------+-------------+-------

In [7]:
demo = spark.read.csv("us-cities-demographics.csv", inferSchema=True, header=True, sep=';')\
.select(
    F.col("Male Population").cast(T.LongType()).alias("male_population"),
    F.col("Female Population").cast(T.LongType()).alias("female_population"),
    F.col("Total Population").cast(T.LongType()).alias("total_population"),
    F.col("Number of Veterans").cast(T.LongType()).alias("num_veterans"),
    F.col("Foreign-born").cast(T.LongType()).alias("foreign_born"),
    F.col("Average Household Size").alias("avg_household_size"),
    F.col("State Code").alias("state_code"),
    F.col("Race").alias("race"),
    F.col("Median Age").alias("median_age"),
    F.col("City").alias('city')
)\
.join(city, ['city', 'state_code'])\
.drop('city', 'state_code')

demo.show(5)

+---------------+-----------------+----------------+------------+------------+------------------+--------------------+----------+-------+
|male_population|female_population|total_population|num_veterans|foreign_born|avg_household_size|                race|median_age|city_id|
+---------------+-----------------+----------------+------------+------------+------------------+--------------------+----------+-------+
|         410615|           437808|          848423|       42186|       72456|              2.53|               White|      34.1|      3|
|         410615|           437808|          848423|       42186|       72456|              2.53|               Asian|      34.1|      3|
|         410615|           437808|          848423|       42186|       72456|              2.53|Black or African-...|      34.1|      3|
|         410615|           437808|          848423|       42186|       72456|              2.53|  Hispanic or Latino|      34.1|      3|
|         410615|           437808

In [8]:
#/data2/GlobalLandTemperaturesByCity.csv
thres = F.to_date(F.lit("2013-08-01")).cast(T.TimestampType())
global_temp = spark.read.csv("/data2/GlobalLandTemperaturesByCity.csv", inferSchema=True, header=True)\
.where((F.col('dt')>thres) & (F.col("AverageTemperature").isNotNull()) & (F.col("Country")==F.lit("United States")))\
.withColumn("latitude", 
            F.when( F.array_contains( F.split("Latitude", ""), "N"), 
                  F.expr("substring(Latitude, 1, length(Latitude)-1)")).
            otherwise( -1 * F.expr("substring(Latitude, 1, length(Latitude)-1)") ))\
.withColumn("longitude",
           F.when( F.array_contains( F.split("Longitude", ""), "E"), 
                  F.expr("substring(Longitude, 1, length(Longitude)-1)")).
            otherwise( -1 * F.expr("substring(Longitude, 1, length(Longitude)-1)")))\
.withColumnRenamed("AverageTemperature", "avg_temp")\
.withColumnRenamed("AverageTemperatureUncertainty", "std_temp")\
.withColumnRenamed("City", "city")\
.withColumnRenamed("Country", "country")\
.join(city, "city", "left")\
.drop("dt", "Country", "Latitude", "city", "state_code")\



global_temp.show(20)


+------------------+------------------+---------+---------+-------------+
|          avg_temp|          std_temp|longitude|lattitude|      city_id|
+------------------+------------------+---------+---------+-------------+
|            25.791|              1.18|  -100.53|    32.95|1640677507127|
|            25.791|              1.18|  -100.53|    32.95| 893353197587|
|            17.799|             1.093|   -80.95|    40.99|1125281431575|
|            17.799|             1.093|   -80.95|    40.99| 858993459241|
|            17.799|             1.093|   -80.95|    40.99| 850403524630|
|            17.799|             1.093|   -80.95|    40.99| 807453851679|
|            17.799|             1.093|   -80.95|    40.99| 412316860423|
|            19.207|             0.866|  -107.03|    34.56|1400159338540|
|            19.643|              1.05|   -76.99|    39.38|1314259992598|
|            19.643|              1.05|   -76.99|    39.38|1108101562435|
|            19.643|              1.05

In [51]:

month_year = "apr16"
immigrant = spark.read.format('com.github.saurfang.sas.spark')\
.load("/data/18-83510-I94-Data-2016/i94_{}_sub.sas7bdat".format(month_year))\
.withColumn("gender", F.when(F.col("gender")==F.lit("X"), F.lit("O")).otherwise(F.col("gender")))\
.select(
    F.col("cicid").cast(T.IntegerType()).alias("cicid"),
    F.col("i94res").cast(T.IntegerType()).alias("from_country_code"),
    F.col("i94bir").cast(T.IntegerType()).alias("age"),
    F.col("i94visa").cast(T.IntegerType()).alias("visa_code"),
    F.col("visapost").alias("visa_post"),
    F.col("occup").alias("occupation"),
    F.col("visatype").alias("visa_type"),
    F.col("biryear").cast(T.IntegerType()).alias("birth_year"),
    F.col("gender")
)\
.withColumn("i94_dt", F.lit(month_year))

immigrant.show(5)

+-----+-----------------+---+---------+---------+----------+---------+----------+------+------+
|cicid|from_country_code|age|visa_code|visa_post|occupation|visa_type|birth_year|gender|i94_dt|
+-----+-----------------+---+---------+---------+----------+---------+----------+------+------+
|    6|              692| 37|        2|     null|      null|       B2|      1979|  null| apr16|
|    7|              276| 25|        3|      SEO|      null|       F1|      1991|     M| apr16|
|   15|              101| 55|        2|     null|      null|       B2|      1961|     M| apr16|
|   16|              101| 28|        2|     null|      null|       B2|      1988|  null| apr16|
|   17|              101|  4|        2|     null|      null|       B2|      2012|  null| apr16|
+-----+-----------------+---+---------+---------+----------+---------+----------+------+------+
only showing top 5 rows



In [55]:
@F.udf(T.TimestampType())
def convSasDate(daysCount):
    import datetime
    sas_ref = datetime.datetime(1960,1,1)
    try:
        return sas_ref + datetime.timedelta(days=int(daysCount))
    except:
        return daysCount

immigration = spark.read.format('com.github.saurfang.sas.spark')\
.load("/data/18-83510-I94-Data-2016/i94_{}_sub.sas7bdat".format(month_year))\
.select(
    F.col("cicid").cast(T.IntegerType()).alias("cicid"),
    F.col("admnum").cast(T.LongType()).alias("admnum"),
    F.col("i94port").alias("iata_code"),
    F.col("i94addr").alias("state_code"),
    "arrdate","depdate", "dtaddto", "airline", "fltno", "entdepa", "entdepd", "entdepu", "matflag"
)\
.withColumn("arrival_date", convSasDate("arrdate"))\
.withColumn("departure_date", convSasDate("depdate"))\
.withColumn("deadline_departure", F.unix_timestamp("dtaddto", 'mmddyyyy').cast(T.TimestampType()))\
.withColumn("i94_dt", F.lit(month_year))\
.drop("arrdate", "depdate", "dtaddto")


immigration.show(5)


+-----+-----------+---------+----------+-------+-----+-------+-------+-------+-------+-------------------+-------------------+-------------------+------+
|cicid|     admnum|iata_code|state_code|airline|fltno|entdepa|entdepd|entdepu|matflag|       arrival_date|     departure_date| deadline_departure|i94_dt|
+-----+-----------+---------+----------+-------+-----+-------+-------+-------+-------+-------------------+-------------------+-------------------+------+
|    6| 1897628485|      XXX|      null|   null| null|      T|   null|      U|   null|2016-04-29 00:00:00|               null|2016-01-28 00:10:00| apr16|
|    7| 3736796330|      ATL|        AL|   null|00296|      G|   null|      Y|   null|2016-04-07 00:00:00|               null|               null| apr16|
|   15|  666643185|      WAS|        MI|     OS|   93|      T|      O|   null|      M|2016-04-01 00:00:00|2016-08-25 00:00:00|2016-01-30 00:09:00| apr16|
|   16|92468461330|      NYC|        MA|     AA|00199|      O|      O|   nul

In [57]:
# combining immigration and demographic data to denormalize it for analytics in spark

country_code = spark.read.csv("country_code.txt", inferSchema=True, header=True)
state_code = spark.read.csv("state_code.txt", inferSchema=True, header=True)

demo_oi = demo\
.select("median_age", "city_id", "total_population", "foreign_born")\
.join(city.select("state_code", "city_id"), "city_id")\
.drop('city_id')\
.groupBy("state_code")\
.agg(
    F.mean("median_age").alias('median_age'),
    F.sum("total_population").alias("total_population"),
    F.sum("foreign_born").alias("foreign_born")
)

imm_demo = immigrant\
.select('cicid', 'from_country_code', 'age', 'occupation', 'gender', 'i94_dt')\
.join(country_code, immigrant.from_country_code==country_code.code, 'left')\
.drop('from_country_code', 'code')\
.withColumnRenamed('country', 'from_country')\
.join(immigration.select('cicid','state_code'), 'cicid', 'left')\
.join(state_code, immigration.state_code==state_code.code, 'left')\
.drop('code')\
.join(demo_oi, 'state_code')\
.drop('state_code')

imm_demo.show(2)

+-----+---+----------+------+------+---------------+-------+----------+----------------+------------+
|cicid|age|occupation|gender|i94_dt|   from_country|  state|median_age|total_population|foreign_born|
+-----+---+----------+------+------+---------------+-------+----------+----------------+------------+
|40574| 59|      null|     M| apr16|UNITED KINGDOM'|ARIZONA|   35.0375|        22497710|     3411565|
|68579| 34|      null|     M| apr16|          CHINA|ARIZONA|   35.0375|        22497710|     3411565|
+-----+---+----------+------+------+---------------+-------+----------+----------------+------------+
only showing top 2 rows



In [58]:
# cobining airport and weather data to denormalize the data for analytics

airport_weather = us_airport\
.select("name", "elevation_ft", "city_id")\
.join(city, 'city_id', 'left')\
.join(state_code, city.state_code==state_code.code, 'left')\
.join(global_temp, 'city_id', 'inner')\
.drop('state_code', 'code', 'city_id')

airport_weather.show(5)

+--------------------+------------+------+----------+--------+--------+---------+---------+
|                name|elevation_ft|  city|     state|avg_temp|std_temp|longitude|lattitude|
+--------------------+------------+------+----------+--------+--------+---------+---------+
|Arnold Ranch Airport|         382|Fresno|CALIFORNIA|  24.186|   0.797|  -119.34|    36.17|
|Valley Medical Ce...|         307|Fresno|CALIFORNIA|  24.186|   0.797|  -119.34|    36.17|
|Pg&E, Fresno Serv...|         289|Fresno|CALIFORNIA|  24.186|   0.797|  -119.34|    36.17|
|Community Regiona...|         415|Fresno|CALIFORNIA|  24.186|   0.797|  -119.34|    36.17|
|Sierra Sky Park A...|         321|Fresno|CALIFORNIA|  24.186|   0.797|  -119.34|    36.17|
+--------------------+------------+------+----------+--------+--------+---------+---------+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.