# Project Title
### Data Engineering Capstone Project

#### Project Summary
SEE DESCRIPTION IN README
This notebook was used only to develop the code for etl_main.py and elt_capstone.py; the ETL itself lives in those two files.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np 
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import isnan, when, count, col, udf, to_date, from_unixtime, unix_timestamp, countDistinct, upper, monotonically_increasing_id

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

##### Immigration Data

In [2]:
# Read in the data here
immi_df = pd.read_csv('immigration_data_sample.csv')

In [3]:
immi_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
# CAREFUL: takes 30 minutes to read in data
# fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [5]:
# df.head()

In [6]:
# read data into Spark
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11").enableHiveSupport().getOrCreate()

In [7]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [8]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [9]:
df_spark.select([count(when(col(c).isNull(), c)).alias(c) for c in df_spark.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+---
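The null-count idiom above relies on two Spark behaviours: `when()` without `otherwise()` returns null when its condition is false, and `count()` skips nulls. Each column's result is therefore the number of rows where the condition holds. The plain-Python sketch below computes the same per-column count on a couple of hypothetical records, which can help when checking the idiom on a small sample.

```python
def null_counts(rows, columns):
    """Count None values per column, like count(when(col(c).isNull(), c))."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            # a missing key is treated the same as a null value
            if row.get(c) is None:
                counts[c] += 1
    return counts

# Hypothetical records shaped like the first two I94 rows.
rows = [
    {"cicid": 6.0, "i94mode": None, "i94addr": None},
    {"cicid": 7.0, "i94mode": 1.0, "i94addr": "AL"},
]
print(null_counts(rows, ["cicid", "i94mode", "i94addr"]))
```

Swapping `is None` for `is not None` gives the complementary non-null count used later with `isNotNull()`.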

In [10]:
df_spark.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [11]:
# replace null values with defaults (0 for these numeric columns)
df_spark2 = df_spark.na.fill(0, ["depdate", "i94bir", "biryear"])

In [12]:
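# NOTE: na.fill ignores subset columns whose type does not match the fill value,
# so this int fill has no effect on the string column i94addr; the "" fill below applies.
df_spark2 = df_spark2.na.fill(99, ["i94addr"])
# 9 = "Not reported" in the i94mode code list
df_spark2 = df_spark2.na.fill(9, ["i94mode"])
df_spark2 = df_spark2.na.fill("", ["i94addr", "dtadfile", "visapost", "occup", "entdepa", "entdepd", "entdepu", "matflag", "dtaddto", "gender", "insnum", "airline", "fltno"])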
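Because `na.fill` skips subset columns whose type does not match the fill value, it can be clearer to state each column's default explicitly. PySpark's `na.fill`/`fillna` also accepts a dict mapping column names to replacement values, for example `df_spark.na.fill({"i94mode": 9, "i94addr": ""})`. The sketch below mirrors that per-column mapping in plain Python on a hypothetical record; the `DEFAULTS` dict and `fill_defaults` helper are illustrative names, with values taken from the cells above.

```python
# Hypothetical per-column defaults, using the values chosen in the cells above:
# 0 for numeric date/age columns, 9 ("Not reported") for i94mode, "" for text.
DEFAULTS = {
    "depdate": 0, "i94bir": 0, "biryear": 0,
    "i94mode": 9,
    "i94addr": "", "gender": "",
}

def fill_defaults(row, defaults=DEFAULTS):
    """Return a copy of row with None replaced by the column's default, if one exists."""
    return {c: defaults[c] if v is None and c in defaults else v for c, v in row.items()}

record = {"cicid": 6.0, "i94mode": None, "i94addr": None, "depdate": None, "occup": None}
print(fill_defaults(record))  # occup has no default, so it stays None
```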

In [13]:
df_spark2.select([count(when(col(c).isNull(), c)).alias(c) for c in df_spark2.columns]).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|      0|      0|      0|     0|      0|    0|       0|       0|    0|      0|      0|      0|      0|      0|      0|     0|     0|      0|     0|    0|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-

In [14]:
df_spark2.show(10)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|    9.0|       |    0.0|  37.0|    2.0|  1.0|        |        |     |      T|       |      U|       | 1979.0|10282016|      |      |       | 1.897628485E9|     |      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|    0.0|  25.0|    3.0|  1.0|20130811|     SE

In [15]:
# cast double to integer
df_spark2 = df_spark2.withColumn("cicid", col("cicid").cast(IntegerType()))\
    .withColumn("i94yr", col("i94yr").cast(IntegerType()))\
    .withColumn("i94mon", col("i94mon").cast(IntegerType()))\
    .withColumn("i94cit", col("i94cit").cast(IntegerType()))\
    .withColumn("i94res", col("i94res").cast(IntegerType()))\
    .withColumn("i94mode", col("i94mode").cast(IntegerType()))\
    .withColumn("i94bir", col("i94bir").cast(IntegerType()))\
    .withColumn("i94visa", col("i94visa").cast(IntegerType()))\
    .withColumn("count", col("count").cast(IntegerType()))\
    .withColumn("biryear", col("biryear").cast(IntegerType()))

In [16]:
# NOTE: superseded by the next cell, which rebuilds df_spark3 from df_spark2
df_spark3 = df_spark2.withColumn("depdate", col("depdate").cast(IntegerType()))

In [17]:
# SAS stores dates as the number of days since 1960-01-01
def convert_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # null, NaN, or out-of-range input
        return None

udf_datetime_from_sas = udf(convert_datetime, DateType())

df_spark3 = df_spark2.withColumn("arrdate", udf_datetime_from_sas("arrdate")) \
    .withColumn("depdate", udf_datetime_from_sas("depdate"))
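The conversion adds the SAS day count to 1 January 1960. The plain-Python check below tests it against values visible in this notebook: `arrdate` 20573 appears as 2016-04-29 and 20551 as 2016-04-07 in the output further down. Because `depdate` nulls were filled with 0 earlier, missing departure dates come out as 1960-01-01. The hypothetical `treat_zero_as_missing` flag sketches one way to keep them null instead; it is not part of the original ETL.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 for SAS date values

def sas_to_date(days, treat_zero_as_missing=False):
    """Convert a SAS date (days since 1960-01-01) to datetime.date, or None."""
    if days is None:
        return None
    days = int(days)
    if treat_zero_as_missing and days == 0:
        return None  # 0 was used above as the fill value for missing depdate
    return SAS_EPOCH + timedelta(days=days)

print(sas_to_date(20573.0))                         # 2016-04-29
print(sas_to_date(0))                               # 1960-01-01
print(sas_to_date(0, treat_zero_as_missing=True))   # None
```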

In [18]:
df_spark3 = df_spark3.withColumn('dtadfile', to_date(unix_timestamp("dtadfile", 'yyyyMMdd').cast('timestamp')))

In [19]:
df_spark3 = df_spark3.withColumn('dtaddto', to_date(unix_timestamp("dtaddto", 'MMddyyyy').cast('timestamp')))
df_spark3.show(10)

+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+--------------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|   dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+--------+-----+-------+-------+-------+-------+-------+----------+------+------+-------+--------------+-----+--------+
|    6| 2016|     4|   692|   692|    XXX|2016-04-29|      9|       |1960-01-01|    37|      2|    1|      null|        |     |      T|       |      U|       |   1979|2016-10-28|      |      |       | 1.897628485E9|     |      B2|
|    7| 2016|     4|   254|   276|    ATL|2016-04-07|      1|     AL|1960-01
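`dtadfile` is stored as `yyyyMMdd` text and `dtaddto` as `MMddyyyy` text. Values that do not parse become null, which is why the non-null count for `dtaddto` drops to 3,050,489 (45,824 nulls) in the count further down. The sketch below is a plain-Python equivalent using `datetime.strptime`. It assumes the same two layouts, and `parse_date` is an illustrative helper name.

```python
from datetime import datetime

def parse_date(text, fmt):
    """Parse text with a strptime format; return None if it is empty, None, or invalid."""
    try:
        return datetime.strptime(text, fmt).date()
    except (TypeError, ValueError):
        return None

print(parse_date("20130811", "%Y%m%d"))  # 2013-08-11  (dtadfile, yyyyMMdd)
print(parse_date("10282016", "%m%d%Y"))  # 2016-10-28  (dtaddto, MMddyyyy)
print(parse_date("", "%m%d%Y"))          # None        (the empty-string fill value)
```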

In [20]:
df_spark3.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = false)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = false)
 |-- occup: string (nullable = false)
 |-- entdepa: string (nullable = false)
 |-- entdepd: string (nullable = false)
 |-- entdepu: string (nullable = false)
 |-- matflag: string (nullable = false)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = false)
 |-- insnum: string (nullable = false)
 |-- airline: string (nullable = false)
 |-- admnum: double 

In [21]:
# change column labels
df_spark3 = df_spark3.withColumnRenamed("i94yr", "year")\
    .withColumnRenamed("i94mon", "month")\
    .withColumnRenamed("i94cit", "citizenship")\
    .withColumnRenamed("i94res", "residence")\
    .withColumnRenamed("i94port", "port")\
    .withColumnRenamed("arrdate", "arrivaldate")\
    .withColumnRenamed("i94mode", "mode")\
    .withColumnRenamed("i94addr", "state")\
    .withColumnRenamed("depdate", "departuredate")\
    .withColumnRenamed("i94bir", "age")\
    .withColumnRenamed("biryear", "birthyear")

In [22]:
df_spark3.show(5)

+-----+----+-----+-----------+---------+----+-----------+----+-----+-------------+---+-------+-----+----------+--------+-----+-------+-------+-------+-------+---------+----------+------+------+-------+--------------+-----+--------+
|cicid|year|month|citizenship|residence|port|arrivaldate|mode|state|departuredate|age|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birthyear|   dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+----+-----+-----------+---------+----+-----------+----+-----+-------------+---+-------+-----+----------+--------+-----+-------+-------+-------+-------+---------+----------+------+------+-------+--------------+-----+--------+
|    6|2016|    4|        692|      692| XXX| 2016-04-29|   9|     |   1960-01-01| 37|      2|    1|      null|        |     |      T|       |      U|       |     1979|2016-10-28|      |      |       | 1.897628485E9|     |      B2|
|    7|2016|    4|        254|      276| ATL| 2016-04-07|   1|   AL|   1

In [23]:
df_spark3.select([count(when(col(c).isNotNull(), c)).alias(c) for c in df_spark3.columns]).show()

+-------+-------+-------+-----------+---------+-------+-----------+-------+-------+-------------+-------+-------+-------+--------+--------+-------+-------+-------+-------+-------+---------+-------+-------+-------+-------+-------+-------+--------+
|  cicid|   year|  month|citizenship|residence|   port|arrivaldate|   mode|  state|departuredate|    age|i94visa|  count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|birthyear|dtaddto| gender| insnum|airline| admnum|  fltno|visatype|
+-------+-------+-------+-----------+---------+-------+-----------+-------+-------+-------------+-------+-------+-------+--------+--------+-------+-------+-------+-------+-------+---------+-------+-------+-------+-------+-------+-------+--------+
|3096313|3096313|3096313|    3096313|  3096313|3096313|    3096313|3096313|3096313|      3096313|3096313|3096313|3096313| 3096312| 3096313|3096313|3096313|3096313|3096313|3096313|  3096313|3050489|3096313|3096313|3096313|3096313|3096313| 3096313|
+-------+---

In [24]:
df_spark3.count()

3096313

In [25]:
# write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

##### Demographic Information Data

In [26]:
# read US cities demographics
cities_df = pd.read_csv('us-cities-demographics.csv', delimiter=';')

In [27]:
cities_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [28]:
cities_df.count()

City                      2891
State                     2891
Median Age                2891
Male Population           2888
Female Population         2888
Total Population          2891
Number of Veterans        2878
Foreign-born              2878
Average Household Size    2875
State Code                2891
Race                      2891
Count                     2891
dtype: int64

In [29]:
demo_df = spark.read.options(header='True',inferSchema='True',delimiter=';').csv("us-cities-demographics.csv")

In [30]:
demo_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- Male Population: integer (nullable = true)
 |-- Female Population: integer (nullable = true)
 |-- Total Population: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [31]:
demo_df.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [32]:
demo_df.select([count(when(col(c).isNotNull(), c)).alias(c) for c in demo_df.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|2891| 2891|      2891|           2888|             2888|            2891|              2878|        2878|                  2875|      2891|2891| 2891|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [33]:
demo_df.count()

2891

In [34]:
demo_df.select([count(when(col(c).isNull(), c)).alias(c) for c in demo_df.columns]).show()

+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|City|State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race|Count|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+
|   0|    0|         0|              3|                3|               0|                13|          13|                    16|         0|   0|    0|
+----+-----+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+----+-----+



In [35]:
demo_df = demo_df.dropna(how='any', subset=["Male Population", "Female Population", "Number of Veterans", "Foreign-born", "Average Household Size"])
demo_df.count()

2875

In [36]:
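# no-op after the dropna above (rows with nulls in these columns were removed); kept as a safeguard
demo_df = demo_df.na.fill(0, ["Male Population", "Female Population", "Number of Veterans", "Foreign-born"])
demo_df = demo_df.na.fill(0.0, ["Average Household Size"])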

In [37]:
cities_df.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [38]:
cities_df['Male Population'] = cities_df['Male Population'].fillna(0)
cities_df['Female Population'] = cities_df['Female Population'].fillna(0)
cities_df['Number of Veterans'] = cities_df['Number of Veterans'].fillna(0)
cities_df['Foreign-born'] = cities_df['Foreign-born'].fillna(0)

In [39]:
cities_df = cities_df.astype({"Male Population":'int', "Female Population":'int', "Number of Veterans":'int', "Foreign-born":'int'})  
cities_df.dtypes

City                       object
State                      object
Median Age                float64
Male Population             int64
Female Population           int64
Total Population            int64
Number of Veterans          int64
Foreign-born                int64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [40]:
cities_df.groupby('State').count()

State,City,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
Alabama,34,34,34,34,34,34,34,34,34,34,34
Alaska,5,5,5,5,5,5,5,5,5,5,5
Arizona,80,80,80,80,80,80,80,80,80,80,80
Arkansas,29,29,29,29,29,29,29,29,29,29,29
California,676,676,676,676,676,676,676,676,676,676,676
Colorado,80,80,80,80,80,80,80,80,80,80,80
Connecticut,39,39,39,39,39,39,39,39,39,39,39
Delaware,5,5,5,5,5,5,5,5,5,5,5
District of Columbia,5,5,5,5,5,5,5,5,5,5,5
Florida,222,222,222,222,222,222,222,219,222,222,222


In [41]:
cities_df.loc[cities_df['State'] == "Hawaii"].head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
296,Urban Honolulu,Hawaii,41.4,176807,175959,352766,23213,101312,2.69,HI,Asian,240978
798,Urban Honolulu,Hawaii,41.4,176807,175959,352766,23213,101312,2.69,HI,Black or African-American,11781
1097,Urban Honolulu,Hawaii,41.4,176807,175959,352766,23213,101312,2.69,HI,Hispanic or Latino,24586
1545,Urban Honolulu,Hawaii,41.4,176807,175959,352766,23213,101312,2.69,HI,White,110508
1546,Urban Honolulu,Hawaii,41.4,176807,175959,352766,23213,101312,2.69,HI,American Indian and Alaska Native,5592
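The demographics file has one row per city and race. The five Urban Honolulu rows above repeat the same population figures and differ only in `Race` and `Count`. A table keyed by city would need those rows collapsed into one. The plain-Python sketch below does that pivot using the Honolulu values shown above. `pivot_race_counts` is an illustrative name, and only a few columns are carried over.

```python
def pivot_race_counts(rows):
    """Collapse one-row-per-race records into one record per (City, State Code)."""
    cities = {}
    for row in rows:
        key = (row["City"], row["State Code"])
        record = cities.setdefault(key, {
            "City": row["City"],
            "State Code": row["State Code"],
            "Total Population": row["Total Population"],
        })
        record[row["Race"]] = row["Count"]  # one column per race
    return list(cities.values())

honolulu = [
    {"City": "Urban Honolulu", "State Code": "HI", "Total Population": 352766,
     "Race": race, "Count": n}
    for race, n in [
        ("Asian", 240978),
        ("Black or African-American", 11781),
        ("Hispanic or Latino", 24586),
        ("White", 110508),
        ("American Indian and Alaska Native", 5592),
    ]
]
print(pivot_race_counts(honolulu))
```

In pandas, `DataFrame.pivot_table(index=[...], columns="Race", values="Count")` expresses the same reshaping on `cities_df`.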


In [42]:
cities_df.loc[cities_df['State'] == "California"].head(10)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
7,West Covina,California,39.8,51629,56860,108489,3800,37038,3.56,CA,Asian,32716
10,Folsom,California,40.9,41051,35317,76368,4187,13234,2.62,CA,Hispanic or Latino,5822
11,Folsom,California,40.9,41051,35317,76368,4187,13234,2.62,CA,American Indian and Alaska Native,998
18,Berkeley,California,32.5,60142,60829,120971,3736,25000,2.35,CA,Asian,27089
19,Santa Clara,California,35.2,63278,62938,126216,4426,52281,2.75,CA,White,55847
25,Rancho Cordova,California,33.8,34844,36182,71026,4590,17020,2.86,CA,Asian,12653
31,Chula Vista,California,34.6,131485,134269,265754,12368,82710,3.32,CA,Hispanic or Latino,156522
39,Jurupa Valley,California,33.8,49430,50884,100314,3833,25338,3.87,CA,White,58201
40,Los Angeles,California,35.0,1958998,2012898,3971896,85417,1485425,2.86,CA,White,2177650


##### I94 SAS Labels Description

In [43]:
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

In [44]:
def code_mapper(content, idx):
    """Parse the SAS value block that starts at `idx` into a {code: description} dict."""
    # uses its argument instead of the global f_content
    block = content[content.index(idx):]
    block = block[:block.index(';')].split('\n')
    block = [line.replace("'", "") for line in block]
    pairs = [line.split('=') for line in block[1:]]
    return dict([p[0].strip(), p[1].strip()] for p in pairs if len(p) == 2)
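To check the parsing logic without the full label file, the same steps can be run on a small inline sample. The sample text below is hypothetical. It assumes only the layout the mapper relies on: a line containing the label, then one `code = 'description'` entry per line, ending with `;`. The codes and descriptions come from outputs in this notebook. One variation is assumed here: `split("=", 1)` keeps any description that itself contains `=`, whereas a plain `split("=")` would drop it.

```python
SAMPLE = """/* hypothetical excerpt */
value i94cntyl
\t236 = 'AFGHANISTAN'
\t101 = 'ALBANIA'
;
value i94model
\t1 = 'Air'
\t9 = 'Not reported'
;
"""

def parse_value_block(content, label):
    """Return {code: description} for the block starting at `label` and ending at ';'."""
    content = content.replace("\t", "")        # strip tabs, as the cell above does
    block = content[content.index(label):]     # raises ValueError if label is absent
    block = block[:block.index(";")]
    mapping = {}
    for line in block.split("\n")[1:]:         # skip the line holding the label
        parts = line.replace("'", "").split("=", 1)
        if len(parts) == 2:                    # ignore blank lines
            mapping[parts[0].strip()] = parts[1].strip()
    return mapping

print(parse_value_block(SAMPLE, "i94cntyl"))
print(parse_value_block(SAMPLE, "i94model"))
```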

In [45]:
i94cit_res = code_mapper(f_content, "i94cntyl")

In [46]:
# convert dictionary into a Spark DataFrame (one row per code)
lol = list(map(list, i94cit_res.items()))
i94cit_res_df = spark.createDataFrame(lol, ["i94city_code", "i94cntyl"])

In [47]:
i94cit_res_df.show(5)

+------------+--------------------+
|i94city_code|            i94cntyl|
+------------+--------------------+
|         582|MEXICO Air Sea, a...|
|         236|         AFGHANISTAN|
|         101|             ALBANIA|
|         316|             ALGERIA|
|         102|             ANDORRA|
+------------+--------------------+
only showing top 5 rows



In [48]:
i94cit_res_df.count()

289

In [49]:
i94port = code_mapper(f_content, "i94prtl")

In [50]:
# convert dictionary into a Spark DataFrame (one row per code)
lol = list(map(list, i94port.items()))
i94port_df = spark.createDataFrame(lol, ["i94port_code", "i94prtl"])

In [51]:
i94port_df.show(5)

+------------+--------------------+
|i94port_code|             i94prtl|
+------------+--------------------+
|         ALC|           ALCAN, AK|
|         ANC|       ANCHORAGE, AK|
|         BAR|BAKER AAF - BAKER...|
|         DAC|   DALTONS CACHE, AK|
|         PIZ|DEW STATION PT LA...|
+------------+--------------------+
only showing top 5 rows



In [52]:
i94port_df.count()

660

In [53]:
i94mode = code_mapper(f_content, "i94model")
# convert dictionary into a Spark DataFrame
lol = list(map(list, i94mode.items()))
i94model_df = spark.createDataFrame(lol, ["i94model_code", "i94model"])

i94addr = code_mapper(f_content, "i94addrl")
# convert dictionary into a Spark DataFrame
lol = list(map(list, i94addr.items()))
i94addrl_df = spark.createDataFrame(lol, ["i94addrl_code", "i94addrl"])

# i94visa codes are defined by hand
i94visa = {'1': 'Business',
           '2': 'Pleasure',
           '3': 'Student'}
# convert dictionary into a Spark DataFrame
lol = list(map(list, i94visa.items()))
i94visa_df = spark.createDataFrame(lol, ["i94visa_code", "i94visa"])

In [54]:
i94model_df.count()

4

In [55]:
i94addrl_df.count()

55

In [56]:
i94visa_df.count()

3

In [57]:
i94model_df.show(5)

+-------------+------------+
|i94model_code|    i94model|
+-------------+------------+
|            1|         Air|
|            2|         Sea|
|            3|        Land|
|            9|Not reported|
+-------------+------------+



In [58]:
i94addrl_df.show(5)

+-------------+----------+
|i94addrl_code|  i94addrl|
+-------------+----------+
|           AL|   ALABAMA|
|           AK|    ALASKA|
|           AZ|   ARIZONA|
|           AR|  ARKANSAS|
|           CA|CALIFORNIA|
+-------------+----------+
only showing top 5 rows



In [59]:
i94visa_df.show(5)

+------------+--------+
|i94visa_code| i94visa|
+------------+--------+
|           1|Business|
|           2|Pleasure|
|           3| Student|
+------------+--------+



##### Airport Code Data

In [60]:
airports_df = pd.read_csv('airport-codes_csv.csv')
airports_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
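In the rows above, `coordinates` holds longitude first and latitude second. For the Bensalem, US-PA heliport, -74.93 can only be a longitude. `iso_region` prefixes a region code with the country code. If the airports are later joined to the immigration `state` column or the demographics `State Code`, these fields need to be split. The sketch below is minimal and assumes both string layouts hold for every row; non-US regions need not be state codes. `parse_airport` is an illustrative helper.

```python
def parse_airport(row):
    """Split iso_region and coordinates of one airport-codes row."""
    country, _, region = row["iso_region"].partition("-")
    lon_text, lat_text = row["coordinates"].split(",")  # "lon, lat"
    return {
        "ident": row["ident"],
        "country": country,
        "region": region,                  # e.g. "PA" for US-PA
        "longitude": float(lon_text),
        "latitude": float(lat_text),       # float() ignores the leading space
    }

row = {"ident": "00A", "iso_region": "US-PA",
       "coordinates": "-74.93360137939453, 40.07080078125"}
print(parse_airport(row))
```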


In [61]:
airports_df.count()

ident           55075
type            55075
name            55075
elevation_ft    48069
continent       27356
iso_country     54828
iso_region      55075
municipality    49399
gps_code        41030
iata_code        9189
local_code      28686
coordinates     55075
dtype: int64

##### World Temperature Data

In [62]:
fname2 = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(fname2)

In [63]:
temp2_df = spark.read.options(header='True',inferSchema='True',delimiter=',').csv("../../data2/GlobalLandTemperaturesByCity.csv")

In [64]:
temp2_df.count()

8599212

In [65]:
temp2_df.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [66]:
temp2_df.select([count(when(col(c).isNull(), c)).alias(c) for c in temp2_df.columns]).show()

+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
|  0|            364130|                       364130|   0|      0|       0|        0|
+---+------------------+-----------------------------+----+-------+--------+---------+



In [67]:
temp2_df = temp2_df.dropna(how='any', subset=["AverageTemperature", "AverageTemperatureUncertainty"])

In [68]:
temp2_df.count()

8235082

In [69]:
newtemp = temp2_df.join(i94cit_res_df, upper(temp2_df.Country) == i94cit_res_df.i94cntyl, "leftouter")
newtemp.show(5)

+-------------------+------------------+-----------------------------+------+-------+--------+---------+------------+--------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|  City|Country|Latitude|Longitude|i94city_code|i94cntyl|
+-------------------+------------------+-----------------------------+------+-------+--------+---------+------------+--------+
|1780-01-01 00:00:00|            -4.888|                        3.353|Gyumri|Armenia|  40.99N|   44.73E|         151| ARMENIA|
|1780-02-01 00:00:00|            -2.479|                         2.98|Gyumri|Armenia|  40.99N|   44.73E|         151| ARMENIA|
|1780-03-01 00:00:00|2.8089999999999997|                         2.57|Gyumri|Armenia|  40.99N|   44.73E|         151| ARMENIA|
|1780-04-01 00:00:00|            13.089|                        2.605|Gyumri|Armenia|  40.99N|   44.73E|         151| ARMENIA|
|1780-05-01 00:00:00|17.144000000000002|                        2.575|Gyumri|Armenia|  40.99N|   44.73E|       
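The join matches on the upper-cased country name and keeps unmatched temperature rows with a null code (left outer join). It emits one row per matching code. If a country name appeared under two I94 codes, temperature rows would be duplicated. The row count of 8,235,082 both before the join and in the count further down suggests this did not happen here. The plain-Python sketch below reproduces that behaviour on hypothetical rows. `left_join_country_codes` is an illustrative name.

```python
from collections import defaultdict

def left_join_country_codes(rows, code_to_country):
    """Left-outer join rows to I94 country codes on the upper-cased country name."""
    name_to_codes = defaultdict(list)
    for code, name in code_to_country.items():
        name_to_codes[name].append(code)
    joined = []
    for row in rows:
        # no match -> a single output row with a null code
        codes = name_to_codes.get(row["Country"].upper(), [None])
        for code in codes:  # several matching codes would multiply the row
            joined.append({**row, "i94city_code": code})
    return joined

codes = {"151": "ARMENIA", "236": "AFGHANISTAN"}
rows = [{"City": "Gyumri", "Country": "Armenia"},
        {"City": "Example City", "Country": "Nowhere"}]  # hypothetical unmatched row
print(left_join_country_codes(rows, codes))
```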

In [70]:
# unique, monotonically increasing surrogate key (not guaranteed to be consecutive)
newtemp = newtemp.withColumn('id', monotonically_increasing_id())

In [71]:
temp_df = newtemp.withColumnRenamed("dt", "date")\
        .withColumnRenamed("AverageTemperature", "average_temperature")\
        .withColumnRenamed("AverageTemperatureUncertainty", "average_temperature_uncertainty")\
        .withColumnRenamed("City", "city")\
        .withColumnRenamed("Country", "country")\
        .withColumnRenamed("Latitude", "latitude")\
        .withColumnRenamed("Longitude", "longitude")

In [72]:
temp_df = temp_df.select("id", "date", "average_temperature", "average_temperature_uncertainty", "city", "i94cntyl", "i94city_code", "latitude", "longitude")

In [73]:
temp_df.head()

Row(id=0, date=datetime.datetime(1780, 1, 1, 0, 0), average_temperature=-4.888, average_temperature_uncertainty=3.353, city='Gyumri', i94cntyl='ARMENIA', i94city_code='151', latitude='40.99N', longitude='44.73E')
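The temperature data stores latitude and longitude as strings with a hemisphere suffix, for example `40.99N` and `44.73E` for Gyumri. If numeric coordinates are needed, for instance to compare with the airport `coordinates`, they can be converted to signed decimal degrees with south and west negative. The sketch below is minimal and assumes every value ends in one of N, S, E, or W. `parse_coordinate` is an illustrative helper, not part of the ETL.

```python
def parse_coordinate(text):
    """Convert strings like '40.99N' or '44.73E' to signed decimal degrees."""
    value, hemisphere = float(text[:-1]), text[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"unexpected hemisphere in {text!r}")
    return -value if hemisphere in ("S", "W") else value

print(parse_coordinate("40.99N"), parse_coordinate("44.73E"))
```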

In [74]:
temp_df.count()

8235082

In [80]:
# latest date in the temperature data
temp_df.agg({"date": "max"}).collect()[0][0]

datetime.datetime(2013, 9, 1, 0, 0)

In [81]:
# earliest date in the temperature data
temp_df.agg({"date": "min"}).collect()[0][0]

datetime.datetime(1743, 11, 1, 0, 0)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
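One data-quality check the immigration data supports is comparing `age` with `year - birthyear`. The first record shown above (age 37, birth year 1979, year 2016) is consistent. The sketch below runs on plain dicts that use the renamed columns. Two assumptions apply. The one-year tolerance allows for birthdays that fall after arrival. Rows where either value is 0 are skipped because 0 was the null fill value, which also skips genuine infants. `age_mismatch` is an illustrative name.

```python
def age_mismatch(row, tolerance=1):
    """True if age disagrees with year - birthyear by more than `tolerance` years."""
    if row["age"] == 0 or row["birthyear"] == 0:
        return None  # 0 was the fill value for nulls, so the check is skipped
    return abs((row["year"] - row["birthyear"]) - row["age"]) > tolerance

print(age_mismatch({"year": 2016, "age": 37, "birthyear": 1979}))  # False
print(age_mismatch({"year": 2016, "age": 25, "birthyear": 1970}))  # True (hypothetical row)
```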

#### Cleaning Steps
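The cleaning steps were carried out in the Step 1 cells above:
* Immigration data: nulls filled with defaults (0 for `depdate`, `i94bir`, `biryear`; 9 "Not reported" for `i94mode`; empty strings for the text columns), double columns cast to integers, `arrdate`/`depdate` converted from SAS day numbers to dates, `dtadfile` (yyyyMMdd) and `dtaddto` (MMddyyyy) parsed to dates, and columns renamed. The 0 fill makes missing departure dates appear as 1960-01-01, and 45,824 `dtaddto` values did not parse and are null.
* Demographics data: rows with nulls in `Male Population`, `Female Population`, `Number of Veterans`, `Foreign-born`, or `Average Household Size` dropped, reducing the data from 2,891 to 2,875 rows.
* Temperature data: rows with null `AverageTemperature`/`AverageTemperatureUncertainty` dropped (from 8,599,212 to 8,235,082 rows), joined to the I94 country codes on the upper-cased country name, given a surrogate `id`, and columns renamed.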

In [83]:
# Cleaning was performed in the Step 1 cells above.



In [84]:
df_spark3.printSchema()
df_spark3.show(5)

root
 |-- cicid: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- citizenship: integer (nullable = true)
 |-- residence: integer (nullable = true)
 |-- port: string (nullable = true)
 |-- arrivaldate: date (nullable = true)
 |-- mode: integer (nullable = true)
 |-- state: string (nullable = false)
 |-- departuredate: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- visapost: string (nullable = false)
 |-- occup: string (nullable = false)
 |-- entdepa: string (nullable = false)
 |-- entdepd: string (nullable = false)
 |-- entdepu: string (nullable = false)
 |-- matflag: string (nullable = false)
 |-- birthyear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = false)
 |-- insnum: string (nullable = false)
 |-- airline: string (nullable = false)
 |-- admnum: 
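`printSchema()` is useful for eyeballing types, but a check that fails loudly is easier to rerun after each change. PySpark's `DataFrame.dtypes` returns `(column, type)` pairs with short type names such as `int`, `string` and `date`. The sketch below compares those pairs with an expected mapping; `EXPECTED_IMMIGRATION_TYPES` is a hypothetical subset taken from the schema printed above, and `schema_mismatches` is a hypothetical helper.

```python
# Hypothetical expected types for a subset of the cleaned immigration columns,
# written in the short type names that PySpark's DataFrame.dtypes returns.
EXPECTED_IMMIGRATION_TYPES = {
    "cicid": "int",
    "year": "int",
    "month": "int",
    "port": "string",
    "arrivaldate": "date",
    "departuredate": "date",
    "age": "int",
    "gender": "string",
}


def schema_mismatches(dtypes, expected):
    """Compare (name, type) pairs against expected types.

    Returns a dict mapping column name to (expected, actual); a column that
    is absent from dtypes is reported with actual type None.
    """
    actual = dict(dtypes)
    return {
        name: (want, actual.get(name))
        for name, want in expected.items()
        if actual.get(name) != want
    }
```

Usage: `assert not schema_mismatches(df_spark3.dtypes, EXPECTED_IMMIGRATION_TYPES)`.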

In [85]:
df_spark3.createOrReplaceTempView("immigration")

In [87]:
spark.sql("""
    SELECT *
    FROM immigration
""").show(10)

+-----+----+-----+-----------+---------+----+-----------+----+-----+-------------+---+-------+-----+----------+--------+-----+-------+-------+-------+-------+---------+----------+------+------+-------+--------------+-----+--------+
|cicid|year|month|citizenship|residence|port|arrivaldate|mode|state|departuredate|age|i94visa|count|  dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|birthyear|   dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+----+-----+-----------+---------+----+-----------+----+-----+-------------+---+-------+-----+----------+--------+-----+-------+-------+-------+-------+---------+----------+------+------+-------+--------------+-----+--------+
|    6|2016|    4|        692|      692| XXX| 2016-04-29|   9|     |   1960-01-01| 37|      2|    1|      null|        |     |      T|       |      U|       |     1979|2016-10-28|      |      |       | 1.897628485E9|     |      B2|
|    7|2016|    4|        254|      276| ATL| 2016-04-07|   1|   AL|   1
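Once the DataFrame is registered with `createOrReplaceTempView`, it can be queried with ordinary SQL through `spark.sql`. To prototype a query without a Spark session, the sketch below runs an aggregation (arrivals per port and visa category) against a throwaway in-memory SQLite table. The rows are toy values, and SQLite's dialect differs from Spark SQL in places, so treat this only as a way to check query logic; the same `query` string should also run through `spark.sql(query)` against the `immigration` view.

```python
import sqlite3

# Toy rows with a few of the immigration columns; values are illustrative only.
rows = [
    (6, "XXX", 2),
    (7, "ATL", 2),
    (8, "ATL", 1),
    (9, "ATL", 2),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE immigration (cicid INTEGER, port TEXT, i94visa INTEGER)")
conn.executemany("INSERT INTO immigration VALUES (?, ?, ?)", rows)

# Arrivals per port and visa category, busiest first.
query = """
    SELECT port, i94visa, COUNT(*) AS arrivals
    FROM immigration
    GROUP BY port, i94visa
    ORDER BY arrivals DESC, port, i94visa
"""
result = conn.execute(query).fetchall()
print(result)  # [('ATL', 2, 2), ('ATL', 1, 1), ('XXX', 2, 1)]
conn.close()
```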

### Step 3: Define the Data Model - SEE README
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
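The model itself is described in the README. Purely as an illustration: the ETL log below loads lookup tables for I-94 country, port, mode, address and visa codes alongside the immigration records, which fits a star schema with the immigration table as the fact table and the code tables as dimensions. The SQLite DDL below sketches three of those dimensions; table and column names are assumptions, not the project's actual schema.

```python
import sqlite3

# Hypothetical star schema: one fact table keyed to lookup (dimension) tables
# of the kind the ETL log reports loading. Names and columns are assumptions.
DDL = """
CREATE TABLE country_codes (code INTEGER PRIMARY KEY, country TEXT);
CREATE TABLE port_codes    (code TEXT PRIMARY KEY, port TEXT);
CREATE TABLE mode_codes    (code INTEGER PRIMARY KEY, mode TEXT);
CREATE TABLE immigration (
    cicid         INTEGER PRIMARY KEY,
    citizenship   INTEGER REFERENCES country_codes(code),
    residence     INTEGER REFERENCES country_codes(code),
    port          TEXT    REFERENCES port_codes(code),
    mode          INTEGER REFERENCES mode_codes(code),
    arrivaldate   TEXT,
    departuredate TEXT
);
"""

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces REFERENCES only when enabled
conn.executescript(DDL)
tables = sorted(
    name for (name,) in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)  # ['country_codes', 'immigration', 'mode_codes', 'port_codes']
```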

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
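The pipeline steps can be written down as a dependency graph, which makes the execution order explicit and shows which steps could run in parallel. A sketch using Python's standard-library `graphlib` (Python 3.9+). The step names are hypothetical, modelled on the messages that `etl_main.py` prints further down rather than taken from its code.

```python
from graphlib import TopologicalSorter

# Hypothetical step names modelled on the etl_main.py log; each step maps to
# the set of steps whose output it needs.
PIPELINE = {
    "read_i94_labels": set(),
    "read_immigration": set(),
    "clean_immigration": {"read_immigration"},
    "read_demographics": set(),
    "clean_demographics": {"read_demographics"},
    "read_temperature": set(),
    "clean_temperature": {"read_temperature"},
    # Temperature rows are joined with the I-94 country codes.
    "join_temperature_countries": {"clean_temperature", "read_i94_labels"},
    "quality_checks": {
        "read_i94_labels",
        "clean_immigration",
        "clean_demographics",
        "join_temperature_countries",
    },
}

# static_order() yields every step after all of its predecessors.
order = list(TopologicalSorter(PIPELINE).static_order())
print(order)
```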

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
%run etl_main.py

read i94 label descriptions
i94 country codes:289
i94 port codes:660
i94 mode codes:4
i94 addr codes:55
i94 visa codes:3
read immigration data
immigration data records:3096313
immigration data records after cleaning:3096313
read demographic data
demographic data records:2891
demographic data records after cleaning:2875
read world temperature data
world temperature data records:8599212
world temperature data records after cleaning:8235082
join world temperature data records with country codes:8235082
join world temperature data records with country codes after join:8235082
Data quality check PASSED for table country_codes with 289 rows.
Performing null value check on table country_codes
Table country_codes passed.
Data quality check PASSED for table port_codes with 660 rows.
Performing null value check on table port_codes
Table port_codes passed.
Data quality check PASSED for table mode_codes with 4 rows.
Performing null value check on table mode_codes
Table mode_codes passed.
Data qual

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
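The output of `%run etl_main.py` above reports a non-empty row-count check and a null-value check per table, plus before/after counts around the temperature join. A minimal pure-Python sketch of those checks, assuming rows are dicts; the messages imitate the log format, but this is not the code in `etl_capstone.py`, and the function names are hypothetical. In Spark, `len(rows)` would become `df.count()` and the null test `df.filter(col(c).isNull()).count()`.

```python
def check_row_count(table: str, rows) -> int:
    """Fail if a table is empty; return its row count."""
    n = len(rows)
    if n == 0:
        raise ValueError(f"Data quality check FAILED for table {table}: no rows")
    print(f"Data quality check PASSED for table {table} with {n} rows.")
    return n


def check_no_nulls(table: str, rows, columns) -> None:
    """Fail if any of the given columns contains None in any row."""
    print(f"Performing null value check on table {table}")
    for c in columns:
        nulls = sum(1 for row in rows if row.get(c) is None)
        if nulls:
            raise ValueError(f"Table {table} has {nulls} null values in column {c}")
    print(f"Table {table} passed.")


def check_join_preserves_rows(table: str, before: int, after: int) -> None:
    """Fail if a join changed the row count (dropped or duplicated rows)."""
    if before != after:
        raise ValueError(f"{table}: {before} rows before join, {after} after")
```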
 
Run Quality Checks

In [89]:
# Perform quality checks here
# INCLUDED IN etl_capstone.py

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
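A data dictionary can be kept next to the code as plain data and rendered to Markdown for the README. The sketch below does this for the temperature table built earlier. The descriptions are inferred from the column names and the sample row printed by `temp_df.head()`, so treat them as placeholders to verify against the source data; `to_markdown` is a hypothetical helper.

```python
# Hypothetical data-dictionary entries for the temperature table; descriptions
# are inferred from column names and the sample row, not from source docs.
TEMPERATURE_DICTIONARY = [
    ("id", "Surrogate key generated with monotonically_increasing_id()"),
    ("date", "Month of the observation (first day of the month)"),
    ("average_temperature", "Average temperature for the month"),
    ("average_temperature_uncertainty", "Uncertainty of average_temperature"),
    ("city", "City name"),
    ("i94cntyl", "Country name as written in the I-94 labels (e.g. 'ARMENIA')"),
    ("i94city_code", "I-94 code joined from the label descriptions (e.g. '151')"),
    ("latitude", "Latitude with hemisphere suffix, stored as text (e.g. '40.99N')"),
    ("longitude", "Longitude with hemisphere suffix, stored as text (e.g. '44.73E')"),
]


def to_markdown(entries):
    """Render (column, description) pairs as a Markdown table."""
    lines = ["| Column | Description |", "|---|---|"]
    lines += [f"| `{name}` | {desc} |" for name, desc in entries]
    return "\n".join(lines)


print(to_markdown(TEMPERATURE_DICTIONARY))
```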

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [90]:
# SEE README